add node storage and dump to json

Youngjoon Lee 2024-12-20 19:33:47 +09:00
parent 16d9bc18e3
commit a5e5daad20
2 changed files with 72 additions and 23 deletions


@@ -114,15 +114,19 @@ print(
 )
 for idx, log_path in enumerate(log_paths):
     network_diameter = topology_result(log_path)["diameter"]
-    message_stroage = latency.parse_record_stream(mixlog.get_input_stream(log_path))
+    message_storage, node_storage = latency.parse_record_stream(
+        mixlog.get_input_stream(log_path)
+    )
     with open(f"{log_dir}/msgs-{idx}.json", "w") as file:
         json.dump(
-            {msg_id: asdict(msg) for msg_id, msg in message_stroage.items()},
+            {msg_id: asdict(msg) for msg_id, msg in message_storage.items()},
             file,
             indent=2,
         )
-    latency_res = latency.compute_results(message_stroage, STEP_DURATION_MS)
+    with open(f"{log_dir}/nodes-{idx}.json", "w") as file:
+        json.dump(node_storage.to_dict(), file, indent=2)
+    latency_res = latency.compute_results(message_storage, STEP_DURATION_MS)
     msg_count = latency_res["total_complete_messages"]
     min_latency = float(latency_res["min_latency_ms"]) / 1000.0
     min_latency_msg_id = latency_res["min_latency_message_id"]
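
For reference, a sketch of what the new nodes-{idx}.json would contain, given the NodeStorage.to_dict() and asdict serialization added in the second file below; all node IDs, payload IDs, and step values here are invented:

    {
      "0": {
        "id": 0,
        "msg_generation_events": [
          {
            "topic": "DataMessageGenerated",
            "msg_id": "p0",
            "step_id": 0,
            "node_id": 0
          }
        ],
        "persistent_transmission_events": [],
        "blend_events": [],
        "fully_unwrapped_msg_events": []
      }
    }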


@@ -3,14 +3,16 @@ import argparse
 import json
 import statistics
 from collections.abc import Iterable
-from dataclasses import dataclass, field
-from typing import Dict, Optional
+from dataclasses import asdict, dataclass, field
+from typing import Any, Dict, Optional

 import mixlog


 @dataclass
 class Event:
+    topic: str
+    msg_id: str
     step_id: int
     node_id: int
@@ -93,6 +95,40 @@ class Message:

 MessageStorage = Dict[str, Message]


+@dataclass
+class NodeEvents:
+    id: int
+    msg_generation_events: list[Event] = field(default_factory=list)
+    persistent_transmission_events: list[Event] = field(default_factory=list)
+    blend_events: list[Event] = field(default_factory=list)
+    fully_unwrapped_msg_events: list[Event] = field(default_factory=list)
+
+    def add_msg_generation_event(self, event: Event):
+        self.msg_generation_events.append(event)
+
+    def add_persistent_transmission_event(self, event: Event):
+        self.persistent_transmission_events.append(event)
+
+    def add_blend_event(self, event: Event):
+        self.blend_events.append(event)
+
+    def add_fully_unwrapped_msg_event(self, event: Event):
+        self.fully_unwrapped_msg_events.append(event)
+
+
+class NodeStorage:
+    def __init__(self):
+        self.storage: dict[int, NodeEvents] = {}
+
+    def get(self, node_id: int) -> NodeEvents:
+        if node_id not in self.storage:
+            self.storage[node_id] = NodeEvents(node_id)
+        return self.storage[node_id]
+
+    def to_dict(self) -> dict[str, dict[str, Any]]:
+        return {str(node_id): asdict(node) for node_id, node in self.storage.items()}
+
+
 def compute_results(
     message_storage: MessageStorage, step_duration: int
 ) -> dict[str, int | float | str]:
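
A quick usage sketch of the new classes, assuming this second file is the latency module imported by the script above; the event values are invented:

    from latency import Event, NodeStorage

    storage = NodeStorage()
    event = Event("BlendScheduled", "msg-1", step_id=4, node_id=7)
    storage.get(7).add_blend_event(event)           # get() creates the NodeEvents lazily
    assert storage.get(7).blend_events == [event]   # repeat get() returns the same instance
    print(storage.to_dict())                        # keys are stringified node IDs, values via asdict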
@@ -142,35 +178,44 @@ def compute_results(
     }


-def parse_record_stream(record_stream: Iterable[tuple[str, dict]]) -> MessageStorage:
-    storage: MessageStorage = {}
+def parse_record_stream(
+    record_stream: Iterable[tuple[str, dict]],
+) -> tuple[MessageStorage, NodeStorage]:
+    msg_storage: MessageStorage = {}
+    node_storage: NodeStorage = NodeStorage()
     for topic, record in record_stream:
         if topic in ("DataMessageGenerated", "CoverMessageGenerated"):
+            event = event_from_record(topic, record)
             payload_id = record["payload_id"]
-            storage[payload_id] = Message(
-                payload_id, Latency(event_from_record(record))
-            )
+            msg_storage[payload_id] = Message(payload_id, Latency(event))
+            node_storage.get(record["node_id"]).add_msg_generation_event(event)
         elif topic == "MessageFullyUnwrapped":
-            storage[record["payload_id"]].fully_unwrapped(event_from_record(record))
+            event = event_from_record(topic, record)
+            msg_storage[record["payload_id"]].fully_unwrapped(event)
+            node_storage.get(record["node_id"]).add_fully_unwrapped_msg_event(event)
         elif topic == "PersistentTransmissionScheduled":
-            storage[record["payload_id"]].persistent_transmission_scheduled(
-                event_from_record(record)
-            )
+            event = event_from_record(topic, record)
+            msg_storage[record["payload_id"]].persistent_transmission_scheduled(event)
+            node_storage.get(record["node_id"]).add_persistent_transmission_event(event)
         elif topic == "MessageReleasedFromPersistentTransmission":
-            storage[record["payload_id"]].persistent_transmission_released(
-                event_from_record(record)
-            )
+            event = event_from_record(topic, record)
+            msg_storage[record["payload_id"]].persistent_transmission_released(event)
+            node_storage.get(record["node_id"]).add_persistent_transmission_event(event)
         elif topic == "BlendScheduled":
-            storage[record["payload_id"]].blend_scheduled(event_from_record(record))
+            event = event_from_record(topic, record)
+            msg_storage[record["payload_id"]].blend_scheduled(event)
+            node_storage.get(record["node_id"]).add_blend_event(event)
         elif topic == "MessageReleasedFromBlend":
-            storage[record["payload_id"]].blend_released(event_from_record(record))
-    return storage
+            event = event_from_record(topic, record)
+            msg_storage[record["payload_id"]].blend_released(event)
+            node_storage.get(record["node_id"]).add_blend_event(event)
+    return msg_storage, node_storage


-def event_from_record(record: dict) -> Event:
-    return Event(record["step_id"], record["node_id"])
+def event_from_record(topic: str, record: dict) -> Event:
+    return Event(topic, record["payload_id"], record["step_id"], record["node_id"])


 def build_argument_parser() -> argparse.ArgumentParser:
def build_argument_parser() -> argparse.ArgumentParser:
@ -194,7 +239,7 @@ if __name__ == "__main__":
arguments = argument_parser.parse_args()
input_stream = mixlog.get_input_stream(arguments.input_file)
messages = parse_record_stream(input_stream)
messages, _ = parse_record_stream(input_stream)
results = compute_results(messages, arguments.step_duration)
print(json.dumps(results, indent=4))
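
A minimal end-to-end sketch of the new two-value return, again assuming the module is importable as latency; the two hand-written records are hypothetical stand-ins for what mixlog.get_input_stream yields:

    from latency import parse_record_stream

    records = [
        ("DataMessageGenerated", {"payload_id": "p0", "step_id": 0, "node_id": 0}),
        ("MessageFullyUnwrapped", {"payload_id": "p0", "step_id": 9, "node_id": 2}),
    ]
    msgs, nodes = parse_record_stream(records)
    print(sorted(msgs))             # ['p0']
    print(sorted(nodes.to_dict()))  # ['0', '2'] (one NodeEvents entry per node seen)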