mirror of
https://github.com/logos-blockchain/logos-blockchain-simulations.git
synced 2026-01-08 16:13:14 +00:00
gather complex latencies
This commit is contained in:
parent
97fd438dca
commit
265ff88381
@ -8,22 +8,73 @@ from typing import Dict, Optional
|
||||
import mixlog
|
||||
|
||||
|
||||
class Event:
|
||||
def __init__(self, step_id: int, node_id: int):
|
||||
self.step_id = step_id
|
||||
self.node_id = node_id
|
||||
|
||||
|
||||
class Latency:
|
||||
def __init__(self, start_event: Event):
|
||||
self.start_event = start_event
|
||||
self.end_event = None
|
||||
|
||||
def finish(self, event: Event):
|
||||
assert self.end_event is None
|
||||
assert event.step_id >= self.start_event.step_id
|
||||
self.end_event = event
|
||||
|
||||
def finished(self) -> bool:
|
||||
return self.end_event is not None
|
||||
|
||||
@property
|
||||
def value(self) -> Optional[int]:
|
||||
if self.end_event is None:
|
||||
return None
|
||||
return self.end_event.step_id - self.start_event.step_id
|
||||
|
||||
|
||||
class Message:
|
||||
def __init__(self, message_id: str, step_a: Optional[int]):
|
||||
def __init__(self, message_id: str, msg_gen_event: Event):
|
||||
self.id = message_id
|
||||
self.step_a = int(step_a) if step_a is not None else None
|
||||
self.step_b = None
|
||||
self.total_latency = Latency(msg_gen_event)
|
||||
self.persistent_transmission_latencies: list[Latency] = []
|
||||
self.blend_latencies: list[Latency] = []
|
||||
|
||||
def __hash__(self):
|
||||
return self.id
|
||||
|
||||
def __repr__(self):
|
||||
return f"[{self.id}] {self.step_a} -> {self.step_b}"
|
||||
def fully_unwrapped(self, event: Event):
|
||||
self.total_latency.finish(event)
|
||||
|
||||
def persistent_transmission_scheduled(self, event: Event):
|
||||
Message.start_new_latency(self.persistent_transmission_latencies, event)
|
||||
|
||||
def persistent_transmission_released(self, event: Event):
|
||||
Message.finish_recent_latency(self.persistent_transmission_latencies, event)
|
||||
|
||||
def blend_scheduled(self, event: Event):
|
||||
Message.start_new_latency(self.blend_latencies, event)
|
||||
|
||||
def blend_released(self, event: Event):
|
||||
Message.finish_recent_latency(self.blend_latencies, event)
|
||||
|
||||
@staticmethod
|
||||
def start_new_latency(latencies: list[Latency], event: Event):
|
||||
latencies.append(Latency(event))
|
||||
|
||||
@staticmethod
|
||||
def finish_recent_latency(latencies: list[Latency], event: Event):
|
||||
for latency in reversed(latencies):
|
||||
if latency.start_event.node_id == event.node_id:
|
||||
assert not latency.finished()
|
||||
latency.finish(event)
|
||||
return
|
||||
raise Exception("No latency to finish")
|
||||
|
||||
@property
|
||||
def latency(self) -> Optional[int]:
|
||||
if self.step_a is not None and self.step_b is not None:
|
||||
return abs(self.step_a - self.step_b)
|
||||
return self.total_latency.value
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, Message):
|
||||
@ -93,26 +144,32 @@ def compute_results(
|
||||
def parse_record_stream(record_stream: Iterable[tuple[str, dict]]) -> MessageStorage:
|
||||
storage: MessageStorage = {}
|
||||
|
||||
for _, record in filter(
|
||||
lambda x: x[0]
|
||||
in (
|
||||
"DataMessageGenerated",
|
||||
"CoverMessageGenerated",
|
||||
"MessageFullyUnwrapped",
|
||||
),
|
||||
record_stream,
|
||||
):
|
||||
payload_id = record["payload_id"]
|
||||
step_id = record["step_id"]
|
||||
|
||||
if (stored_message := storage.get(payload_id)) is None:
|
||||
storage[payload_id] = Message(payload_id, step_id)
|
||||
else:
|
||||
stored_message.step_b = step_id
|
||||
for topic, record in record_stream:
|
||||
if topic in ("DataMessageGenerated", "CoverMessageGenerated"):
|
||||
payload_id = record["payload_id"]
|
||||
storage[payload_id] = Message(payload_id, event_from_record(record))
|
||||
elif topic == "MessageFullyUnwrapped":
|
||||
storage[record["payload_id"]].fully_unwrapped(event_from_record(record))
|
||||
elif topic == "PersistentTransmissionScheduled":
|
||||
storage[record["payload_id"]].persistent_transmission_scheduled(
|
||||
event_from_record(record)
|
||||
)
|
||||
elif topic == "PersistentTransmissionReleased":
|
||||
storage[record["payload_id"]].persistent_transmission_released(
|
||||
event_from_record(record)
|
||||
)
|
||||
elif topic == "BlendScheduled":
|
||||
storage[record["payload_id"]].blend_scheduled(event_from_record(record))
|
||||
elif topic == "BlendReleased":
|
||||
storage[record["payload_id"]].blend_released(event_from_record(record))
|
||||
|
||||
return storage
|
||||
|
||||
|
||||
def event_from_record(record: dict) -> Event:
|
||||
return Event(record["step_id"], record["node_id"])
|
||||
|
||||
|
||||
def build_argument_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(description="Log analysis for nomos-simulations.")
|
||||
parser.add_argument(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user