From 319450ed59dd443652f5b72a114d4c1dcc5f300f Mon Sep 17 00:00:00 2001
From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com>
Date: Mon, 18 Nov 2024 10:31:27 +0900
Subject: [PATCH] Add mix emission analysis (#55)

---
 scripts/emission.py      | 78 ++++++++++++++++++++++++++++++++++++++++
 scripts/latency.py       | 24 ++-----------
 scripts/mixlog.py        | 41 +++++++++++++++++++++
 scripts/requirements.txt |  2 ++
 4 files changed, 124 insertions(+), 21 deletions(-)
 create mode 100644 scripts/emission.py
 create mode 100644 scripts/mixlog.py
 create mode 100644 scripts/requirements.txt

diff --git a/scripts/emission.py b/scripts/emission.py
new file mode 100644
index 0000000..6caa2e8
--- /dev/null
+++ b/scripts/emission.py
@@ -0,0 +1,78 @@
+import argparse
+import json
+from collections.abc import Iterable
+from typing import Any
+
+import matplotlib
+import matplotlib.pyplot as plt
+import pandas as pd
+
+import mixlog
+
+
+def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None:
+    """Scatter-plot emission records (step ID vs. node ID) and save the figure.
+
+    When the input holds no emission record, the axes are saved empty.
+
+    Args:
+        input_stream: Lines to scan for emission records, e.g. the result of
+            ``mixlog.get_input_stream``.
+        plot_path: File path the figure is saved to.
+    """
+    records = emission_records(input_stream)
+
+    plt.figure(figsize=(12, 6))
+    # A DataFrame built from zero records has no columns, so indexing it by
+    # "step_id" would raise KeyError; skip the scatter in that case.
+    if records:
+        df = pd.DataFrame(records)
+        plt.scatter(df["step_id"], df["node_id"], c="red", marker="x", alpha=0.6)
+    plt.xlabel("Step ID")
+    plt.ylabel("Node ID")
+    plt.title("Distribution of Emissions")
+    plt.tight_layout()
+    plt.savefig(plot_path)
+    if matplotlib.is_interactive():
+        plt.show()
+
+
+def emission_records(input_stream: Iterable[str]) -> list[Any]:
+    """Collect the JSON objects that carry an ``emission_type`` key.
+
+    Lines that are not valid JSON are skipped, and so are JSON values that
+    are not objects (numbers, strings, lists, ...).
+    """
+    records = []
+
+    for line in input_stream:
+        try:
+            record = json.loads(line)
+        except json.JSONDecodeError:
+            continue
+
+        # Only objects can be emission records. A bare number would make the
+        # membership test raise TypeError, and a string would turn it into a
+        # substring match.
+        if isinstance(record, dict) and "emission_type" in record:
+            records.append(record)
+
+    return records
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Mix emission analysis")
+    parser.add_argument(
+        "--log-path",
+        nargs="?",
+        type=str,
+        help="An input log file path. If not provided, input will be read from stdin.",
+    )
+    parser.add_argument(
+        "--plot-png-path", required=True, type=str, help="An output plot PNG file path"
+    )
+    args = parser.parse_args()
+
+    # Named ``log_lines`` rather than ``input`` so the builtin is not shadowed.
+    log_lines = mixlog.get_input_stream(args.log_path)
+    plot_emissions(log_lines, args.plot_png_path)
diff --git a/scripts/latency.py b/scripts/latency.py
index 6fd970c..9dfb350 100755
--- a/scripts/latency.py
+++ b/scripts/latency.py
@@ -6,6 +6,8 @@ from typing import Dict, Optional
 import statistics
 import argparse
 
+import mixlog
+
 from json_stream.base import TransientStreamingJSONObject
 
 JsonStream = Iterable[TransientStreamingJSONObject]
@@ -83,26 +85,6 @@ def parse_record_stream(record_stream: Iterable[str]) -> MessageStorage:
     return storage
 
 
-def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[str]:
-    for record in record_stream:
-        bracket_pos = record.rfind("{")
-        yield record[bracket_pos:]
-
-
-def get_pipe_stream() -> Iterable[str]:
-    yield from sys.stdin
-
-
-def get_file_stream(input_filename) -> Iterable[str]:
-    with open(input_filename, "r") as file:
-        yield from file
-
-
-def get_input_stream(input_filename: Optional[str]) -> Iterable[str]:
-    stream = get_file_stream(input_filename) if input_filename is not None else get_pipe_stream()
-    return line_to_json_stream(stream)
-
-
 def build_argument_parser() -> argparse.ArgumentParser:
     parser = argparse.ArgumentParser(description="Log analysis for nomos-simulations.")
     parser.add_argument(
@@ -123,7 +105,7 @@ if __name__ == "__main__":
     argument_parser = build_argument_parser()
     arguments = argument_parser.parse_args()
 
-    input_stream = get_input_stream(arguments.input_file)
+    input_stream = mixlog.get_input_stream(arguments.input_file)
     messages = parse_record_stream(input_stream)
 
     results = compute_results(messages, arguments.step_duration)
diff --git a/scripts/mixlog.py b/scripts/mixlog.py
new file mode 100644
index 0000000..c76a613
--- /dev/null
+++ b/scripts/mixlog.py
@@ -0,0 +1,41 @@
+import sys
+from collections.abc import Iterable
+from typing import Optional
+
+
+def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[str]:
+    """Yield each record cut down to the text starting at its last ``{``.
+
+    A record without any ``{`` yields only its final character, because
+    ``str.rfind`` returns -1 in that case.
+    """
+    for record in record_stream:
+        # NOTE(review): with nested objects the last ``{`` opens an inner
+        # object, so this presumably expects flat JSON objects; confirm.
+        bracket_pos = record.rfind("{")
+        yield record[bracket_pos:]
+
+
+def get_pipe_stream() -> Iterable[str]:
+    """Yield the lines read from standard input."""
+    yield from sys.stdin
+
+
+def get_file_stream(input_filename) -> Iterable[str]:
+    """Yield the lines of the text file at ``input_filename``."""
+    with open(input_filename, "r") as file:
+        yield from file
+
+
+def get_input_stream(input_filename: Optional[str]) -> Iterable[str]:
+    """Return the lines of a log file, or of stdin, cut at their last ``{``.
+
+    Args:
+        input_filename: Path of the log file; ``None`` means stdin is read.
+    """
+    stream = (
+        get_file_stream(input_filename)
+        if input_filename is not None
+        else get_pipe_stream()
+    )
+    return line_to_json_stream(stream)
diff --git a/scripts/requirements.txt b/scripts/requirements.txt
new file mode 100644
index 0000000..55036aa
--- /dev/null
+++ b/scripts/requirements.txt
@@ -0,0 +1,2 @@
+matplotlib==3.9.2
+pandas==2.2.3