add QueueEvent and handle events from temporal processor properly

This commit is contained in:
Youngjoon Lee 2024-12-20 22:09:24 +09:00
parent d6c5ff2004
commit 2a7965d1dd
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D

View File

@ -91,22 +91,46 @@ class Message:
MessageStorage = Dict[str, Message]
@dataclass
class QueueEvent:
event: Event
queue_size_after_event: int
@dataclass
class NodeEvents:
id: int
msg_generation_events: list[Event] = field(default_factory=list)
persistent_transmission_events: list[Event] = field(default_factory=list)
blend_events: list[Event] = field(default_factory=list)
persistent_transmission_events: list[QueueEvent] = field(default_factory=list)
persistent_transmission_queue_size: int = 0
blend_events: list[QueueEvent] = field(default_factory=list)
blend_queue_size: int = 0
fully_unwrapped_msg_events: list[Event] = field(default_factory=list)
def add_msg_generation_event(self, event: Event):
self.msg_generation_events.append(event)
def add_persistent_transmission_event(self, event: Event):
self.persistent_transmission_events.append(event)
if event.topic == "PersistentTransmissionScheduled":
self.persistent_transmission_queue_size += 1
elif event.topic == "MessageReleasedFromPersistentTransmission":
self.persistent_transmission_queue_size -= 1
else:
raise Exception(
f"Unexpected event topic for persistent transmission: {event.topic}"
)
self.persistent_transmission_events.append(
QueueEvent(event, self.persistent_transmission_queue_size)
)
def add_blend_event(self, event: Event):
self.blend_events.append(event)
if event.topic == "TemporalProcessorScheduled":
self.blend_queue_size += 1
elif event.topic == "MessageReleasedFromTemporalProcessor":
self.blend_queue_size -= 1
else:
raise Exception(f"Unexpected event topic for blend: {event.topic}")
self.blend_events.append(QueueEvent(event, self.blend_queue_size))
def add_fully_unwrapped_msg_event(self, event: Event):
self.fully_unwrapped_msg_events.append(event)
@ -198,11 +222,11 @@ def parse_record_stream(
event = event_from_record(topic, record)
msg_storage[record["payload_id"]].persistent_transmission_released(event)
node_storage.get(record["node_id"]).add_persistent_transmission_event(event)
elif topic == "BlendScheduled":
elif topic == "TemporalProcessorScheduled":
event = event_from_record(topic, record)
msg_storage[record["payload_id"]].blend_scheduled(event)
node_storage.get(record["node_id"]).add_blend_event(event)
elif topic == "MessageReleasedFromBlend":
elif topic == "MessageReleasedFromTemporalProcessor":
event = event_from_record(topic, record)
msg_storage[record["payload_id"]].blend_released(event)
node_storage.get(record["node_id"]).add_blend_event(event)