From 8d227dac755abbcaab94b6cbb7b257eb983a924b Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Fri, 20 Dec 2024 16:33:06 +0900 Subject: [PATCH] use topic for logs --- simlib/blendnet-sims/scripts/emission.py | 21 ++++----------------- simlib/blendnet-sims/scripts/latency.py | 22 ++++++++++++---------- simlib/blendnet-sims/scripts/mixlog.py | 18 ++++++++++++++---- simlib/blendnet-sims/src/main.rs | 12 +++++++----- simlib/blendnet-sims/src/node/blend/log.rs | 10 ++++++++++ simlib/blendnet-sims/src/node/blend/mod.rs | 20 ++++++++++++-------- 6 files changed, 59 insertions(+), 44 deletions(-) create mode 100644 simlib/blendnet-sims/src/node/blend/log.rs diff --git a/simlib/blendnet-sims/scripts/emission.py b/simlib/blendnet-sims/scripts/emission.py index 6caa2e8..fa0be5e 100644 --- a/simlib/blendnet-sims/scripts/emission.py +++ b/simlib/blendnet-sims/scripts/emission.py @@ -1,16 +1,14 @@ import argparse -import json from collections.abc import Iterable from typing import Any import matplotlib import matplotlib.pyplot as plt +import mixlog import pandas as pd -import mixlog - -def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None: +def plot_emissions(input_stream: Iterable[tuple[str, dict]], plot_path: str) -> None: df = pd.DataFrame(emission_records(input_stream)) plt.figure(figsize=(12, 6)) @@ -24,19 +22,8 @@ def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None: plt.show() -def emission_records(input_stream: Iterable[str]) -> list[Any]: - records = [] - - for line in input_stream: - try: - record = json.loads(line) - except json.JSONDecodeError: - continue - - if "emission_type" in record: - records.append(record) - - return records +def emission_records(input_stream: Iterable[tuple[str, dict]]) -> list[Any]: + return [record for _, record in filter(lambda x: x[0] == "Emission", input_stream)] if __name__ == "__main__": diff --git a/simlib/blendnet-sims/scripts/latency.py b/simlib/blendnet-sims/scripts/latency.py index 751eea5..51941ef 100755 --- a/simlib/blendnet-sims/scripts/latency.py +++ b/simlib/blendnet-sims/scripts/latency.py @@ -90,18 +90,20 @@ def compute_results( } -def parse_record_stream(record_stream: Iterable[str]) -> MessageStorage: +def parse_record_stream(record_stream: Iterable[tuple[str, dict]]) -> MessageStorage: storage: MessageStorage = {} - for record in record_stream: - try: - json_record = json.loads(record) - except json.decoder.JSONDecodeError: - continue - - if (payload_id := json_record.get("payload_id")) is None: - continue - step_id = json_record["step_id"] + for _, record in filter( + lambda x: x[0] + in ( + "DataMessageGenerated", + "CoverMessageGenerated", + "MessageFullyUnwrapped", + ), + record_stream, + ): + payload_id = record["payload_id"] + step_id = record["step_id"] if (stored_message := storage.get(payload_id)) is None: storage[payload_id] = Message(payload_id, step_id) diff --git a/simlib/blendnet-sims/scripts/mixlog.py b/simlib/blendnet-sims/scripts/mixlog.py index 8044012..c426aad 100644 --- a/simlib/blendnet-sims/scripts/mixlog.py +++ b/simlib/blendnet-sims/scripts/mixlog.py @@ -1,12 +1,22 @@ +import json import sys from collections.abc import Iterable from typing import Optional +TOPIC_INDICATOR = "Topic:" -def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[str]: + +def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[tuple[str, dict]]: for record in record_stream: - bracket_pos = record.find("{") - yield record[bracket_pos:] + topic_idx = record.find(TOPIC_INDICATOR) + if topic_idx == -1: + continue + + # Split the line into 2 parts: topic and JSON message + parts = record[topic_idx + len(TOPIC_INDICATOR) :].split(":", maxsplit=1) + topic = parts[0].strip() + json_record = json.loads(parts[1].strip()) + yield (topic, json_record) def get_pipe_stream() -> Iterable[str]: @@ -18,7 +28,7 @@ def get_file_stream(input_filename) -> Iterable[str]: yield from file -def get_input_stream(input_filename: Optional[str]) -> Iterable[str]: +def get_input_stream(input_filename: Optional[str]) -> Iterable[tuple[str, dict]]: stream = ( get_file_stream(input_filename) if input_filename is not None diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index f825a5e..f13626e 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -246,11 +246,13 @@ fn load_json_from_file(path: &Path) -> anyhow::Result { } fn log_topology(topology: &Topology) { - let log = TopologyLog { - topology: topology.to_node_indices(), - diameter: topology.diameter(), - }; - tracing::info!("Topology: {}", serde_json::to_string(&log).unwrap()); + log!( + "Topology", + TopologyLog { + topology: topology.to_node_indices(), + diameter: topology.diameter(), + } + ); } #[derive(Debug, Serialize, Deserialize)] diff --git a/simlib/blendnet-sims/src/node/blend/log.rs b/simlib/blendnet-sims/src/node/blend/log.rs new file mode 100644 index 0000000..ea34bc6 --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/log.rs @@ -0,0 +1,10 @@ +#[macro_export] +macro_rules! log { + ($topic:expr, $msg:expr) => { + tracing::info!( + "Topic:{}: {}", + $topic, + serde_json::to_string(&$msg).unwrap() + ); + }; +} diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index 346324c..f1b4d38 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -1,4 +1,6 @@ pub mod consensus_streams; +#[macro_use] +pub mod log; pub mod lottery; mod message; pub mod scheduler; @@ -293,17 +295,19 @@ impl BlendNode { self.log_message("MessageFullyUnwrapped", payload); } - fn log_message(&self, tag: &str, payload: &Payload) { - let log = MessageLog { - payload_id: payload.id(), - step_id: self.state.step_id, - node_id: self.id.index(), - }; - tracing::info!("{}: {}", tag, serde_json::to_string(&log).unwrap()); + fn log_message(&self, topic: &str, payload: &Payload) { + log!( + topic, + MessageLog { + payload_id: payload.id(), + step_id: self.state.step_id, + node_id: self.id.index(), + } + ); } fn log_emission(log: &EmissionLog) { - tracing::info!("Emission: {}", serde_json::to_string(log).unwrap()); + log!("Emission", log); } fn new_emission_log(&self, emission_type: &str) -> EmissionLog {