diff --git a/simlib/blendnet-sims/scripts/batch.py b/simlib/blendnet-sims/scripts/batch.py index 424bdba..e2a7440 100644 --- a/simlib/blendnet-sims/scripts/batch.py +++ b/simlib/blendnet-sims/scripts/batch.py @@ -114,15 +114,19 @@ print( ) for idx, log_path in enumerate(log_paths): network_diameter = topology_result(log_path)["diameter"] - message_stroage = latency.parse_record_stream(mixlog.get_input_stream(log_path)) + message_storage, node_storage = latency.parse_record_stream( + mixlog.get_input_stream(log_path) + ) with open(f"{log_dir}/msgs-{idx}.json", "w") as file: json.dump( - {msg_id: asdict(msg) for msg_id, msg in message_stroage.items()}, + {msg_id: asdict(msg) for msg_id, msg in message_storage.items()}, file, indent=2, ) + with open(f"{log_dir}/nodes-{idx}.json", "w") as file: + json.dump(node_storage.to_dict(), file, indent=2) - latency_res = latency.compute_results(message_stroage, STEP_DURATION_MS) + latency_res = latency.compute_results(message_storage, STEP_DURATION_MS) msg_count = latency_res["total_complete_messages"] min_latency = float(latency_res["min_latency_ms"]) / 1000.0 min_latency_msg_id = latency_res["min_latency_message_id"] diff --git a/simlib/blendnet-sims/scripts/latency.py b/simlib/blendnet-sims/scripts/latency.py index a020a32..e56f179 100755 --- a/simlib/blendnet-sims/scripts/latency.py +++ b/simlib/blendnet-sims/scripts/latency.py @@ -3,14 +3,16 @@ import argparse import json import statistics from collections.abc import Iterable -from dataclasses import dataclass, field -from typing import Dict, Optional +from dataclasses import asdict, dataclass, field +from typing import Any, Dict, Optional import mixlog @dataclass class Event: + topic: str + msg_id: str step_id: int node_id: int @@ -93,6 +95,40 @@ class Message: MessageStorage = Dict[str, Message] +@dataclass +class NodeEvents: + id: int + msg_generation_events: list[Event] = field(default_factory=list) + persistent_transmission_events: list[Event] = field(default_factory=list) + blend_events: list[Event] = field(default_factory=list) + fully_unwrapped_msg_events: list[Event] = field(default_factory=list) + + def add_msg_generation_event(self, event: Event): + self.msg_generation_events.append(event) + + def add_persistent_transmission_event(self, event: Event): + self.persistent_transmission_events.append(event) + + def add_blend_event(self, event: Event): + self.blend_events.append(event) + + def add_fully_unwrapped_msg_event(self, event: Event): + self.fully_unwrapped_msg_events.append(event) + + +class NodeStorage: + def __init__(self): + self.storage: dict[int, NodeEvents] = {} + + def get(self, node_id: int) -> NodeEvents: + if node_id not in self.storage: + self.storage[node_id] = NodeEvents(node_id) + return self.storage[node_id] + + def to_dict(self) -> dict[str, dict[str, Any]]: + return {str(node_id): asdict(node) for node_id, node in self.storage.items()} + + def compute_results( message_storage: MessageStorage, step_duration: int ) -> dict[str, int | float | str]: @@ -142,35 +178,44 @@ def compute_results( } -def parse_record_stream(record_stream: Iterable[tuple[str, dict]]) -> MessageStorage: - storage: MessageStorage = {} +def parse_record_stream( + record_stream: Iterable[tuple[str, dict]], +) -> tuple[MessageStorage, NodeStorage]: + msg_storage: MessageStorage = {} + node_storage: NodeStorage = NodeStorage() for topic, record in record_stream: if topic in ("DataMessageGenerated", "CoverMessageGenerated"): + event = event_from_record(topic, record) payload_id = record["payload_id"] - storage[payload_id] = Message( - payload_id, Latency(event_from_record(record)) - ) + msg_storage[payload_id] = Message(payload_id, Latency(event)) + node_storage.get(record["node_id"]).add_msg_generation_event(event) elif topic == "MessageFullyUnwrapped": - storage[record["payload_id"]].fully_unwrapped(event_from_record(record)) + event = event_from_record(topic, record) + msg_storage[record["payload_id"]].fully_unwrapped(event) + node_storage.get(record["node_id"]).add_fully_unwrapped_msg_event(event) elif topic == "PersistentTransmissionScheduled": - storage[record["payload_id"]].persistent_transmission_scheduled( - event_from_record(record) - ) + event = event_from_record(topic, record) + msg_storage[record["payload_id"]].persistent_transmission_scheduled(event) + node_storage.get(record["node_id"]).add_persistent_transmission_event(event) elif topic == "MessageReleasedFromPersistentTransmission": - storage[record["payload_id"]].persistent_transmission_released( - event_from_record(record) - ) + event = event_from_record(topic, record) + msg_storage[record["payload_id"]].persistent_transmission_released(event) + node_storage.get(record["node_id"]).add_persistent_transmission_event(event) elif topic == "BlendScheduled": - storage[record["payload_id"]].blend_scheduled(event_from_record(record)) + event = event_from_record(topic, record) + msg_storage[record["payload_id"]].blend_scheduled(event) + node_storage.get(record["node_id"]).add_blend_event(event) elif topic == "MessageReleasedFromBlend": - storage[record["payload_id"]].blend_released(event_from_record(record)) + event = event_from_record(topic, record) + msg_storage[record["payload_id"]].blend_released(event) + node_storage.get(record["node_id"]).add_blend_event(event) - return storage + return msg_storage, node_storage -def event_from_record(record: dict) -> Event: - return Event(record["step_id"], record["node_id"]) +def event_from_record(topic: str, record: dict) -> Event: + return Event(topic, record["payload_id"], record["step_id"], record["node_id"]) def build_argument_parser() -> argparse.ArgumentParser: @@ -194,7 +239,7 @@ if __name__ == "__main__": arguments = argument_parser.parse_args() input_stream = mixlog.get_input_stream(arguments.input_file) - messages = parse_record_stream(input_stream) + messages, _ = parse_record_stream(input_stream) results = compute_results(messages, arguments.step_duration) print(json.dumps(results, indent=4))