update latency.py for detailed min/max latency analysis

Youngjoon Lee 2024-12-21 16:28:55 +09:00
parent a71d36272d
commit 5ff2604662

@@ -153,53 +153,139 @@ class NodeStorage:
         return {str(node_id): asdict(node) for node_id, node in self.storage.items()}
 
 
-def compute_results(
-    message_storage: MessageStorage, step_duration: int
-) -> dict[str, int | float | str]:
-    complete_messages = [
-        message for message in message_storage.values() if message.latency is not None
-    ]
-    incomplete_messages = sum(
-        (1 for message in message_storage.values() if message.latency is None)
-    )
-
-    total_messages = len(message_storage)
-    total_complete_messages = len(complete_messages)
-    total_incomplete_messages = incomplete_messages
-
-    complete_latencies = [
-        message.latency for message in complete_messages if message.latency is not None
-    ]
-    latency_average_steps = statistics.mean(complete_latencies)
-    latency_average_ms = "{:.2f}".format(latency_average_steps * step_duration)
-    latency_median_steps = statistics.median(complete_latencies)
-    latency_median_ms = "{:.2f}".format(latency_median_steps * step_duration)
-
-    max_message = max(complete_messages)
-    max_latency_steps = max_message.latency
-    assert max_latency_steps is not None
-    max_latency_ms = "{:.2f}".format(max_latency_steps * step_duration)
-
-    min_message = min(complete_messages)
-    min_latency_steps = min_message.latency
-    assert min_latency_steps is not None
-    min_latency_ms = "{:.2f}".format(min_latency_steps * step_duration)
-
-    return {
-        "total_messages": total_messages,
-        "total_complete_messages": total_complete_messages,
-        "total_incomplete_messages": total_incomplete_messages,
-        "latency_average_steps": latency_average_steps,
-        "latency_average_ms": latency_average_ms,
-        "latency_median_steps": latency_median_steps,
-        "latency_median_ms": latency_median_ms,
-        "max_latency_message_id": max_message.id,
-        "max_latency_steps": max_latency_steps,
-        "max_latency_ms": max_latency_ms,
-        "min_latency_message_id": min_message.id,
-        "min_latency_steps": min_latency_steps,
-        "min_latency_ms": min_latency_ms,
-    }
+@dataclass
+class LatencyAnalysis:
+    total_messages: int
+    total_complete_messages: int
+    total_incomplete_messages: int
+
+    min_latency_steps: int
+    min_latency_ms: int
+    min_latency_analysis: "MessageLatencyAnalysis"
+
+    max_latency_steps: int
+    max_latency_ms: int
+    max_latency_analysis: "MessageLatencyAnalysis"
+
+    avg_latency_steps: float
+    avg_latency_ms: int
+    median_latency_steps: float
+    median_latency_ms: int
+
+    @classmethod
+    def build(
+        cls,
+        message_storage: MessageStorage,
+        node_storage: NodeStorage,
+        step_duration: int,
+    ) -> "LatencyAnalysis":
+        complete_messages = [
+            message
+            for message in message_storage.values()
+            if message.latency is not None
+        ]
+        incomplete_messages = sum(
+            (1 for message in message_storage.values() if message.latency is None)
+        )
+        total_messages = len(message_storage)
+        total_complete_messages = len(complete_messages)
+        total_incomplete_messages = incomplete_messages
+
+        complete_latencies = [
+            message.latency
+            for message in complete_messages
+            if message.latency is not None
+        ]
+
+        min_message = min(complete_messages)
+        min_latency_steps = min_message.latency
+        assert min_latency_steps is not None
+        min_latency_ms = int(min_latency_steps * step_duration)
+        min_latency_analysis = MessageLatencyAnalysis.build(min_message, node_storage)
+
+        max_message = max(complete_messages)
+        max_latency_steps = max_message.latency
+        assert max_latency_steps is not None
+        max_latency_ms = int(max_latency_steps * step_duration)
+        max_latency_analysis = MessageLatencyAnalysis.build(max_message, node_storage)
+
+        avg_latency_steps = statistics.mean(complete_latencies)
+        avg_latency_ms = int(avg_latency_steps * step_duration)
+        median_latency_steps = statistics.median(complete_latencies)
+        median_latency_ms = int(median_latency_steps * step_duration)
+
+        return cls(
+            total_messages,
+            total_complete_messages,
+            total_incomplete_messages,
+            min_latency_steps,
+            min_latency_ms,
+            min_latency_analysis,
+            max_latency_steps,
+            max_latency_ms,
+            max_latency_analysis,
+            avg_latency_steps,
+            avg_latency_ms,
+            median_latency_steps,
+            median_latency_ms,
+        )
+
+
+@dataclass
+class MessageLatencyAnalysis:
+    message_id: str
+    persistent_latencies: list[int]
+    persistent_queue_sizes: list[int]
+    temporal_latencies: list[int]
+    temporal_queue_sizes: list[int]
+
+    @classmethod
+    def build(
+        cls, message: Message, node_storage: NodeStorage
+    ) -> "MessageLatencyAnalysis":
+        persistent_latencies = []
+        persistent_queue_sizes = []
+        for latency in message.persistent_transmission_latencies:
+            if latency.steps is None:
+                continue
+            persistent_latencies.append(latency.steps)
+            persistent_queue_sizes.append(
+                next(
+                    (
+                        event.queue_size_after_event
+                        for event in node_storage.get(
+                            latency.start_event.node_id
+                        ).persistent_transmission_events
+                        if event.event.topic == "PersistentTransmissionScheduled"
+                        and event.event.msg_id == message.id
+                    )
+                )
+            )
+
+        temporal_latencies = []
+        temporal_queue_sizes = []
+        for latency in message.temporal_processor_latencies:
+            if latency.steps is None:
+                continue
+            temporal_latencies.append(latency.steps)
+            temporal_queue_sizes.append(
+                next(
+                    (
+                        event.queue_size_after_event
+                        for event in node_storage.get(
+                            latency.start_event.node_id
+                        ).temporal_processor_events
+                        if event.event.topic == "TemporalProcessorScheduled"
+                        and event.event.msg_id == message.id
+                    )
+                )
+            )
+
+        return cls(
+            message.id,
+            persistent_latencies,
+            persistent_queue_sizes,
+            temporal_latencies,
+            temporal_queue_sizes,
+        )
 
 
 def parse_record_stream(
@@ -263,7 +349,7 @@ if __name__ == "__main__":
     arguments = argument_parser.parse_args()
 
     input_stream = mixlog.get_input_stream(arguments.input_file)
-    messages, _ = parse_record_stream(input_stream)
+    messages, nodes = parse_record_stream(input_stream)
 
-    results = compute_results(messages, arguments.step_duration)
-    print(json.dumps(results, indent=4))
+    results = LatencyAnalysis.build(messages, nodes, arguments.step_duration)
+    print(json.dumps(asdict(results), indent=2))
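
For context, a minimal sketch of driving the new analysis from another script, mirroring the updated __main__ block above. It assumes latency.py is importable as a module named latency and that mixlog.get_input_stream accepts a log file path; the file name and step_duration value are illustrative placeholders, not values taken from this commit.

# Usage sketch (assumptions: latency.py is importable as `latency`;
# "simulation.jsonl" and step_duration=100 are illustrative placeholders).
import json
from dataclasses import asdict

import mixlog
from latency import LatencyAnalysis, parse_record_stream

# Rebuild message and node state from the recorded simulation stream,
# as the updated __main__ block does.
input_stream = mixlog.get_input_stream("simulation.jsonl")
messages, nodes = parse_record_stream(input_stream)

# Aggregate stats plus per-message detail for the min- and max-latency messages.
analysis = LatencyAnalysis.build(messages, nodes, step_duration=100)
print(json.dumps(asdict(analysis), indent=2))

Because dataclasses.asdict converts nested dataclasses recursively, the min_latency_analysis and max_latency_analysis fields appear as nested JSON objects with the per-message latency and queue-size lists.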