From 4ac7cfed8bc98e5a0e484da9dae9008732111436 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Fri, 8 Nov 2024 14:02:33 +0100 Subject: [PATCH] Adapt to new log format --- scripts/latency.py | 121 ++++++++++++++++++++++----------------------- 1 file changed, 58 insertions(+), 63 deletions(-) diff --git a/scripts/latency.py b/scripts/latency.py index 79e6e67..6fd970c 100755 --- a/scripts/latency.py +++ b/scripts/latency.py @@ -1,8 +1,8 @@ # !/usr/bin/env python +import json import sys from collections.abc import Iterable from typing import Dict, Optional -import json_stream import statistics import argparse @@ -12,101 +12,95 @@ JsonStream = Iterable[TransientStreamingJSONObject] class Message: + def __init__(self, message_id: str, step_a: Optional[int]): + self.id = message_id + self.step_a = int(step_a) if step_a is not None else None + self.step_b = None + def __hash__(self): return self.id - def __init__( - self, - message_id: str, - generator_node: Optional[str], - generator_step: Optional[int], - unwrapper_node: Optional[str], - unwrapper_step: Optional[int] - ): - self.id = message_id - self.generator_node = generator_node - self.generator_step = int(generator_step) if generator_step is not None else None - self.unwrapper_node = unwrapper_node - self.unwrapper_step = int(unwrapper_step) if unwrapper_step is not None else None - def __repr__(self): - return f"[{self.id}] {self.generator_node}-{self.generator_step} -> {self.unwrapper_node}-{self.unwrapper_step}" + return f"[{self.id}] {self.step_a} -> {self.step_b}" @property def latency(self) -> Optional[int]: - if self.unwrapper_step is not None and self.generator_step is not None: - return self.unwrapper_step - self.generator_step + if self.step_a is not None and self.step_b is not None: + return abs(self.step_a - self.step_b) MessageStorage = Dict[str, Message] -def print_results(message_storage: MessageStorage, step_duration: int): +def compute_results(message_storage: MessageStorage, step_duration: int) -> str: latencies = [message_record.latency for message_record in message_storage.values()] valued_latencies = [latency for latency in latencies if latency is not None] incomplete_latencies = sum((1 for latency in latencies if latency is None)) + total_messages = len(latencies) + total_messages_full_latency = len(valued_latencies) + total_messages_incomplete_latency = incomplete_latencies latency_average_steps = statistics.mean(valued_latencies) + latency_average_ms = "{:.2f}ms".format(latency_average_steps * step_duration) latency_median_steps = statistics.median(valued_latencies) + latency_median_ms = "{:.2f}ms".format(latency_median_steps * step_duration) + max_latency_steps = max(valued_latencies) + max_latency_ms = "{:.2f}ms".format(max_latency_steps * step_duration) + min_latency_steps = min(valued_latencies) + min_latency_ms = "{:.2f}ms".format(min_latency_steps * step_duration) - print("[Results]") - print(f"- Total messages: {len(latencies)}") - print(f" - Full latencies: {len(valued_latencies)}") - print(f" - Incomplete latencies: {incomplete_latencies}") - print("- Average") - print(f" - Steps: {latency_average_steps}") - print(" - Duration: {:.2f}ms".format(latency_average_steps * step_duration)) - print("- Median") - print(f" - Steps: {latency_median_steps}") - print(" - Duration: {:.2f}ms".format(latency_median_steps * step_duration)) + return f"""[Results] +- Total messages: {total_messages} + - Full latencies: {total_messages_full_latency} + - Incomplete latencies: {total_messages_incomplete_latency} +- Averages + - Steps: {latency_average_steps} + - Duration: {latency_average_ms} +- Median + - Steps: {latency_median_steps} + - Duration: {latency_median_ms} +- Max + - Steps: {max_latency_steps} + - Duration: {max_latency_ms} +- Min + - Steps: {min_latency_steps} + - Duration: {min_latency_ms}""" -def parse_record_stream(record_stream: JsonStream) -> MessageStorage: +def parse_record_stream(record_stream: Iterable[str]) -> MessageStorage: storage: MessageStorage = {} for record in record_stream: - node_id = record["node_id"] - _step_id = record["step_id"] + json_record = json.loads(record) + payload_id = json_record["payload_id"] + step_id = json_record["step_id"] - data_messages_generated = record["data_messages_generated"] - for generated_message_id, generated_message_step_id in data_messages_generated.items(): - stored_message = storage.get(generated_message_id) - if stored_message: - stored_message.generator_node = node_id - stored_message.generator_step = generated_message_step_id - else: - storage[generated_message_id] = Message( - generated_message_id, node_id, generated_message_step_id, None, None - ) - - data_messages_fully_unwrapped = record["data_messages_fully_unwrapped"] - for generated_message_id, generated_message_step_id in data_messages_fully_unwrapped.items(): - stored_message = storage.get(generated_message_id) - if stored_message: - stored_message.unwrapper_node = node_id - stored_message.unwrapper_step = generated_message_step_id - else: - storage[generated_message_id] = Message( - generated_message_id, None, None, node_id, generated_message_step_id - ) + if (stored_message := storage.get(payload_id)) is None: + storage[payload_id] = Message(payload_id, step_id) + else: + stored_message.step_b = step_id return storage -def from_pipe() -> JsonStream: - yield from json_stream.load(sys.stdin) +def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[str]: + for record in record_stream: + bracket_pos = record.rfind("{") + yield record[bracket_pos:] -def from_file(input_filename) -> JsonStream: +def get_pipe_stream() -> Iterable[str]: + yield from sys.stdin + + +def get_file_stream(input_filename) -> Iterable[str]: with open(input_filename, "r") as file: - data = json_stream.load(file) - yield from data["records"] + yield from file -def get_input_stream(input_filename: Optional[str]) -> JsonStream: - if input_filename is not None: - return from_file(input_filename) - return from_pipe() +def get_input_stream(input_filename: Optional[str]) -> Iterable[str]: + stream = get_file_stream(input_filename) if input_filename is not None else get_pipe_stream() + return line_to_json_stream(stream) def build_argument_parser() -> argparse.ArgumentParser: @@ -132,4 +126,5 @@ if __name__ == "__main__": input_stream = get_input_stream(arguments.input_file) messages = parse_record_stream(input_stream) - print_results(messages, arguments.step_duration) + results = compute_results(messages, arguments.step_duration) + print(results)