From 2a7965d1dd55a459b8e3f1c6924f8dca36b0bdf3 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Fri, 20 Dec 2024 22:09:24 +0900 Subject: [PATCH] add QueueEvent and handle events from temporal processor properly --- simlib/blendnet-sims/scripts/latency.py | 36 ++++++++++++++++++++----- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/simlib/blendnet-sims/scripts/latency.py b/simlib/blendnet-sims/scripts/latency.py index 5d603ab..0a2e5dc 100755 --- a/simlib/blendnet-sims/scripts/latency.py +++ b/simlib/blendnet-sims/scripts/latency.py @@ -91,22 +91,46 @@ class Message: MessageStorage = Dict[str, Message] +@dataclass +class QueueEvent: + event: Event + queue_size_after_event: int + + @dataclass class NodeEvents: id: int msg_generation_events: list[Event] = field(default_factory=list) - persistent_transmission_events: list[Event] = field(default_factory=list) - blend_events: list[Event] = field(default_factory=list) + persistent_transmission_events: list[QueueEvent] = field(default_factory=list) + persistent_transmission_queue_size: int = 0 + blend_events: list[QueueEvent] = field(default_factory=list) + blend_queue_size: int = 0 fully_unwrapped_msg_events: list[Event] = field(default_factory=list) def add_msg_generation_event(self, event: Event): self.msg_generation_events.append(event) def add_persistent_transmission_event(self, event: Event): - self.persistent_transmission_events.append(event) + if event.topic == "PersistentTransmissionScheduled": + self.persistent_transmission_queue_size += 1 + elif event.topic == "MessageReleasedFromPersistentTransmission": + self.persistent_transmission_queue_size -= 1 + else: + raise Exception( + f"Unexpected event topic for persistent transmission: {event.topic}" + ) + self.persistent_transmission_events.append( + QueueEvent(event, self.persistent_transmission_queue_size) + ) def add_blend_event(self, event: Event): - self.blend_events.append(event) + if event.topic == "TemporalProcessorScheduled": + self.blend_queue_size += 1 + elif event.topic == "MessageReleasedFromTemporalProcessor": + self.blend_queue_size -= 1 + else: + raise Exception(f"Unexpected event topic for blend: {event.topic}") + self.blend_events.append(QueueEvent(event, self.blend_queue_size)) def add_fully_unwrapped_msg_event(self, event: Event): self.fully_unwrapped_msg_events.append(event) @@ -198,11 +222,11 @@ def parse_record_stream( event = event_from_record(topic, record) msg_storage[record["payload_id"]].persistent_transmission_released(event) node_storage.get(record["node_id"]).add_persistent_transmission_event(event) - elif topic == "BlendScheduled": + elif topic == "TemporalProcessorScheduled": event = event_from_record(topic, record) msg_storage[record["payload_id"]].blend_scheduled(event) node_storage.get(record["node_id"]).add_blend_event(event) - elif topic == "MessageReleasedFromBlend": + elif topic == "MessageReleasedFromTemporalProcessor": event = event_from_record(topic, record) msg_storage[record["payload_id"]].blend_released(event) node_storage.get(record["node_id"]).add_blend_event(event)