diff --git a/simlib/blendnet-sims/scripts/latency.py b/simlib/blendnet-sims/scripts/latency.py index 46f71ef..6485c38 100755 --- a/simlib/blendnet-sims/scripts/latency.py +++ b/simlib/blendnet-sims/scripts/latency.py @@ -1,346 +1,85 @@ # !/usr/bin/env python import argparse import json -import statistics -from collections.abc import Iterable -from dataclasses import asdict, dataclass, field -from typing import Any, Dict, Optional +from dataclasses import asdict, dataclass +from typing import Iterable import mixlog - - -@dataclass -class Event: - topic: str - msg_id: str - step_id: int - node_id: int - - -@dataclass -class Latency: - start_event: Event - end_event: Optional[Event] = None - steps: Optional[int] = None - - def finish(self, event: Event): - assert self.end_event is None - assert event.step_id >= self.start_event.step_id - self.end_event = event - self.steps = self.end_event.step_id - self.start_event.step_id - - def finished(self) -> bool: - return self.end_event is not None - - -@dataclass -class Message: - id: str - total_latency: Latency - persistent_transmission_latencies: list[Latency] = field(default_factory=list) - temporal_processor_latencies: list[Latency] = field(default_factory=list) - - def __hash__(self): - return self.id - - def fully_unwrapped(self, event: Event): - self.total_latency.finish(event) - - def persistent_transmission_scheduled(self, event: Event): - Message.start_new_latency(self.persistent_transmission_latencies, event) - - def persistent_transmission_released(self, event: Event): - Message.finish_recent_latency(self.persistent_transmission_latencies, event) - - def temporal_processor_scheduled(self, event: Event): - Message.start_new_latency(self.temporal_processor_latencies, event) - - def temporal_processor_released(self, event: Event): - Message.finish_recent_latency(self.temporal_processor_latencies, event) - - @staticmethod - def start_new_latency(latencies: list[Latency], event: Event): - latencies.append(Latency(event)) - - @staticmethod - def finish_recent_latency(latencies: list[Latency], event: Event): - for latency in reversed(latencies): - if latency.start_event.node_id == event.node_id: - assert not latency.finished() - latency.finish(event) - return - raise Exception("No latency to finish") - - @property - def latency(self) -> Optional[int]: - return self.total_latency.steps - - def __eq__(self, other): - if not isinstance(other, Message): - return NotImplemented - return self.latency == other.latency - - def __lt__(self, other): - if not isinstance(other, Message): - return NotImplemented - if self.latency is None or other.latency is None: - return NotImplemented - return self.latency < other.latency - - -MessageStorage = Dict[str, Message] - - -@dataclass -class QueueEvent: - event: Event - queue_size_after_event: int - - -@dataclass -class NodeEvents: - id: int - msg_generation_events: list[Event] = field(default_factory=list) - persistent_transmission_events: list[QueueEvent] = field(default_factory=list) - persistent_transmission_queue_size: int = 0 - temporal_processor_events: list[QueueEvent] = field(default_factory=list) - temporal_processor_queue_size: int = 0 - fully_unwrapped_msg_events: list[Event] = field(default_factory=list) - - def add_msg_generation_event(self, event: Event): - self.msg_generation_events.append(event) - - def add_persistent_transmission_event(self, event: Event): - if event.topic == "PersistentTransmissionScheduled": - self.persistent_transmission_queue_size += 1 - elif event.topic == "MessageReleasedFromPersistentTransmission": - self.persistent_transmission_queue_size -= 1 - else: - raise Exception( - f"Unexpected event topic for persistent transmission: {event.topic}" - ) - self.persistent_transmission_events.append( - QueueEvent(event, self.persistent_transmission_queue_size) - ) - - def add_temporal_processor_event(self, event: Event): - if event.topic == "TemporalProcessorScheduled": - self.temporal_processor_queue_size += 1 - elif event.topic == "MessageReleasedFromTemporalProcessor": - self.temporal_processor_queue_size -= 1 - else: - raise Exception( - f"Unexpected event topic for temporal processor: {event.topic}" - ) - self.temporal_processor_events.append( - QueueEvent(event, self.temporal_processor_queue_size) - ) - - def add_fully_unwrapped_msg_event(self, event: Event): - self.fully_unwrapped_msg_events.append(event) - - -class NodeStorage: - def __init__(self): - self.storage: dict[int, NodeEvents] = {} - - def get(self, node_id: int) -> NodeEvents: - if node_id not in self.storage: - self.storage[node_id] = NodeEvents(node_id) - return self.storage[node_id] - - def to_dict(self) -> dict[str, dict[str, Any]]: - return {str(node_id): asdict(node) for node_id, node in self.storage.items()} +import pandas as pd @dataclass class LatencyAnalysis: total_messages: int - total_complete_messages: int - total_incomplete_messages: int - min_latency_steps: int min_latency_ms: int min_latency_analysis: "MessageLatencyAnalysis" - max_latency_steps: int max_latency_ms: int max_latency_analysis: "MessageLatencyAnalysis" - avg_latency_steps: float avg_latency_ms: int - median_latency_steps: float median_latency_ms: int @classmethod - def build( - cls, - message_storage: MessageStorage, - node_storage: NodeStorage, - step_duration: int, - ) -> "LatencyAnalysis": - complete_messages = [ - message - for message in message_storage.values() - if message.latency is not None - ] - incomplete_messages = sum( - (1 for message in message_storage.values() if message.latency is None) - ) + def build(cls, input_stream: Iterable[tuple[str, dict]]) -> "LatencyAnalysis": + latencies = [] + message_ids = [] + messages: dict[str, dict] = {} + for topic, record in input_stream: + if topic != "MessageFullyUnwrapped": + continue + latencies.append(record["total_duration"]) + message_id = record["message"]["payload_id"] + message_ids.append(message_id) + messages[message_id] = record - total_messages = len(message_storage) - total_complete_messages = len(complete_messages) - total_incomplete_messages = incomplete_messages - - complete_latencies = [ - message.latency - for message in complete_messages - if message.latency is not None - ] - - min_message = min(complete_messages) - min_latency_steps = min_message.latency - assert min_latency_steps is not None - min_latency_ms = int(min_latency_steps * step_duration) - min_latency_analysis = MessageLatencyAnalysis.build( - min_message, node_storage, step_duration - ) - - max_message = max(complete_messages) - max_latency_steps = max_message.latency - assert max_latency_steps is not None - max_latency_ms = int(max_latency_steps * step_duration) - max_latency_analysis = MessageLatencyAnalysis.build( - max_message, node_storage, step_duration - ) - - avg_latency_steps = statistics.mean(complete_latencies) - avg_latency_ms = int(avg_latency_steps * step_duration) - median_latency_steps = statistics.median(complete_latencies) - median_latency_ms = int(median_latency_steps * step_duration) + latencies = pd.Series(latencies) + message_ids = pd.Series(message_ids) return cls( - total_messages=total_messages, - total_complete_messages=total_complete_messages, - total_incomplete_messages=total_incomplete_messages, - min_latency_steps=min_latency_steps, - min_latency_ms=min_latency_ms, - min_latency_analysis=min_latency_analysis, - max_latency_steps=max_latency_steps, - max_latency_ms=max_latency_ms, - max_latency_analysis=max_latency_analysis, - avg_latency_steps=avg_latency_steps, - avg_latency_ms=avg_latency_ms, - median_latency_steps=median_latency_steps, - median_latency_ms=median_latency_ms, + int(latencies.count()), + int(latencies.min()), + MessageLatencyAnalysis.build( + messages[str(message_ids[latencies.idxmin()])] + ), + int(latencies.max()), + MessageLatencyAnalysis.build( + messages[str(message_ids[latencies.idxmax()])] + ), + int(latencies.mean()), + int(latencies.median()), ) @dataclass class MessageLatencyAnalysis: message_id: str - persistent_latencies_step: list[int] persistent_latencies_ms: list[int] - persistent_queue_sizes: list[int] - temporal_latencies_step: list[int] + persistent_indices: list[int] temporal_latencies_ms: list[int] - temporal_queue_sizes: list[int] + temporal_indices: list[int] @classmethod def build( cls, - message: Message, - node_storage: NodeStorage, - step_duration: int, + message: dict, ) -> "MessageLatencyAnalysis": - persistent_latencies_step = [] - persistent_latencies_ms = [] - persistent_queue_sizes = [] - for latency in message.persistent_transmission_latencies: - if latency.steps is None: - continue - persistent_latencies_step.append(latency.steps) - persistent_latencies_ms.append(latency.steps * step_duration) - persistent_queue_sizes.append( - next( - ( - event.queue_size_after_event - for event in node_storage.get( - latency.start_event.node_id - ).persistent_transmission_events - if event.event.topic == "PersistentTransmissionScheduled" - and event.event.msg_id == message.id - ) + analysis = cls(message["message"]["payload_id"], [], [], [], []) + + for event in message["history"]: + event_type = event["event_type"] + if "PersistentTransmissionScheduled" in event_type: + analysis.persistent_indices.append( + int(event_type["PersistentTransmissionScheduled"]["index"]) ) - ) - - temporal_latencies_step = [] - temporal_latencies_ms = [] - temporal_queue_sizes = [] - for latency in message.temporal_processor_latencies: - if latency.steps is None: - continue - temporal_latencies_step.append(latency.steps) - temporal_latencies_ms.append(latency.steps * step_duration) - temporal_queue_sizes.append( - next( - ( - event.queue_size_after_event - for event in node_storage.get( - latency.start_event.node_id - ).temporal_processor_events - if event.event.topic == "TemporalProcessorScheduled" - and event.event.msg_id == message.id - ) + elif event_type == "PersistentTransmissionReleased": + analysis.persistent_latencies_ms.append(event["duration_from_prev"]) + elif "TemporalProcessorScheduled" in event_type: + analysis.temporal_indices.append( + int(event_type["TemporalProcessorScheduled"]["index"]) ) - ) + elif event_type == "TemporalProcessorReleased": + analysis.temporal_latencies_ms.append(event["duration_from_prev"]) - return cls( - message_id=message.id, - persistent_latencies_step=persistent_latencies_step, - persistent_latencies_ms=persistent_latencies_ms, - persistent_queue_sizes=persistent_queue_sizes, - temporal_latencies_step=temporal_latencies_step, - temporal_latencies_ms=temporal_latencies_ms, - temporal_queue_sizes=temporal_queue_sizes, - ) - - -def parse_record_stream( - record_stream: Iterable[tuple[str, dict]], -) -> tuple[MessageStorage, NodeStorage]: - msg_storage: MessageStorage = {} - node_storage: NodeStorage = NodeStorage() - - for topic, record in record_stream: - if topic in ("DataMessageGenerated", "CoverMessageGenerated"): - event = event_from_record(topic, record) - payload_id = record["payload_id"] - msg_storage[payload_id] = Message(payload_id, Latency(event)) - node_storage.get(record["node_id"]).add_msg_generation_event(event) - elif topic == "MessageFullyUnwrapped": - event = event_from_record(topic, record) - msg_storage[record["payload_id"]].fully_unwrapped(event) - node_storage.get(record["node_id"]).add_fully_unwrapped_msg_event(event) - elif topic == "PersistentTransmissionScheduled": - event = event_from_record(topic, record) - msg_storage[record["payload_id"]].persistent_transmission_scheduled(event) - node_storage.get(record["node_id"]).add_persistent_transmission_event(event) - elif topic == "MessageReleasedFromPersistentTransmission": - event = event_from_record(topic, record) - msg_storage[record["payload_id"]].persistent_transmission_released(event) - node_storage.get(record["node_id"]).add_persistent_transmission_event(event) - elif topic == "TemporalProcessorScheduled": - event = event_from_record(topic, record) - msg_storage[record["payload_id"]].temporal_processor_scheduled(event) - node_storage.get(record["node_id"]).add_temporal_processor_event(event) - elif topic == "MessageReleasedFromTemporalProcessor": - event = event_from_record(topic, record) - msg_storage[record["payload_id"]].temporal_processor_released(event) - node_storage.get(record["node_id"]).add_temporal_processor_event(event) - - return msg_storage, node_storage - - -def event_from_record(topic: str, record: dict) -> Event: - return Event(topic, record["payload_id"], record["step_id"], record["node_id"]) + return analysis def build_argument_parser() -> argparse.ArgumentParser: @@ -363,8 +102,5 @@ if __name__ == "__main__": argument_parser = build_argument_parser() arguments = argument_parser.parse_args() - input_stream = mixlog.get_input_stream(arguments.input_file) - messages, nodes = parse_record_stream(input_stream) - - results = LatencyAnalysis.build(messages, nodes, arguments.step_duration) - print(json.dumps(asdict(results), indent=2)) + analysis = LatencyAnalysis.build(mixlog.get_input_stream(arguments.input_file)) + print(json.dumps(asdict(analysis), indent=2))