139 lines
4.4 KiB
Python
Raw Permalink Normal View History

# !/usr/bin/env python
import argparse
import json
import statistics
from collections.abc import Iterable
from typing import Dict, Optional
2024-11-18 10:31:27 +09:00
import mixlog
class Message:
def __init__(self, message_id: str, step_a: Optional[int]):
self.id = message_id
self.step_a = int(step_a) if step_a is not None else None
self.step_b = None
def __hash__(self):
return self.id
def __repr__(self):
return f"[{self.id}] {self.step_a} -> {self.step_b}"
@property
def latency(self) -> Optional[int]:
if self.step_a is not None and self.step_b is not None:
return abs(self.step_a - self.step_b)
2024-12-19 14:09:57 +09:00
def __eq__(self, other):
if not isinstance(other, Message):
return NotImplemented
return self.latency == other.latency
def __lt__(self, other):
if not isinstance(other, Message):
return NotImplemented
if self.latency is None or other.latency is None:
return NotImplemented
return self.latency < other.latency
MessageStorage = Dict[str, Message]
2024-12-19 13:58:42 +09:00
def compute_results(
message_storage: MessageStorage, step_duration: int
) -> dict[str, int | float | str]:
2024-12-19 14:09:57 +09:00
complete_messages = [
message for message in message_storage.values() if message.latency is not None
]
incomplete_messages = sum(
(1 for message in message_storage.values() if message.latency is None)
)
total_messages = len(message_storage)
total_complete_messages = len(complete_messages)
total_incomplete_messages = incomplete_messages
complete_latencies = [
message.latency for message in complete_messages if message.latency is not None
]
latency_average_steps = statistics.mean(complete_latencies)
2024-12-19 13:58:42 +09:00
latency_average_ms = "{:.2f}".format(latency_average_steps * step_duration)
2024-12-19 14:09:57 +09:00
latency_median_steps = statistics.median(complete_latencies)
2024-12-19 13:58:42 +09:00
latency_median_ms = "{:.2f}".format(latency_median_steps * step_duration)
2024-12-19 14:09:57 +09:00
max_message = max(complete_messages)
max_latency_steps = max_message.latency
assert max_latency_steps is not None
2024-12-19 13:58:42 +09:00
max_latency_ms = "{:.2f}".format(max_latency_steps * step_duration)
2024-12-19 14:09:57 +09:00
min_message = min(complete_messages)
min_latency_steps = min_message.latency
assert min_latency_steps is not None
2024-12-19 13:58:42 +09:00
min_latency_ms = "{:.2f}".format(min_latency_steps * step_duration)
return {
"total_messages": total_messages,
2024-12-19 14:09:57 +09:00
"total_complete_messages": total_complete_messages,
"total_incomplete_messages": total_incomplete_messages,
2024-12-19 13:58:42 +09:00
"latency_average_steps": latency_average_steps,
"latency_average_ms": latency_average_ms,
"latency_median_steps": latency_median_steps,
"latency_median_ms": latency_median_ms,
2024-12-19 14:09:57 +09:00
"max_latency_message_id": max_message.id,
2024-12-19 13:58:42 +09:00
"max_latency_steps": max_latency_steps,
"max_latency_ms": max_latency_ms,
2024-12-19 14:09:57 +09:00
"min_latency_message_id": min_message.id,
2024-12-19 13:58:42 +09:00
"min_latency_steps": min_latency_steps,
"min_latency_ms": min_latency_ms,
}
def parse_record_stream(record_stream: Iterable[str]) -> MessageStorage:
storage: MessageStorage = {}
for record in record_stream:
try:
json_record = json.loads(record)
except json.decoder.JSONDecodeError:
continue
if (payload_id := json_record.get("payload_id")) is None:
continue
step_id = json_record["step_id"]
if (stored_message := storage.get(payload_id)) is None:
storage[payload_id] = Message(payload_id, step_id)
else:
stored_message.step_b = step_id
return storage
def build_argument_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Log analysis for nomos-simulations.")
parser.add_argument(
"--step-duration",
type=int,
default=100,
help="Duration (in ms) of each step in the simulation.",
)
parser.add_argument(
"input_file",
nargs="?",
help="The file to parse. If not provided, input will be read from stdin.",
)
return parser
if __name__ == "__main__":
argument_parser = build_argument_parser()
arguments = argument_parser.parse_args()
2024-11-18 10:31:27 +09:00
input_stream = mixlog.get_input_stream(arguments.input_file)
messages = parse_record_stream(input_stream)
results = compute_results(messages, arguments.step_duration)
2024-12-19 13:58:42 +09:00
print(json.dumps(results, indent=4))