diff --git a/scripts/latency.py b/scripts/latency.py index 9dfb350..35551f9 100755 --- a/scripts/latency.py +++ b/scripts/latency.py @@ -1,17 +1,12 @@ # !/usr/bin/env python +import argparse import json -import sys +import statistics from collections.abc import Iterable from typing import Dict, Optional -import statistics -import argparse import mixlog -from json_stream.base import TransientStreamingJSONObject - -JsonStream = Iterable[TransientStreamingJSONObject] - class Message: def __init__(self, message_id: str, step_a: Optional[int]): @@ -73,8 +68,13 @@ def parse_record_stream(record_stream: Iterable[str]) -> MessageStorage: storage: MessageStorage = {} for record in record_stream: - json_record = json.loads(record) - payload_id = json_record["payload_id"] + try: + json_record = json.loads(record) + except json.decoder.JSONDecodeError: + continue + + if (payload_id := json_record.get("payload_id")) is None: + continue step_id = json_record["step_id"] if (stored_message := storage.get(payload_id)) is None: @@ -91,12 +91,12 @@ def build_argument_parser() -> argparse.ArgumentParser: "--step-duration", type=int, default=100, - help="Duration (in ms) of each step in the simulation." + help="Duration (in ms) of each step in the simulation.", ) parser.add_argument( "input_file", nargs="?", - help="The file to parse. If not provided, input will be read from stdin." + help="The file to parse. If not provided, input will be read from stdin.", ) return parser