update latency.py

Youngjoon Lee 2025-02-06 14:09:42 +09:00
parent 9ea38614e2
commit 6bf7ae49bd
No known key found for this signature in database
GPG Key ID: D94003D91DE12141


@@ -1,346 +1,85 @@
 #!/usr/bin/env python
 import argparse
 import json
-import statistics
-from collections.abc import Iterable
-from dataclasses import asdict, dataclass, field
-from typing import Any, Dict, Optional
+from dataclasses import asdict, dataclass
+from typing import Iterable
 
 import mixlog
 
-
-@dataclass
-class Event:
-    topic: str
-    msg_id: str
-    step_id: int
-    node_id: int
-
-
-@dataclass
-class Latency:
-    start_event: Event
-    end_event: Optional[Event] = None
-    steps: Optional[int] = None
-
-    def finish(self, event: Event):
-        assert self.end_event is None
-        assert event.step_id >= self.start_event.step_id
-        self.end_event = event
-        self.steps = self.end_event.step_id - self.start_event.step_id
-
-    def finished(self) -> bool:
-        return self.end_event is not None
-
-
-@dataclass
-class Message:
-    id: str
-    total_latency: Latency
-    persistent_transmission_latencies: list[Latency] = field(default_factory=list)
-    temporal_processor_latencies: list[Latency] = field(default_factory=list)
-
-    def __hash__(self):
-        return self.id
-
-    def fully_unwrapped(self, event: Event):
-        self.total_latency.finish(event)
-
-    def persistent_transmission_scheduled(self, event: Event):
-        Message.start_new_latency(self.persistent_transmission_latencies, event)
-
-    def persistent_transmission_released(self, event: Event):
-        Message.finish_recent_latency(self.persistent_transmission_latencies, event)
-
-    def temporal_processor_scheduled(self, event: Event):
-        Message.start_new_latency(self.temporal_processor_latencies, event)
-
-    def temporal_processor_released(self, event: Event):
-        Message.finish_recent_latency(self.temporal_processor_latencies, event)
-
-    @staticmethod
-    def start_new_latency(latencies: list[Latency], event: Event):
-        latencies.append(Latency(event))
-
-    @staticmethod
-    def finish_recent_latency(latencies: list[Latency], event: Event):
-        for latency in reversed(latencies):
-            if latency.start_event.node_id == event.node_id:
-                assert not latency.finished()
-                latency.finish(event)
-                return
-        raise Exception("No latency to finish")
-
-    @property
-    def latency(self) -> Optional[int]:
-        return self.total_latency.steps
-
-    def __eq__(self, other):
-        if not isinstance(other, Message):
-            return NotImplemented
-        return self.latency == other.latency
-
-    def __lt__(self, other):
-        if not isinstance(other, Message):
-            return NotImplemented
-        if self.latency is None or other.latency is None:
-            return NotImplemented
-        return self.latency < other.latency
-
-
-MessageStorage = Dict[str, Message]
-
-
-@dataclass
-class QueueEvent:
-    event: Event
-    queue_size_after_event: int
-
-
-@dataclass
-class NodeEvents:
-    id: int
-    msg_generation_events: list[Event] = field(default_factory=list)
-    persistent_transmission_events: list[QueueEvent] = field(default_factory=list)
-    persistent_transmission_queue_size: int = 0
-    temporal_processor_events: list[QueueEvent] = field(default_factory=list)
-    temporal_processor_queue_size: int = 0
-    fully_unwrapped_msg_events: list[Event] = field(default_factory=list)
-
-    def add_msg_generation_event(self, event: Event):
-        self.msg_generation_events.append(event)
-
-    def add_persistent_transmission_event(self, event: Event):
-        if event.topic == "PersistentTransmissionScheduled":
-            self.persistent_transmission_queue_size += 1
-        elif event.topic == "MessageReleasedFromPersistentTransmission":
-            self.persistent_transmission_queue_size -= 1
-        else:
-            raise Exception(
-                f"Unexpected event topic for persistent transmission: {event.topic}"
-            )
-        self.persistent_transmission_events.append(
-            QueueEvent(event, self.persistent_transmission_queue_size)
-        )
-
-    def add_temporal_processor_event(self, event: Event):
-        if event.topic == "TemporalProcessorScheduled":
-            self.temporal_processor_queue_size += 1
-        elif event.topic == "MessageReleasedFromTemporalProcessor":
-            self.temporal_processor_queue_size -= 1
-        else:
-            raise Exception(
-                f"Unexpected event topic for temporal processor: {event.topic}"
-            )
-        self.temporal_processor_events.append(
-            QueueEvent(event, self.temporal_processor_queue_size)
-        )
-
-    def add_fully_unwrapped_msg_event(self, event: Event):
-        self.fully_unwrapped_msg_events.append(event)
-
-
-class NodeStorage:
-    def __init__(self):
-        self.storage: dict[int, NodeEvents] = {}
-
-    def get(self, node_id: int) -> NodeEvents:
-        if node_id not in self.storage:
-            self.storage[node_id] = NodeEvents(node_id)
-        return self.storage[node_id]
-
-    def to_dict(self) -> dict[str, dict[str, Any]]:
-        return {str(node_id): asdict(node) for node_id, node in self.storage.items()}
+import pandas as pd
 
 
 @dataclass
 class LatencyAnalysis:
     total_messages: int
-    total_complete_messages: int
-    total_incomplete_messages: int
-    min_latency_steps: int
     min_latency_ms: int
     min_latency_analysis: "MessageLatencyAnalysis"
-    max_latency_steps: int
     max_latency_ms: int
     max_latency_analysis: "MessageLatencyAnalysis"
-    avg_latency_steps: float
     avg_latency_ms: int
-    median_latency_steps: float
     median_latency_ms: int
 
     @classmethod
-    def build(
-        cls,
-        message_storage: MessageStorage,
-        node_storage: NodeStorage,
-        step_duration: int,
-    ) -> "LatencyAnalysis":
-        complete_messages = [
-            message
-            for message in message_storage.values()
-            if message.latency is not None
-        ]
-        incomplete_messages = sum(
-            (1 for message in message_storage.values() if message.latency is None)
-        )
+    def build(cls, input_stream: Iterable[tuple[str, dict]]) -> "LatencyAnalysis":
+        latencies = []
+        message_ids = []
+        messages: dict[str, dict] = {}
+        for topic, record in input_stream:
+            if topic != "MessageFullyUnwrapped":
+                continue
+            latencies.append(record["total_duration"])
+            message_id = record["message"]["payload_id"]
+            message_ids.append(message_id)
+            messages[message_id] = record
-        total_messages = len(message_storage)
-        total_complete_messages = len(complete_messages)
-        total_incomplete_messages = incomplete_messages
-        complete_latencies = [
-            message.latency
-            for message in complete_messages
-            if message.latency is not None
-        ]
-        min_message = min(complete_messages)
-        min_latency_steps = min_message.latency
-        assert min_latency_steps is not None
-        min_latency_ms = int(min_latency_steps * step_duration)
-        min_latency_analysis = MessageLatencyAnalysis.build(
-            min_message, node_storage, step_duration
-        )
-        max_message = max(complete_messages)
-        max_latency_steps = max_message.latency
-        assert max_latency_steps is not None
-        max_latency_ms = int(max_latency_steps * step_duration)
-        max_latency_analysis = MessageLatencyAnalysis.build(
-            max_message, node_storage, step_duration
-        )
-        avg_latency_steps = statistics.mean(complete_latencies)
-        avg_latency_ms = int(avg_latency_steps * step_duration)
-        median_latency_steps = statistics.median(complete_latencies)
-        median_latency_ms = int(median_latency_steps * step_duration)
+        latencies = pd.Series(latencies)
+        message_ids = pd.Series(message_ids)
 
         return cls(
-            total_messages=total_messages,
-            total_complete_messages=total_complete_messages,
-            total_incomplete_messages=total_incomplete_messages,
-            min_latency_steps=min_latency_steps,
-            min_latency_ms=min_latency_ms,
-            min_latency_analysis=min_latency_analysis,
-            max_latency_steps=max_latency_steps,
-            max_latency_ms=max_latency_ms,
-            max_latency_analysis=max_latency_analysis,
-            avg_latency_steps=avg_latency_steps,
-            avg_latency_ms=avg_latency_ms,
-            median_latency_steps=median_latency_steps,
-            median_latency_ms=median_latency_ms,
+            int(latencies.count()),
+            int(latencies.min()),
+            MessageLatencyAnalysis.build(
+                messages[str(message_ids[latencies.idxmin()])]
+            ),
+            int(latencies.max()),
+            MessageLatencyAnalysis.build(
+                messages[str(message_ids[latencies.idxmax()])]
+            ),
+            int(latencies.mean()),
+            int(latencies.median()),
         )
 
 
 @dataclass
 class MessageLatencyAnalysis:
     message_id: str
-    persistent_latencies_step: list[int]
     persistent_latencies_ms: list[int]
-    persistent_queue_sizes: list[int]
-    temporal_latencies_step: list[int]
+    persistent_indices: list[int]
     temporal_latencies_ms: list[int]
-    temporal_queue_sizes: list[int]
+    temporal_indices: list[int]
 
     @classmethod
     def build(
         cls,
-        message: Message,
-        node_storage: NodeStorage,
-        step_duration: int,
+        message: dict,
     ) -> "MessageLatencyAnalysis":
-        persistent_latencies_step = []
-        persistent_latencies_ms = []
-        persistent_queue_sizes = []
-        for latency in message.persistent_transmission_latencies:
-            if latency.steps is None:
-                continue
-            persistent_latencies_step.append(latency.steps)
-            persistent_latencies_ms.append(latency.steps * step_duration)
-            persistent_queue_sizes.append(
-                next(
-                    (
-                        event.queue_size_after_event
-                        for event in node_storage.get(
-                            latency.start_event.node_id
-                        ).persistent_transmission_events
-                        if event.event.topic == "PersistentTransmissionScheduled"
-                        and event.event.msg_id == message.id
-                    )
-                )
-            )
+        analysis = cls(message["message"]["payload_id"], [], [], [], [])
+        for event in message["history"]:
+            event_type = event["event_type"]
+            if "PersistentTransmissionScheduled" in event_type:
+                analysis.persistent_indices.append(
+                    int(event_type["PersistentTransmissionScheduled"]["index"])
+                )
-        temporal_latencies_step = []
-        temporal_latencies_ms = []
-        temporal_queue_sizes = []
-        for latency in message.temporal_processor_latencies:
-            if latency.steps is None:
-                continue
-            temporal_latencies_step.append(latency.steps)
-            temporal_latencies_ms.append(latency.steps * step_duration)
-            temporal_queue_sizes.append(
-                next(
-                    (
-                        event.queue_size_after_event
-                        for event in node_storage.get(
-                            latency.start_event.node_id
-                        ).temporal_processor_events
-                        if event.event.topic == "TemporalProcessorScheduled"
-                        and event.event.msg_id == message.id
-                    )
-                )
-            )
+            elif event_type == "PersistentTransmissionReleased":
+                analysis.persistent_latencies_ms.append(event["duration_from_prev"])
+            elif "TemporalProcessorScheduled" in event_type:
+                analysis.temporal_indices.append(
+                    int(event_type["TemporalProcessorScheduled"]["index"])
+                )
+            elif event_type == "TemporalProcessorReleased":
+                analysis.temporal_latencies_ms.append(event["duration_from_prev"])
-        return cls(
-            message_id=message.id,
-            persistent_latencies_step=persistent_latencies_step,
-            persistent_latencies_ms=persistent_latencies_ms,
-            persistent_queue_sizes=persistent_queue_sizes,
-            temporal_latencies_step=temporal_latencies_step,
-            temporal_latencies_ms=temporal_latencies_ms,
-            temporal_queue_sizes=temporal_queue_sizes,
-        )
-
-
-def parse_record_stream(
-    record_stream: Iterable[tuple[str, dict]],
-) -> tuple[MessageStorage, NodeStorage]:
-    msg_storage: MessageStorage = {}
-    node_storage: NodeStorage = NodeStorage()
-    for topic, record in record_stream:
-        if topic in ("DataMessageGenerated", "CoverMessageGenerated"):
-            event = event_from_record(topic, record)
-            payload_id = record["payload_id"]
-            msg_storage[payload_id] = Message(payload_id, Latency(event))
-            node_storage.get(record["node_id"]).add_msg_generation_event(event)
-        elif topic == "MessageFullyUnwrapped":
-            event = event_from_record(topic, record)
-            msg_storage[record["payload_id"]].fully_unwrapped(event)
-            node_storage.get(record["node_id"]).add_fully_unwrapped_msg_event(event)
-        elif topic == "PersistentTransmissionScheduled":
-            event = event_from_record(topic, record)
-            msg_storage[record["payload_id"]].persistent_transmission_scheduled(event)
-            node_storage.get(record["node_id"]).add_persistent_transmission_event(event)
-        elif topic == "MessageReleasedFromPersistentTransmission":
-            event = event_from_record(topic, record)
-            msg_storage[record["payload_id"]].persistent_transmission_released(event)
-            node_storage.get(record["node_id"]).add_persistent_transmission_event(event)
-        elif topic == "TemporalProcessorScheduled":
-            event = event_from_record(topic, record)
-            msg_storage[record["payload_id"]].temporal_processor_scheduled(event)
-            node_storage.get(record["node_id"]).add_temporal_processor_event(event)
-        elif topic == "MessageReleasedFromTemporalProcessor":
-            event = event_from_record(topic, record)
-            msg_storage[record["payload_id"]].temporal_processor_released(event)
-            node_storage.get(record["node_id"]).add_temporal_processor_event(event)
-    return msg_storage, node_storage
-
-
-def event_from_record(topic: str, record: dict) -> Event:
-    return Event(topic, record["payload_id"], record["step_id"], record["node_id"])
+        return analysis
 
 
 def build_argument_parser() -> argparse.ArgumentParser:
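
For orientation, here is a minimal sketch of the input the rewritten LatencyAnalysis.build expects, as inferred from the key accesses in the new code above. The key names (total_duration, message/payload_id, history, event_type, duration_from_prev, index) come straight from the diff; the concrete values and the one-record stream are invented for illustration and are not part of the commit:

    # Hypothetical "MessageFullyUnwrapped" record, shaped after the key accesses
    # in the new build() methods; all values are made up.
    record = {
        "total_duration": 1250,  # end-to-end latency, fed into the pandas Series
        "message": {"payload_id": "0x1a2b"},
        "history": [
            # scheduled events: dicts keyed by event name, carrying a queue index
            {"event_type": {"PersistentTransmissionScheduled": {"index": "3"}}},
            # released events: plain strings, with the latency on the event itself
            {"event_type": "PersistentTransmissionReleased", "duration_from_prev": 40},
            {"event_type": {"TemporalProcessorScheduled": {"index": "7"}}},
            {"event_type": "TemporalProcessorReleased", "duration_from_prev": 105},
        ],
    }

    # The stream yields (topic, record) pairs; build() keeps only this topic.
    analysis = LatencyAnalysis.build(iter([("MessageFullyUnwrapped", record)]))

Note the asymmetry the parser relies on: scheduled events are dicts (membership is tested with `in` and the index is extracted and cast with int(...)), while released events are bare strings compared with `==`.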
@@ -363,8 +102,5 @@ if __name__ == "__main__":
     argument_parser = build_argument_parser()
     arguments = argument_parser.parse_args()
-    input_stream = mixlog.get_input_stream(arguments.input_file)
-    messages, nodes = parse_record_stream(input_stream)
-    results = LatencyAnalysis.build(messages, nodes, arguments.step_duration)
-    print(json.dumps(asdict(results), indent=2))
+    analysis = LatencyAnalysis.build(mixlog.get_input_stream(arguments.input_file))
+    print(json.dumps(asdict(analysis), indent=2))
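
Given the new dataclass fields, the final json.dumps(asdict(analysis), indent=2) should emit output of roughly this shape (values follow from the hypothetical record above; with a single message the min and max analyses coincide, and total_duration is assumed to already be in milliseconds):

    {
      "total_messages": 1,
      "min_latency_ms": 1250,
      "min_latency_analysis": {
        "message_id": "0x1a2b",
        "persistent_latencies_ms": [40],
        "persistent_indices": [3],
        "temporal_latencies_ms": [105],
        "temporal_indices": [7]
      },
      "max_latency_ms": 1250,
      "max_latency_analysis": {
        "message_id": "0x1a2b",
        "persistent_latencies_ms": [40],
        "persistent_indices": [3],
        "temporal_latencies_ms": [105],
        "temporal_indices": [7]
      },
      "avg_latency_ms": 1250,
      "median_latency_ms": 1250
    }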