diff --git a/simlib/blendnet-sims/scripts/latency.py b/simlib/blendnet-sims/scripts/latency.py index 6f1f113..d0936ec 100755 --- a/simlib/blendnet-sims/scripts/latency.py +++ b/simlib/blendnet-sims/scripts/latency.py @@ -153,53 +153,139 @@ class NodeStorage: return {str(node_id): asdict(node) for node_id, node in self.storage.items()} -def compute_results( - message_storage: MessageStorage, step_duration: int -) -> dict[str, int | float | str]: - complete_messages = [ - message for message in message_storage.values() if message.latency is not None - ] - incomplete_messages = sum( - (1 for message in message_storage.values() if message.latency is None) - ) +@dataclass +class LatencyAnalysis: + total_messages: int + total_complete_messages: int + total_incomplete_messages: int + min_latency_steps: int + min_latency_ms: int + min_latency_analysis: "MessageLatencyAnalysis" + max_latency_steps: int + max_latency_ms: int + max_latency_analysis: "MessageLatencyAnalysis" + avg_latency_steps: float + avg_latency_ms: int + median_latency_steps: float + median_latency_ms: int - total_messages = len(message_storage) - total_complete_messages = len(complete_messages) - total_incomplete_messages = incomplete_messages + @classmethod + def build( + cls, + message_storage: MessageStorage, + node_storage: NodeStorage, + step_duration: int, + ) -> "LatencyAnalysis": + complete_messages = [ + message + for message in message_storage.values() + if message.latency is not None + ] + incomplete_messages = sum( + (1 for message in message_storage.values() if message.latency is None) + ) - complete_latencies = [ - message.latency for message in complete_messages if message.latency is not None - ] - latency_average_steps = statistics.mean(complete_latencies) - latency_average_ms = "{:.2f}".format(latency_average_steps * step_duration) - latency_median_steps = statistics.median(complete_latencies) - latency_median_ms = "{:.2f}".format(latency_median_steps * step_duration) + total_messages = len(message_storage) + total_complete_messages = len(complete_messages) + total_incomplete_messages = incomplete_messages - max_message = max(complete_messages) - max_latency_steps = max_message.latency - assert max_latency_steps is not None - max_latency_ms = "{:.2f}".format(max_latency_steps * step_duration) + complete_latencies = [ + message.latency + for message in complete_messages + if message.latency is not None + ] - min_message = min(complete_messages) - min_latency_steps = min_message.latency - assert min_latency_steps is not None - min_latency_ms = "{:.2f}".format(min_latency_steps * step_duration) + min_message = min(complete_messages) + min_latency_steps = min_message.latency + assert min_latency_steps is not None + min_latency_ms = int(min_latency_steps * step_duration) + min_latency_analysis = MessageLatencyAnalysis.build(min_message, node_storage) - return { - "total_messages": total_messages, - "total_complete_messages": total_complete_messages, - "total_incomplete_messages": total_incomplete_messages, - "latency_average_steps": latency_average_steps, - "latency_average_ms": latency_average_ms, - "latency_median_steps": latency_median_steps, - "latency_median_ms": latency_median_ms, - "max_latency_message_id": max_message.id, - "max_latency_steps": max_latency_steps, - "max_latency_ms": max_latency_ms, - "min_latency_message_id": min_message.id, - "min_latency_steps": min_latency_steps, - "min_latency_ms": min_latency_ms, - } + max_message = max(complete_messages) + max_latency_steps = max_message.latency + assert max_latency_steps is not None + max_latency_ms = int(max_latency_steps * step_duration) + max_latency_analysis = MessageLatencyAnalysis.build(max_message, node_storage) + + avg_latency_steps = statistics.mean(complete_latencies) + avg_latency_ms = int(avg_latency_steps * step_duration) + median_latency_steps = statistics.median(complete_latencies) + median_latency_ms = int(median_latency_steps * step_duration) + + return cls( + total_messages, + total_complete_messages, + total_incomplete_messages, + min_latency_steps, + min_latency_ms, + min_latency_analysis, + max_latency_steps, + max_latency_ms, + max_latency_analysis, + avg_latency_steps, + avg_latency_ms, + median_latency_steps, + median_latency_ms, + ) + + +@dataclass +class MessageLatencyAnalysis: + message_id: str + persistent_latencies: list[int] + persistent_queue_sizes: list[int] + temporal_latencies: list[int] + temporal_queue_sizes: list[int] + + @classmethod + def build( + cls, message: Message, node_storage: NodeStorage + ) -> "MessageLatencyAnalysis": + persistent_latencies = [] + persistent_queue_sizes = [] + for latency in message.persistent_transmission_latencies: + if latency.steps is None: + continue + persistent_latencies.append(latency.steps) + persistent_queue_sizes.append( + next( + ( + event.queue_size_after_event + for event in node_storage.get( + latency.start_event.node_id + ).persistent_transmission_events + if event.event.topic == "PersistentTransmissionScheduled" + and event.event.msg_id == message.id + ) + ) + ) + + temporal_latencies = [] + temporal_queue_sizes = [] + for latency in message.temporal_processor_latencies: + if latency.steps is None: + continue + temporal_latencies.append(latency.steps) + temporal_queue_sizes.append( + next( + ( + event.queue_size_after_event + for event in node_storage.get( + latency.start_event.node_id + ).temporal_processor_events + if event.event.topic == "TemporalProcessorScheduled" + and event.event.msg_id == message.id + ) + ) + ) + + return cls( + message.id, + persistent_latencies, + persistent_queue_sizes, + temporal_latencies, + temporal_queue_sizes, + ) def parse_record_stream( @@ -263,7 +349,7 @@ if __name__ == "__main__": arguments = argument_parser.parse_args() input_stream = mixlog.get_input_stream(arguments.input_file) - messages, _ = parse_record_stream(input_stream) + messages, nodes = parse_record_stream(input_stream) - results = compute_results(messages, arguments.step_duration) - print(json.dumps(results, indent=4)) + results = LatencyAnalysis.build(messages, nodes, arguments.step_duration) + print(json.dumps(asdict(results), indent=2))