rename missing old names

This commit is contained in:
Youngjoon Lee 2024-12-21 15:17:59 +09:00
parent 1c0fd656c3
commit a71d36272d
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D

View File

@ -38,7 +38,7 @@ class Message:
id: str
total_latency: Latency
persistent_transmission_latencies: list[Latency] = field(default_factory=list)
blend_latencies: list[Latency] = field(default_factory=list)
temporal_processor_latencies: list[Latency] = field(default_factory=list)
def __hash__(self):
return self.id
@ -52,11 +52,11 @@ class Message:
def persistent_transmission_released(self, event: Event):
Message.finish_recent_latency(self.persistent_transmission_latencies, event)
def blend_scheduled(self, event: Event):
Message.start_new_latency(self.blend_latencies, event)
def temporal_processor_scheduled(self, event: Event):
Message.start_new_latency(self.temporal_processor_latencies, event)
def blend_released(self, event: Event):
Message.finish_recent_latency(self.blend_latencies, event)
def temporal_processor_released(self, event: Event):
Message.finish_recent_latency(self.temporal_processor_latencies, event)
@staticmethod
def start_new_latency(latencies: list[Latency], event: Event):
@ -103,8 +103,8 @@ class NodeEvents:
msg_generation_events: list[Event] = field(default_factory=list)
persistent_transmission_events: list[QueueEvent] = field(default_factory=list)
persistent_transmission_queue_size: int = 0
blend_events: list[QueueEvent] = field(default_factory=list)
blend_queue_size: int = 0
temporal_processor_events: list[QueueEvent] = field(default_factory=list)
temporal_processor_queue_size: int = 0
fully_unwrapped_msg_events: list[Event] = field(default_factory=list)
def add_msg_generation_event(self, event: Event):
@ -123,14 +123,18 @@ class NodeEvents:
QueueEvent(event, self.persistent_transmission_queue_size)
)
def add_blend_event(self, event: Event):
def add_temporal_processor_event(self, event: Event):
if event.topic == "TemporalProcessorScheduled":
self.blend_queue_size += 1
self.temporal_processor_queue_size += 1
elif event.topic == "MessageReleasedFromTemporalProcessor":
self.blend_queue_size -= 1
self.temporal_processor_queue_size -= 1
else:
raise Exception(f"Unexpected event topic for blend: {event.topic}")
self.blend_events.append(QueueEvent(event, self.blend_queue_size))
raise Exception(
f"Unexpected event topic for temporal processor: {event.topic}"
)
self.temporal_processor_events.append(
QueueEvent(event, self.temporal_processor_queue_size)
)
def add_fully_unwrapped_msg_event(self, event: Event):
self.fully_unwrapped_msg_events.append(event)
@ -224,12 +228,12 @@ def parse_record_stream(
node_storage.get(record["node_id"]).add_persistent_transmission_event(event)
elif topic == "TemporalProcessorScheduled":
event = event_from_record(topic, record)
msg_storage[record["payload_id"]].blend_scheduled(event)
node_storage.get(record["node_id"]).add_blend_event(event)
msg_storage[record["payload_id"]].temporal_processor_scheduled(event)
node_storage.get(record["node_id"]).add_temporal_processor_event(event)
elif topic == "MessageReleasedFromTemporalProcessor":
event = event_from_record(topic, record)
msg_storage[record["payload_id"]].blend_released(event)
node_storage.get(record["node_id"]).add_blend_event(event)
msg_storage[record["payload_id"]].temporal_processor_released(event)
node_storage.get(record["node_id"]).add_temporal_processor_event(event)
return msg_storage, node_storage