#!/usr/bin/env python
"""Log analysis for nomos-simulations: per-message latency statistics.

Reads newline-delimited log records (from a file argument or stdin),
extracts the trailing JSON object from each line, pairs the first two
records seen for each ``payload_id``, and prints latency statistics
both in simulation steps and in milliseconds.
"""
import argparse
import json
import statistics
import sys
from collections.abc import Iterable
from typing import Dict, Optional


class Message:
    """A message sighted at up to two simulation steps.

    ``step_a`` is the step of the first record seen for this id,
    ``step_b`` the step of the second record (``None`` until seen).
    """

    def __init__(self, message_id: str, step_a: Optional[int]):
        self.id = message_id
        # Logs may carry the step as a string; normalise to int once here.
        self.step_a: Optional[int] = int(step_a) if step_a is not None else None
        self.step_b: Optional[int] = None

    def __hash__(self) -> int:
        # BUG FIX: __hash__ must return an int. The previous version
        # returned the str id directly, which raises
        # "TypeError: __hash__ method should return an integer".
        return hash(self.id)

    def __eq__(self, other: object) -> bool:
        # Defined alongside __hash__ so that equal ids hash and compare
        # consistently (objects that compare equal must hash equal).
        return isinstance(other, Message) and self.id == other.id

    def __repr__(self) -> str:
        return f"[{self.id}] {self.step_a} -> {self.step_b}"

    @property
    def latency(self) -> Optional[int]:
        """Steps between the two sightings, or ``None`` if incomplete."""
        if self.step_a is not None and self.step_b is not None:
            return abs(self.step_a - self.step_b)
        return None


# Maps payload_id -> Message record.
MessageStorage = Dict[str, Message]


def compute_results(message_storage: MessageStorage, step_duration: int) -> str:
    """Render a human-readable statistics report.

    :param message_storage: messages keyed by payload id.
    :param step_duration: duration of one simulation step, in ms.
    :return: a multi-line report string.
    """
    latencies = [message_record.latency for message_record in message_storage.values()]
    valued_latencies = [latency for latency in latencies if latency is not None]
    incomplete_latencies = sum(1 for latency in latencies if latency is None)

    total_messages = len(latencies)
    total_messages_full_latency = len(valued_latencies)
    total_messages_incomplete_latency = incomplete_latencies

    # BUG FIX: statistics.mean/median and max/min raise on an empty
    # sequence; report gracefully instead of crashing when no message
    # was ever observed twice.
    if not valued_latencies:
        return f"""[Results]
- Total messages: {total_messages}
  - Full latencies: {total_messages_full_latency}
  - Incomplete latencies: {total_messages_incomplete_latency}
- No complete latencies to aggregate."""

    latency_average_steps = statistics.mean(valued_latencies)
    latency_average_ms = "{:.2f}ms".format(latency_average_steps * step_duration)
    latency_median_steps = statistics.median(valued_latencies)
    latency_median_ms = "{:.2f}ms".format(latency_median_steps * step_duration)
    max_latency_steps = max(valued_latencies)
    max_latency_ms = "{:.2f}ms".format(max_latency_steps * step_duration)
    min_latency_steps = min(valued_latencies)
    min_latency_ms = "{:.2f}ms".format(min_latency_steps * step_duration)

    return f"""[Results]
- Total messages: {total_messages}
  - Full latencies: {total_messages_full_latency}
  - Incomplete latencies: {total_messages_incomplete_latency}
- Averages
  - Steps: {latency_average_steps}
  - Duration: {latency_average_ms}
- Median
  - Steps: {latency_median_steps}
  - Duration: {latency_median_ms}
- Max
  - Steps: {max_latency_steps}
  - Duration: {max_latency_ms}
- Min
  - Steps: {min_latency_steps}
  - Duration: {min_latency_ms}"""


def parse_record_stream(record_stream: Iterable[str]) -> MessageStorage:
    """Pair up records by payload id.

    The first record seen for an id sets ``step_a``; any later record
    overwrites ``step_b`` (so the last duplicate wins, as before).
    """
    storage: MessageStorage = {}

    for record in record_stream:
        json_record = json.loads(record)
        payload_id = json_record["payload_id"]
        step_id = json_record["step_id"]

        if (stored_message := storage.get(payload_id)) is None:
            storage[payload_id] = Message(payload_id, step_id)
        else:
            stored_message.step_b = step_id

    return storage


def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[str]:
    """Strip the non-JSON log prefix, keeping from the last '{' onward.

    NOTE(review): if a line contains no '{', rfind returns -1 and the
    line is yielded unchanged — json.loads will then fail loudly.
    """
    for record in record_stream:
        bracket_pos = record.rfind("{")
        yield record[bracket_pos:]


def get_pipe_stream() -> Iterable[str]:
    """Yield raw lines from stdin."""
    yield from sys.stdin


def get_file_stream(input_filename: str) -> Iterable[str]:
    """Yield raw lines from *input_filename*; the file is closed on exhaustion."""
    with open(input_filename, "r") as file:
        yield from file


def get_input_stream(input_filename: Optional[str]) -> Iterable[str]:
    """Select file vs stdin input and wrap it in the JSON-extracting stream."""
    stream = get_file_stream(input_filename) if input_filename is not None else get_pipe_stream()
    return line_to_json_stream(stream)


def build_argument_parser() -> argparse.ArgumentParser:
    """Build the CLI parser: optional input file plus --step-duration (ms)."""
    parser = argparse.ArgumentParser(description="Log analysis for nomos-simulations.")
    parser.add_argument(
        "--step-duration",
        type=int,
        default=100,
        help="Duration (in ms) of each step in the simulation."
    )
    parser.add_argument(
        "input_file",
        nargs="?",
        help="The file to parse. If not provided, input will be read from stdin."
    )
    return parser


if __name__ == "__main__":
    argument_parser = build_argument_parser()
    arguments = argument_parser.parse_args()

    input_stream = get_input_stream(arguments.input_file)
    messages = parse_record_stream(input_stream)

    results = compute_results(messages, arguments.step_duration)
    print(results)