diff --git a/simlib/blendnet-sims/scripts/latency.py b/simlib/blendnet-sims/scripts/latency.py index 51941ef..28a6521 100755 --- a/simlib/blendnet-sims/scripts/latency.py +++ b/simlib/blendnet-sims/scripts/latency.py @@ -8,22 +8,73 @@ from typing import Dict, Optional import mixlog +class Event: + def __init__(self, step_id: int, node_id: int): + self.step_id = step_id + self.node_id = node_id + + +class Latency: + def __init__(self, start_event: Event): + self.start_event = start_event + self.end_event = None + + def finish(self, event: Event): + assert self.end_event is None + assert event.step_id >= self.start_event.step_id + self.end_event = event + + def finished(self) -> bool: + return self.end_event is not None + + @property + def value(self) -> Optional[int]: + if self.end_event is None: + return None + return self.end_event.step_id - self.start_event.step_id + + class Message: - def __init__(self, message_id: str, step_a: Optional[int]): + def __init__(self, message_id: str, msg_gen_event: Event): self.id = message_id - self.step_a = int(step_a) if step_a is not None else None - self.step_b = None + self.total_latency = Latency(msg_gen_event) + self.persistent_transmission_latencies: list[Latency] = [] + self.blend_latencies: list[Latency] = [] def __hash__(self): return self.id - def __repr__(self): - return f"[{self.id}] {self.step_a} -> {self.step_b}" + def fully_unwrapped(self, event: Event): + self.total_latency.finish(event) + + def persistent_transmission_scheduled(self, event: Event): + Message.start_new_latency(self.persistent_transmission_latencies, event) + + def persistent_transmission_released(self, event: Event): + Message.finish_recent_latency(self.persistent_transmission_latencies, event) + + def blend_scheduled(self, event: Event): + Message.start_new_latency(self.blend_latencies, event) + + def blend_released(self, event: Event): + Message.finish_recent_latency(self.blend_latencies, event) + + @staticmethod + def start_new_latency(latencies: list[Latency], event: Event): + latencies.append(Latency(event)) + + @staticmethod + def finish_recent_latency(latencies: list[Latency], event: Event): + for latency in reversed(latencies): + if latency.start_event.node_id == event.node_id: + assert not latency.finished() + latency.finish(event) + return + raise Exception("No latency to finish") @property def latency(self) -> Optional[int]: - if self.step_a is not None and self.step_b is not None: - return abs(self.step_a - self.step_b) + return self.total_latency.value def __eq__(self, other): if not isinstance(other, Message): @@ -93,26 +144,32 @@ def compute_results( def parse_record_stream(record_stream: Iterable[tuple[str, dict]]) -> MessageStorage: storage: MessageStorage = {} - for _, record in filter( - lambda x: x[0] - in ( - "DataMessageGenerated", - "CoverMessageGenerated", - "MessageFullyUnwrapped", - ), - record_stream, - ): - payload_id = record["payload_id"] - step_id = record["step_id"] - - if (stored_message := storage.get(payload_id)) is None: - storage[payload_id] = Message(payload_id, step_id) - else: - stored_message.step_b = step_id + for topic, record in record_stream: + if topic in ("DataMessageGenerated", "CoverMessageGenerated"): + payload_id = record["payload_id"] + storage[payload_id] = Message(payload_id, event_from_record(record)) + elif topic == "MessageFullyUnwrapped": + storage[record["payload_id"]].fully_unwrapped(event_from_record(record)) + elif topic == "PersistentTransmissionScheduled": + storage[record["payload_id"]].persistent_transmission_scheduled( + event_from_record(record) + ) + elif topic == "PersistentTransmissionReleased": + storage[record["payload_id"]].persistent_transmission_released( + event_from_record(record) + ) + elif topic == "BlendScheduled": + storage[record["payload_id"]].blend_scheduled(event_from_record(record)) + elif topic == "BlendReleased": + storage[record["payload_id"]].blend_released(event_from_record(record)) return storage +def event_from_record(record: dict) -> Event: + return Event(record["step_id"], record["node_id"]) + + def build_argument_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Log analysis for nomos-simulations.") parser.add_argument(