use topic for logs

This commit is contained in:
Youngjoon Lee 2024-12-20 16:33:06 +09:00
parent b8dd65f4a4
commit 8d227dac75
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
6 changed files with 59 additions and 44 deletions

View File

@ -1,16 +1,14 @@
import argparse import argparse
import json
from collections.abc import Iterable from collections.abc import Iterable
from typing import Any from typing import Any
import matplotlib import matplotlib
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import mixlog
import pandas as pd import pandas as pd
import mixlog
def plot_emissions(input_stream: Iterable[tuple[str, dict]], plot_path: str) -> None:
def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None:
df = pd.DataFrame(emission_records(input_stream)) df = pd.DataFrame(emission_records(input_stream))
plt.figure(figsize=(12, 6)) plt.figure(figsize=(12, 6))
@ -24,19 +22,8 @@ def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None:
plt.show() plt.show()
def emission_records(input_stream: Iterable[str]) -> list[Any]: def emission_records(input_stream: Iterable[tuple[str, dict]]) -> list[Any]:
records = [] return [record for _, record in filter(lambda x: x[0] == "Emission", input_stream)]
for line in input_stream:
try:
record = json.loads(line)
except json.JSONDecodeError:
continue
if "emission_type" in record:
records.append(record)
return records
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -90,18 +90,20 @@ def compute_results(
} }
def parse_record_stream(record_stream: Iterable[str]) -> MessageStorage: def parse_record_stream(record_stream: Iterable[tuple[str, dict]]) -> MessageStorage:
storage: MessageStorage = {} storage: MessageStorage = {}
for record in record_stream: for _, record in filter(
try: lambda x: x[0]
json_record = json.loads(record) in (
except json.decoder.JSONDecodeError: "DataMessageGenerated",
continue "CoverMessageGenerated",
"MessageFullyUnwrapped",
if (payload_id := json_record.get("payload_id")) is None: ),
continue record_stream,
step_id = json_record["step_id"] ):
payload_id = record["payload_id"]
step_id = record["step_id"]
if (stored_message := storage.get(payload_id)) is None: if (stored_message := storage.get(payload_id)) is None:
storage[payload_id] = Message(payload_id, step_id) storage[payload_id] = Message(payload_id, step_id)

View File

@ -1,12 +1,22 @@
import json
import sys import sys
from collections.abc import Iterable from collections.abc import Iterable
from typing import Optional from typing import Optional
TOPIC_INDICATOR = "Topic:"
def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[str]:
def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[tuple[str, dict]]:
for record in record_stream: for record in record_stream:
bracket_pos = record.find("{") topic_idx = record.find(TOPIC_INDICATOR)
yield record[bracket_pos:] if topic_idx == -1:
continue
# Split the line into 2 parts: topic and JSON message
parts = record[topic_idx + len(TOPIC_INDICATOR) :].split(":", maxsplit=1)
topic = parts[0].strip()
json_record = json.loads(parts[1].strip())
yield (topic, json_record)
def get_pipe_stream() -> Iterable[str]: def get_pipe_stream() -> Iterable[str]:
@ -18,7 +28,7 @@ def get_file_stream(input_filename) -> Iterable[str]:
yield from file yield from file
def get_input_stream(input_filename: Optional[str]) -> Iterable[str]: def get_input_stream(input_filename: Optional[str]) -> Iterable[tuple[str, dict]]:
stream = ( stream = (
get_file_stream(input_filename) get_file_stream(input_filename)
if input_filename is not None if input_filename is not None

View File

@ -246,11 +246,13 @@ fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<T> {
} }
fn log_topology(topology: &Topology) { fn log_topology(topology: &Topology) {
let log = TopologyLog { log!(
topology: topology.to_node_indices(), "Topology",
diameter: topology.diameter(), TopologyLog {
}; topology: topology.to_node_indices(),
tracing::info!("Topology: {}", serde_json::to_string(&log).unwrap()); diameter: topology.diameter(),
}
);
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]

View File

@ -0,0 +1,10 @@
#[macro_export]
macro_rules! log {
($topic:expr, $msg:expr) => {
tracing::info!(
"Topic:{}: {}",
$topic,
serde_json::to_string(&$msg).unwrap()
);
};
}

View File

@ -1,4 +1,6 @@
pub mod consensus_streams; pub mod consensus_streams;
#[macro_use]
pub mod log;
pub mod lottery; pub mod lottery;
mod message; mod message;
pub mod scheduler; pub mod scheduler;
@ -293,17 +295,19 @@ impl BlendNode {
self.log_message("MessageFullyUnwrapped", payload); self.log_message("MessageFullyUnwrapped", payload);
} }
fn log_message(&self, tag: &str, payload: &Payload) { fn log_message(&self, topic: &str, payload: &Payload) {
let log = MessageLog { log!(
payload_id: payload.id(), topic,
step_id: self.state.step_id, MessageLog {
node_id: self.id.index(), payload_id: payload.id(),
}; step_id: self.state.step_id,
tracing::info!("{}: {}", tag, serde_json::to_string(&log).unwrap()); node_id: self.id.index(),
}
);
} }
fn log_emission(log: &EmissionLog) { fn log_emission(log: &EmissionLog) {
tracing::info!("Emission: {}", serde_json::to_string(log).unwrap()); log!("Emission", log);
} }
fn new_emission_log(&self, emission_type: &str) -> EmissionLog { fn new_emission_log(&self, emission_type: &str) -> EmissionLog {