Mirror of https://github.com/logos-blockchain/logos-blockchain-simulations.git, synced 2026-01-07 15:43:09 +00:00
Adapt to new log format
parent e66fb8047b
commit 4ac7cfed8b
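For context, the parsing code in this diff implies the following change of input shape. The field names below are taken from the parser; the concrete values and the log-line prefixes are invented for illustration.

# Old input format (streamed with json_stream): one JSON document holding a "records" array.
OLD_FORMAT_EXAMPLE = {
    "records": [
        {
            "node_id": "node-0",
            "step_id": 3,
            "data_messages_generated": {"message-1": 3},
            "data_messages_fully_unwrapped": {},
        }
    ]
}

# New log format: plain text lines that each end with a flat JSON object.
NEW_FORMAT_EXAMPLE_LINES = [
    'INFO node-0 {"payload_id": "message-1", "step_id": 3}',
    'INFO node-4 {"payload_id": "message-1", "step_id": 7}',
]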
@@ -1,8 +1,8 @@
 #!/usr/bin/env python
+import json
 import sys
 from collections.abc import Iterable
 from typing import Dict, Optional
-import json_stream
 import statistics
 import argparse

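The standard json module is enough for the new format because every record is a flat JSON object at the end of its log line. Below is a minimal, self-contained sketch of that extraction, mirroring the line_to_json_stream helper added further down in this diff; the sample line is invented.

import json

def extract_json_tail(line: str) -> dict:
    # Keep everything from the last "{" onward and parse it as JSON.
    # This assumes the trailing object contains no nested braces, as in the new log format.
    bracket_pos = line.rfind("{")
    return json.loads(line[bracket_pos:])

record = extract_json_tail('INFO node-0 {"payload_id": "message-1", "step_id": 3}')
assert record["payload_id"] == "message-1"
assert record["step_id"] == 3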
@@ -12,101 +12,95 @@ JsonStream = Iterable[TransientStreamingJSONObject]


 class Message:
+    def __init__(self, message_id: str, step_a: Optional[int]):
+        self.id = message_id
+        self.step_a = int(step_a) if step_a is not None else None
+        self.step_b = None
+
+    def __hash__(self):
+        return self.id
+
-    def __init__(
-        self,
-        message_id: str,
-        generator_node: Optional[str],
-        generator_step: Optional[int],
-        unwrapper_node: Optional[str],
-        unwrapper_step: Optional[int]
-    ):
-        self.id = message_id
-        self.generator_node = generator_node
-        self.generator_step = int(generator_step) if generator_step is not None else None
-        self.unwrapper_node = unwrapper_node
-        self.unwrapper_step = int(unwrapper_step) if unwrapper_step is not None else None
-
     def __repr__(self):
-        return f"[{self.id}] {self.generator_node}-{self.generator_step} -> {self.unwrapper_node}-{self.unwrapper_step}"
+        return f"[{self.id}] {self.step_a} -> {self.step_b}"

     @property
     def latency(self) -> Optional[int]:
-        if self.unwrapper_step is not None and self.generator_step is not None:
-            return self.unwrapper_step - self.generator_step
+        if self.step_a is not None and self.step_b is not None:
+            return abs(self.step_a - self.step_b)


 MessageStorage = Dict[str, Message]


-def print_results(message_storage: MessageStorage, step_duration: int):
+def compute_results(message_storage: MessageStorage, step_duration: int) -> str:
     latencies = [message_record.latency for message_record in message_storage.values()]
     valued_latencies = [latency for latency in latencies if latency is not None]
     incomplete_latencies = sum((1 for latency in latencies if latency is None))

+    total_messages = len(latencies)
+    total_messages_full_latency = len(valued_latencies)
+    total_messages_incomplete_latency = incomplete_latencies
     latency_average_steps = statistics.mean(valued_latencies)
+    latency_average_ms = "{:.2f}ms".format(latency_average_steps * step_duration)
     latency_median_steps = statistics.median(valued_latencies)
+    latency_median_ms = "{:.2f}ms".format(latency_median_steps * step_duration)
+    max_latency_steps = max(valued_latencies)
+    max_latency_ms = "{:.2f}ms".format(max_latency_steps * step_duration)
+    min_latency_steps = min(valued_latencies)
+    min_latency_ms = "{:.2f}ms".format(min_latency_steps * step_duration)

-    print("[Results]")
-    print(f"- Total messages: {len(latencies)}")
-    print(f"  - Full latencies: {len(valued_latencies)}")
-    print(f"  - Incomplete latencies: {incomplete_latencies}")
-    print("- Average")
-    print(f"  - Steps: {latency_average_steps}")
-    print("  - Duration: {:.2f}ms".format(latency_average_steps * step_duration))
-    print("- Median")
-    print(f"  - Steps: {latency_median_steps}")
-    print("  - Duration: {:.2f}ms".format(latency_median_steps * step_duration))
+    return f"""[Results]
+- Total messages: {total_messages}
+  - Full latencies: {total_messages_full_latency}
+  - Incomplete latencies: {total_messages_incomplete_latency}
+- Averages
+  - Steps: {latency_average_steps}
+  - Duration: {latency_average_ms}
+- Median
+  - Steps: {latency_median_steps}
+  - Duration: {latency_median_ms}
+- Max
+  - Steps: {max_latency_steps}
+  - Duration: {max_latency_ms}
+- Min
+  - Steps: {min_latency_steps}
+  - Duration: {min_latency_ms}"""


-def parse_record_stream(record_stream: JsonStream) -> MessageStorage:
+def parse_record_stream(record_stream: Iterable[str]) -> MessageStorage:
     storage: MessageStorage = {}

     for record in record_stream:
-        node_id = record["node_id"]
-        _step_id = record["step_id"]
+        json_record = json.loads(record)
+        payload_id = json_record["payload_id"]
+        step_id = json_record["step_id"]

-        data_messages_generated = record["data_messages_generated"]
-        for generated_message_id, generated_message_step_id in data_messages_generated.items():
-            stored_message = storage.get(generated_message_id)
-            if stored_message:
-                stored_message.generator_node = node_id
-                stored_message.generator_step = generated_message_step_id
-            else:
-                storage[generated_message_id] = Message(
-                    generated_message_id, node_id, generated_message_step_id, None, None
-                )
-
-        data_messages_fully_unwrapped = record["data_messages_fully_unwrapped"]
-        for generated_message_id, generated_message_step_id in data_messages_fully_unwrapped.items():
-            stored_message = storage.get(generated_message_id)
-            if stored_message:
-                stored_message.unwrapper_node = node_id
-                stored_message.unwrapper_step = generated_message_step_id
-            else:
-                storage[generated_message_id] = Message(
-                    generated_message_id, None, None, node_id, generated_message_step_id
-                )
+        if (stored_message := storage.get(payload_id)) is None:
+            storage[payload_id] = Message(payload_id, step_id)
+        else:
+            stored_message.step_b = step_id

     return storage


-def from_pipe() -> JsonStream:
-    yield from json_stream.load(sys.stdin)
+def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[str]:
+    for record in record_stream:
+        bracket_pos = record.rfind("{")
+        yield record[bracket_pos:]


-def from_file(input_filename) -> JsonStream:
+def get_pipe_stream() -> Iterable[str]:
+    yield from sys.stdin
+
+
+def get_file_stream(input_filename) -> Iterable[str]:
     with open(input_filename, "r") as file:
-        data = json_stream.load(file)
-        yield from data["records"]
+        yield from file


-def get_input_stream(input_filename: Optional[str]) -> JsonStream:
-    if input_filename is not None:
-        return from_file(input_filename)
-    return from_pipe()
+def get_input_stream(input_filename: Optional[str]) -> Iterable[str]:
+    stream = get_file_stream(input_filename) if input_filename is not None else get_pipe_stream()
+    return line_to_json_stream(stream)


 def build_argument_parser() -> argparse.ArgumentParser:
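A usage sketch of the new pipeline, assuming the updated script can be imported as a module (its file name is not shown in this diff) and using invented sample lines. Each payload is expected to appear in exactly two records; a payload seen only once counts as an incomplete latency.

# "latency_script" is a placeholder module name for the file touched by this commit.
from latency_script import compute_results, line_to_json_stream, parse_record_stream

sample_lines = [
    'INFO node-0 {"payload_id": "message-1", "step_id": 3}',  # first sighting -> step_a = 3
    'INFO node-4 {"payload_id": "message-1", "step_id": 7}',  # second sighting -> step_b = 7
    'INFO node-2 {"payload_id": "message-2", "step_id": 5}',  # seen only once -> incomplete latency
]

messages = parse_record_stream(line_to_json_stream(sample_lines))
# message-1: abs(3 - 7) = 4 steps; with a 100ms step duration that is 400.00ms.
print(compute_results(messages, step_duration=100))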
@@ -132,4 +126,5 @@ if __name__ == "__main__":
     input_stream = get_input_stream(arguments.input_file)
     messages = parse_record_stream(input_stream)

-    print_results(messages, arguments.step_duration)
+    results = compute_results(messages, arguments.step_duration)
+    print(results)
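With the sample lines above and a step duration of 100, the report printed by the new __main__ wiring would look roughly like this (sub-item indentation is inferred from the old print statements):

[Results]
- Total messages: 2
  - Full latencies: 1
  - Incomplete latencies: 1
- Averages
  - Steps: 4
  - Duration: 400.00ms
- Median
  - Steps: 4
  - Duration: 400.00ms
- Max
  - Steps: 4
  - Duration: 400.00ms
- Min
  - Steps: 4
  - Duration: 400.00ms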