From c9fe4994c7a67c6c5fb454a7ba53500821b7482b Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Sat, 8 Feb 2025 10:20:53 +0900 Subject: [PATCH] commands for latency and history analysis --- simlib/blendnet-sims/Cargo.toml | 2 + simlib/blendnet-sims/src/analysis/history.rs | 136 ++++++++ simlib/blendnet-sims/src/analysis/latency.rs | 109 +++++++ simlib/blendnet-sims/src/analysis/mod.rs | 2 + simlib/blendnet-sims/src/main.rs | 101 +++++- simlib/blendnet-sims/src/node/blend/log.rs | 11 +- .../blendnet-sims/src/node/blend/message.rs | 85 ++--- simlib/blendnet-sims/src/node/blend/mod.rs | 299 ++++++++---------- 8 files changed, 505 insertions(+), 240 deletions(-) create mode 100644 simlib/blendnet-sims/src/analysis/history.rs create mode 100644 simlib/blendnet-sims/src/analysis/latency.rs create mode 100644 simlib/blendnet-sims/src/analysis/mod.rs diff --git a/simlib/blendnet-sims/Cargo.toml b/simlib/blendnet-sims/Cargo.toml index 9729a7a..6c3e8a8 100644 --- a/simlib/blendnet-sims/Cargo.toml +++ b/simlib/blendnet-sims/Cargo.toml @@ -27,3 +27,5 @@ sha2 = "0.10" uuid = { version = "1", features = ["fast-rng", "v4"] } tracing-appender = "0.2" cached = "0.54.0" +polars = "0.46.0" +humantime-serde = "1.1.1" diff --git a/simlib/blendnet-sims/src/analysis/history.rs b/simlib/blendnet-sims/src/analysis/history.rs new file mode 100644 index 0000000..bf47964 --- /dev/null +++ b/simlib/blendnet-sims/src/analysis/history.rs @@ -0,0 +1,136 @@ +use std::{ + error::Error, + fs::File, + io::{BufRead, BufReader}, + ops::{Add, Mul}, + path::PathBuf, + time::Duration, +}; + +use netrunner::node::NodeId; +use serde::{Deserialize, Serialize}; + +use crate::node::blend::{ + log::TopicLog, + message::{MessageEvent, MessageEventType, PayloadId}, +}; + +pub fn analyze_message_history( + log_file: PathBuf, + step_duration: Duration, + payload_id: PayloadId, +) -> Result<(), Box> { + let file = File::open(log_file)?; + let reader = BufReader::new(file); + + let mut history = Vec::new(); + let mut target_node_id: Option = None; + let mut target_event: Option = None; + + let lines: Vec = reader.lines().collect::>()?; + for line in lines.iter().rev() { + if let Ok(topic_log) = serde_json::from_str::>(line) { + assert_eq!(topic_log.topic, "MessageEvent"); + let event = topic_log.message; + if event.payload_id == payload_id + && (target_node_id.is_none() || target_node_id.unwrap() == event.node_id) + && (target_event.is_none() || target_event.as_ref().unwrap() == &event.event_type) + { + match event.event_type { + MessageEventType::FullyUnwrapped => { + assert!(history.is_empty()); + assert!(target_node_id.is_none()); + target_node_id = Some(event.node_id); + history.push(event); + } + MessageEventType::Created => { + assert!(!history.is_empty()); + assert!(target_node_id.is_some()); + history.push(event); + } + MessageEventType::PersistentTransmissionScheduled { .. } => { + assert!(target_node_id.is_some()); + assert!(matches!( + history.last().unwrap().event_type, + MessageEventType::PersistentTransmissionReleased { .. } + )); + history.push(event); + } + MessageEventType::PersistentTransmissionReleased => { + assert!(target_node_id.is_some()); + history.push(event); + } + MessageEventType::TemporalProcessorScheduled { .. } => { + assert!(target_node_id.is_some()); + assert!(matches!( + history.last().unwrap().event_type, + MessageEventType::TemporalProcessorReleased { .. } + )); + history.push(event); + } + MessageEventType::TemporalProcessorReleased => { + assert!(target_node_id.is_some()); + history.push(event); + } + MessageEventType::NetworkReceived { from } => { + assert!(!history.is_empty()); + assert!(target_node_id.is_some()); + assert_ne!(target_node_id.unwrap(), from); + target_node_id = Some(from); + target_event = Some(MessageEventType::NetworkSent { to: event.node_id }); + history.push(event); + } + MessageEventType::NetworkSent { .. } => { + assert!(!history.is_empty()); + assert!(target_node_id.is_some()); + if target_event.is_none() + || target_event.as_ref().unwrap() != &event.event_type + { + continue; + } + target_event = None; + history.push(event); + } + } + } + } + } + + let mut history_with_durations: Vec = Vec::new(); + let (_, total_duration) = history.iter().rev().fold( + (None, Duration::ZERO), + |(prev_step_id, total_duration): (Option, Duration), event| { + let duration = match prev_step_id { + Some(prev_step_id) => { + step_duration.mul((event.step_id - prev_step_id).try_into().unwrap()) + } + None => Duration::ZERO, + }; + history_with_durations.push(MessageEventWithDuration { + event: event.clone(), + duration, + }); + (Some(event.step_id), total_duration.add(duration)) + }, + ); + let output = Output { + history: history_with_durations, + total_duration, + }; + println!("{}", serde_json::to_string(&output).unwrap()); + Ok(()) +} + +#[derive(Serialize, Deserialize)] +struct Output { + history: Vec, + #[serde(with = "humantime_serde")] + total_duration: Duration, +} + +#[derive(Serialize, Deserialize)] +struct MessageEventWithDuration { + event: MessageEvent, + #[serde(with = "humantime_serde")] + duration: Duration, +} diff --git a/simlib/blendnet-sims/src/analysis/latency.rs b/simlib/blendnet-sims/src/analysis/latency.rs new file mode 100644 index 0000000..3275b90 --- /dev/null +++ b/simlib/blendnet-sims/src/analysis/latency.rs @@ -0,0 +1,109 @@ +use core::panic; +use std::{error::Error, ops::Mul, path::PathBuf, time::Duration}; + +use std::{ + collections::HashMap, + fs::File, + io::{BufRead, BufReader}, +}; + +use polars::prelude::{AnyValue, NamedFrom, QuantileMethod, Scalar}; +use polars::series::Series; +use serde::{Deserialize, Serialize}; + +use crate::node::blend::log::TopicLog; +use crate::node::blend::message::{MessageEvent, MessageEventType, PayloadId}; + +pub fn analyze_latency(log_file: PathBuf, step_duration: Duration) -> Result<(), Box> { + let file = File::open(log_file)?; + let reader = BufReader::new(file); + + let mut messages: HashMap = HashMap::new(); + let mut latencies_ms: Vec = Vec::new(); + let mut latency_to_message: HashMap = HashMap::new(); + + for line in reader.lines() { + let line = line?; + if let Ok(topic_log) = serde_json::from_str::>(&line) { + assert_eq!(topic_log.topic, "MessageEvent"); + let event = topic_log.message; + match event.event_type { + MessageEventType::Created => { + assert_eq!(messages.insert(event.payload_id, event.step_id), None); + } + MessageEventType::FullyUnwrapped => match messages.remove(&event.payload_id) { + Some(created_step_id) => { + let latency = step_duration + .mul((event.step_id - created_step_id).try_into().unwrap()) + .as_millis() + .try_into() + .unwrap(); + latencies_ms.push(latency); + latency_to_message.insert(latency, event.payload_id); + } + None => { + panic!( + "FullyUnwrapped event without Created event: {}", + event.payload_id + ); + } + }, + _ => { + continue; + } + } + } + } + + let series = Series::new("latencies".into(), latencies_ms); + let series = Output::new(&series, &latency_to_message); + println!("{}", serde_json::to_string(&series).unwrap()); + + Ok(()) +} + +#[derive(Serialize, Deserialize, Debug)] +struct Output { + min: i64, + min_payload_id: PayloadId, + q1: f64, + avg: f64, + med: f64, + q3: f64, + max: i64, + max_payload_id: PayloadId, +} + +impl Output { + fn new(series: &Series, latency_to_message: &HashMap) -> Self { + let min = series.min::().unwrap().unwrap(); + let min_payload_id = latency_to_message.get(&min).unwrap().clone(); + let max = series.max::().unwrap().unwrap(); + let max_payload_id = latency_to_message.get(&max).unwrap().clone(); + Self { + min, + min_payload_id, + q1: quantile(series, 0.25), + avg: series.mean().unwrap(), + med: series.median().unwrap(), + q3: quantile(series, 0.75), + max, + max_payload_id, + } + } +} + +fn quantile(series: &Series, quantile: f64) -> f64 { + f64_from_scalar( + &series + .quantile_reduce(quantile, QuantileMethod::Linear) + .unwrap(), + ) +} + +fn f64_from_scalar(scalar: &Scalar) -> f64 { + match scalar.value() { + AnyValue::Float64(value) => *value, + _ => panic!("Expected f64"), + } +} diff --git a/simlib/blendnet-sims/src/analysis/mod.rs b/simlib/blendnet-sims/src/analysis/mod.rs new file mode 100644 index 0000000..2b6d821 --- /dev/null +++ b/simlib/blendnet-sims/src/analysis/mod.rs @@ -0,0 +1,2 @@ +pub mod history; +pub mod latency; diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index ce8a149..af6102d 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -6,9 +6,11 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; // crates use crate::node::blend::state::{BlendnodeRecord, BlendnodeState}; -use crate::node::blend::{BlendMessage, BlendnodeSettings}; +use crate::node::blend::{BlendnodeSettings, SimMessage}; +use analysis::history::analyze_message_history; +use analysis::latency::analyze_latency; use anyhow::Ok; -use clap::Parser; +use clap::{Parser, Subcommand}; use crossbeam::channel; use multiaddr::Multiaddr; use netrunner::network::behaviour::create_behaviours; @@ -18,6 +20,7 @@ use netrunner::node::{NodeId, NodeIdExt}; use netrunner::output_processors::Record; use netrunner::runner::{BoxedNode, SimulationRunnerHandle}; use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType}; +use node::blend::message::PayloadId; use node::blend::topology::Topology; use nomos_blend::cover_traffic::CoverTrafficSettings; use nomos_blend::message_blend::{ @@ -34,10 +37,36 @@ use crate::node::blend::BlendNode; use crate::settings::SimSettings; use netrunner::{runner::SimulationRunner, settings::SimulationSettings}; +pub mod analysis; mod log; mod node; mod settings; +#[derive(Parser)] +struct Cli { + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Run the simulation + Run(SimulationApp), + /// Analyze the simulation results + Analyze { + #[command(subcommand)] + command: AnalyzeCommands, + }, +} + +#[derive(Subcommand)] +enum AnalyzeCommands { + /// Analyze the latency of the messages fully unwrapped + Latency(AnalyzeLatencyApp), + /// Analyze the history of a message + MessageHistory(MessageHistoryApp), +} + /// Main simulation wrapper /// Pipes together the cli arguments with the execution #[derive(Parser)] @@ -90,15 +119,14 @@ impl SimulationApp { "Regions", regions .iter() - .map(|(region, node_ids)| (region, node_ids.len())) + .map(|(region, node_ids)| (*region, node_ids.len())) .collect::>() ); - log!("NumRegions", regions.len()); let behaviours = create_behaviours(&settings.simulation_settings.network_settings); let regions_data = RegionsData::new(regions, behaviours); - let network = Arc::new(Mutex::new(Network::::new( + let network = Arc::new(Mutex::new(Network::::new( regions_data.clone(), seed, ))); @@ -162,7 +190,7 @@ impl SimulationApp { fn create_boxed_blendnode( node_id: NodeId, - network: &mut Network, + network: &mut Network, simulation_settings: SimulationSettings, no_netcap: bool, blendnode_settings: BlendnodeSettings, @@ -197,7 +225,6 @@ fn create_boxed_blendnode( Box::new(BlendNode::new( node_id, blendnode_settings, - simulation_settings.step_time, network_interface, )) } @@ -302,14 +329,54 @@ struct ConnLatencyDistributionLog { distribution: HashMap, } -fn main() -> anyhow::Result<()> { - let app: SimulationApp = SimulationApp::parse(); - let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics); - - if let Err(e) = app.run() { - tracing::error!("error: {}", e); - drop(maybe_guard); - std::process::exit(1); - } - Ok(()) +#[derive(Parser)] +struct AnalyzeLatencyApp { + #[clap(long, short)] + log_file: PathBuf, + #[clap(long, short, value_parser = humantime::parse_duration)] + step_duration: Duration, +} + +#[derive(Parser)] +struct MessageHistoryApp { + #[clap(long, short)] + log_file: PathBuf, + #[clap(long, short, value_parser = humantime::parse_duration)] + step_duration: Duration, + #[clap(long, short)] + payload_id: PayloadId, +} + +fn main() -> anyhow::Result<()> { + let cli = Cli::parse(); + match cli.command { + Commands::Run(app) => { + let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics); + + if let Err(e) = app.run() { + tracing::error!("error: {}", e); + drop(maybe_guard); + std::process::exit(1); + } + Ok(()) + } + Commands::Analyze { command } => match command { + AnalyzeCommands::Latency(app) => { + if let Err(e) = analyze_latency(app.log_file, app.step_duration) { + tracing::error!("error: {}", e); + std::process::exit(1); + } + Ok(()) + } + AnalyzeCommands::MessageHistory(app) => { + if let Err(e) = + analyze_message_history(app.log_file, app.step_duration, app.payload_id) + { + tracing::error!("error: {}", e); + std::process::exit(1); + } + Ok(()) + } + }, + } } diff --git a/simlib/blendnet-sims/src/node/blend/log.rs b/simlib/blendnet-sims/src/node/blend/log.rs index 1a92805..b1f5bed 100644 --- a/simlib/blendnet-sims/src/node/blend/log.rs +++ b/simlib/blendnet-sims/src/node/blend/log.rs @@ -1,9 +1,9 @@ -use serde::Serialize; +use serde::{Deserialize, Serialize}; #[macro_export] macro_rules! log { ($topic:expr, $msg:expr) => { - tracing::info!( + println!( "{}", serde_json::to_string(&$crate::node::blend::log::TopicLog { topic: $topic.to_string(), @@ -14,8 +14,11 @@ macro_rules! log { }; } -#[derive(Serialize)] -pub struct TopicLog { +#[derive(Serialize, Deserialize)] +pub struct TopicLog +where + M: 'static, +{ pub topic: String, pub message: M, } diff --git a/simlib/blendnet-sims/src/node/blend/message.rs b/simlib/blendnet-sims/src/node/blend/message.rs index e314181..fb760f3 100644 --- a/simlib/blendnet-sims/src/node/blend/message.rs +++ b/simlib/blendnet-sims/src/node/blend/message.rs @@ -1,7 +1,5 @@ -use std::{ops::Mul, time::Duration}; - -use netrunner::node::serialize_node_id_as_index; use netrunner::node::NodeId; +use serde::Deserialize; use serde::Serialize; use uuid::Uuid; @@ -28,52 +26,16 @@ impl Payload { } } -#[derive(Debug, Clone, Serialize)] -pub struct MessageHistory(Vec); - -impl MessageHistory { - pub fn new() -> Self { - Self(Vec::new()) - } - - pub fn add( - &mut self, - node_id: NodeId, - step_id: usize, - step_time: Duration, - event_type: MessageEventType, - ) { - let duration_from_prev = self.0.last().map_or(Duration::ZERO, |prev_event| { - step_time.mul((step_id - prev_event.step_id).try_into().unwrap()) - }); - self.0.push(MessageEvent { - node_id, - step_id, - duration_from_prev, - event_type, - }); - } - - pub fn last_event_type(&self) -> Option<&MessageEventType> { - self.0.last().map(|event| &event.event_type) - } - - pub fn total_duration(&self) -> Duration { - self.0.iter().map(|event| event.duration_from_prev).sum() - } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MessageEvent { + pub payload_id: PayloadId, + pub step_id: usize, + #[serde(with = "node_id_serde")] + pub node_id: NodeId, + pub event_type: MessageEventType, } -#[derive(Debug, Clone, Serialize)] -struct MessageEvent { - #[serde(serialize_with = "serialize_node_id_as_index")] - node_id: NodeId, - step_id: usize, - #[serde(serialize_with = "duration_as_millis")] - duration_from_prev: Duration, - event_type: MessageEventType, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum MessageEventType { Created, PersistentTransmissionScheduled { @@ -85,20 +47,35 @@ pub enum MessageEventType { }, TemporalProcessorReleased, NetworkSent { - #[serde(serialize_with = "serialize_node_id_as_index")] + #[serde(with = "node_id_serde")] to: NodeId, }, NetworkReceived { - #[serde(serialize_with = "serialize_node_id_as_index")] + #[serde(with = "node_id_serde")] from: NodeId, }, + FullyUnwrapped, } -pub fn duration_as_millis(duration: &Duration, s: S) -> Result -where - S: serde::Serializer, -{ - s.serialize_u64(duration.as_millis().try_into().unwrap()) +mod node_id_serde { + use netrunner::node::{NodeId, NodeIdExt}; + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(node_id: &NodeId, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_u64(node_id.index().try_into().unwrap()) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + Ok(NodeId::from_index( + u64::deserialize(deserializer)?.try_into().unwrap(), + )) + } } #[cfg(test)] diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index 5dd538c..a28e088 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -2,7 +2,7 @@ pub mod consensus_streams; #[macro_use] pub mod log; pub mod lottery; -mod message; +pub mod message; pub mod scheduler; pub mod state; pub mod stream_wrapper; @@ -13,9 +13,9 @@ use cached::{Cached, TimedCache}; use crossbeam::channel; use futures::Stream; use lottery::StakeLottery; -use message::{duration_as_millis, MessageEventType, MessageHistory, Payload, PayloadId}; +use message::{MessageEvent, MessageEventType, Payload}; use netrunner::network::NetworkMessage; -use netrunner::node::{Node, NodeId, NodeIdExt}; +use netrunner::node::{Node, NodeId}; use netrunner::{ network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, warding::WardCondition, @@ -31,41 +31,20 @@ use nomos_blend::{ BlendOutgoingMessage, }; use nomos_blend_message::mock::MockBlendMessage; +use nomos_blend_message::BlendMessage as _; use rand::SeedableRng; use rand_chacha::ChaCha12Rng; use scheduler::{Interval, TemporalScheduler}; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use sha2::{Digest, Sha256}; use state::BlendnodeState; use std::{pin::pin, task::Poll, time::Duration}; use stream_wrapper::CrossbeamReceiverStream; -#[derive(Debug, Clone, Serialize)] -pub struct BlendMessage { - message: Vec, - history: MessageHistory, -} +#[derive(Debug, Clone)] +pub struct SimMessage(Vec); -impl BlendMessage { - fn new(message: Vec, node_id: NodeId, step_id: usize, step_time: Duration) -> Self { - let mut history = MessageHistory::new(); - history.add(node_id, step_id, step_time, MessageEventType::Created); - Self { message, history } - } - - fn new_drop() -> Self { - Self { - message: Vec::new(), - history: MessageHistory::new(), - } - } - - fn is_drop(&self) -> bool { - self.message.is_empty() - } -} - -impl PayloadSize for BlendMessage { +impl PayloadSize for SimMessage { fn size_bytes(&self) -> u32 { // payload: 32 KiB // header encryption overhead: 133 bytes = 48 + 17 * max_blend_hops(=5) @@ -75,11 +54,6 @@ impl PayloadSize for BlendMessage { } } -struct BlendOutgoingMessageWithHistory { - outgoing_message: BlendOutgoingMessage, - history: MessageHistory, -} - #[derive(Deserialize)] pub struct BlendnodeSettings { pub connected_peers: Vec, @@ -106,24 +80,23 @@ pub struct BlendNode { id: NodeId, state: BlendnodeState, settings: BlendnodeSettings, - step_time: Duration, - network_interface: InMemoryNetworkInterface, + network_interface: InMemoryNetworkInterface, message_cache: TimedCache, data_msg_lottery_update_time_sender: channel::Sender, data_msg_lottery_interval: Interval, data_msg_lottery: StakeLottery, - persistent_sender: channel::Sender, + persistent_sender: channel::Sender, persistent_update_time_sender: channel::Sender, persistent_transmission_messages: - PersistentTransmissionStream, ChaCha12Rng, Interval>, + PersistentTransmissionStream, ChaCha12Rng, Interval>, crypto_processor: CryptographicProcessor, - temporal_sender: channel::Sender, + temporal_sender: channel::Sender, temporal_update_time_sender: channel::Sender, temporal_processor_messages: - TemporalStream, TemporalScheduler>, + TemporalStream, TemporalScheduler>, epoch_update_sender: channel::Sender, slot_update_sender: channel::Sender, @@ -134,8 +107,7 @@ impl BlendNode { pub fn new( id: NodeId, settings: BlendnodeSettings, - step_time: Duration, - network_interface: InMemoryNetworkInterface, + network_interface: InMemoryNetworkInterface, ) -> Self { let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed); @@ -152,7 +124,7 @@ impl BlendNode { ); // Init Tier-1: Persistent transmission - let (persistent_sender, persistent_receiver) = channel::unbounded::(); + let (persistent_sender, persistent_receiver) = channel::unbounded::(); let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded(); let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver) .persistent_transmission( @@ -164,7 +136,7 @@ impl BlendNode { ), persistent_update_time_receiver, ), - BlendMessage::new_drop(), + SimMessage(MockBlendMessage::DROP_MESSAGE.to_vec()), ); // Init Tier-2: message blend: CryptographicProcessor and TemporalProcessor @@ -202,7 +174,6 @@ impl BlendNode { Self { id, - step_time, network_interface, // We're not coupling this lifespan with the steps now, but it's okay // We expected that a message will be delivered to most of nodes within 60s. @@ -231,29 +202,41 @@ impl BlendNode { } } - fn forward(&mut self, message: BlendMessage, exclude_node: Option) { + fn parse_payload(message: &[u8]) -> Payload { + Payload::load(MockBlendMessage::payload(message).unwrap()) + } + + fn forward(&mut self, message: SimMessage, exclude_node: Option) { + let payload_id = Self::parse_payload(&message.0).id(); for node_id in self .settings .connected_peers .iter() .filter(|&id| Some(*id) != exclude_node) { - let mut message = message.clone(); - self.record_network_sent_event(&mut message.history, *node_id); - self.network_interface.send_message(*node_id, message) + log!( + "MessageEvent", + MessageEvent { + payload_id: payload_id.clone(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::NetworkSent { to: *node_id } + } + ); + self.network_interface + .send_message(*node_id, message.clone()) } - self.message_cache - .cache_set(Self::sha256(&message.message), ()); + self.message_cache.cache_set(Self::sha256(&message.0), ()); } - fn receive(&mut self) -> Vec> { + fn receive(&mut self) -> Vec> { self.network_interface .receive_messages() .into_iter() // Retain only messages that have not been seen before .filter(|msg| { self.message_cache - .cache_set(Self::sha256(&msg.payload().message), ()) + .cache_set(Self::sha256(&msg.payload().0), ()) .is_none() }) .collect() @@ -265,14 +248,24 @@ impl BlendNode { hasher.finalize().into() } - fn schedule_persistent_transmission(&mut self, mut message: BlendMessage) { - self.record_persistent_scheduled_event(&mut message.history); + fn schedule_persistent_transmission(&mut self, message: SimMessage) { + log!( + "MessageEvent", + MessageEvent { + payload_id: Self::parse_payload(&message.0).id(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::PersistentTransmissionScheduled { + index: self.state.cur_num_persistent_scheduled + } + } + ); self.persistent_sender.send(message).unwrap(); self.state.cur_num_persistent_scheduled += 1; } - fn handle_incoming_message(&mut self, message: BlendMessage) { - match self.crypto_processor.unwrap_message(&message.message) { + fn handle_incoming_message(&mut self, message: SimMessage) { + match self.crypto_processor.unwrap_message(&message.0) { Ok((unwrapped_message, fully_unwrapped)) => { let temporal_message = if fully_unwrapped { BlendOutgoingMessage::FullyUnwrapped(unwrapped_message) @@ -280,10 +273,7 @@ impl BlendNode { BlendOutgoingMessage::Outbound(unwrapped_message) }; - self.schedule_temporal_processor(BlendOutgoingMessageWithHistory { - outgoing_message: temporal_message, - history: message.history, - }); + self.schedule_temporal_processor(temporal_message); } Err(e) => { tracing::debug!("Failed to unwrap message: {:?}", e); @@ -291,8 +281,22 @@ impl BlendNode { } } - fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithHistory) { - self.record_temporal_scheduled_event(&mut message.history); + fn schedule_temporal_processor(&mut self, message: BlendOutgoingMessage) { + log!( + "MessageEvent", + MessageEvent { + payload_id: match &message { + BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg).id(), + BlendOutgoingMessage::FullyUnwrapped(payload) => + Payload::load(payload.clone()).id(), + }, + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::TemporalProcessorScheduled { + index: self.state.cur_num_temporal_scheduled + } + } + ); self.temporal_sender.send(message).unwrap(); self.state.cur_num_temporal_scheduled += 1; } @@ -306,76 +310,6 @@ impl BlendNode { self.epoch_update_sender.send(elapsed).unwrap(); self.slot_update_sender.send(elapsed).unwrap(); } - - fn log_message_fully_unwrapped(&self, payload: &Payload, history: MessageHistory) { - let total_duration = history.total_duration(); - log!( - "MessageFullyUnwrapped", - MessageWithHistoryLog { - message: MessageLog { - payload_id: payload.id(), - step_id: self.state.step_id, - node_id: self.id.index(), - }, - history, - total_duration, - } - ); - } - - fn new_blend_message(&self, message: Vec) -> BlendMessage { - BlendMessage::new(message, self.id, self.state.step_id, self.step_time) - } - - fn record_network_sent_event(&self, history: &mut MessageHistory, to: NodeId) { - self.record_message_event(history, MessageEventType::NetworkSent { to }); - } - - fn record_network_received_event(&self, history: &mut MessageHistory, from: NodeId) { - assert_eq!( - history.last_event_type(), - Some(&MessageEventType::NetworkSent { to: self.id }) - ); - self.record_message_event(history, MessageEventType::NetworkReceived { from }); - } - - fn record_persistent_scheduled_event(&self, history: &mut MessageHistory) { - self.record_message_event( - history, - MessageEventType::PersistentTransmissionScheduled { - index: self.state.cur_num_persistent_scheduled, - }, - ); - } - - fn record_persistent_released_event(&self, history: &mut MessageHistory) { - assert!(matches!( - history.last_event_type(), - Some(MessageEventType::PersistentTransmissionScheduled { .. }) - )); - self.record_message_event(history, MessageEventType::PersistentTransmissionReleased); - } - - fn record_temporal_scheduled_event(&self, history: &mut MessageHistory) { - self.record_message_event( - history, - MessageEventType::TemporalProcessorScheduled { - index: self.state.cur_num_temporal_scheduled, - }, - ); - } - - fn record_temporal_released_event(&self, history: &mut MessageHistory) { - assert!(matches!( - history.last_event_type(), - Some(MessageEventType::TemporalProcessorScheduled { .. }) - )); - self.record_message_event(history, MessageEventType::TemporalProcessorReleased); - } - - fn record_message_event(&self, history: &mut MessageHistory, event_type: MessageEventType) { - history.add(self.id, self.state.step_id, self.step_time, event_type); - } } impl Node for BlendNode { @@ -404,18 +338,34 @@ impl Node for BlendNode { .crypto_processor .wrap_message(payload.as_bytes()) .unwrap(); - self.schedule_persistent_transmission(self.new_blend_message(message)); + log!( + "MessageEvent", + MessageEvent { + payload_id: payload.id(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::Created + } + ); + self.schedule_persistent_transmission(SimMessage(message)); } } // Handle incoming messages - for mut network_message in self.receive() { - self.record_network_received_event( - &mut network_message.payload.history, - network_message.from, + for network_message in self.receive() { + log!( + "MessageEvent", + MessageEvent { + payload_id: Self::parse_payload(&network_message.payload().0).id(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::NetworkReceived { + from: network_message.from + } + } ); - if network_message.payload().is_drop() { + if MockBlendMessage::is_drop_message(&network_message.payload().0) { continue; } @@ -427,23 +377,40 @@ impl Node for BlendNode { } // Proceed temporal processor - if let Poll::Ready(Some(mut outgoing_msg_with_history)) = + if let Poll::Ready(Some(msg)) = pin!(&mut self.temporal_processor_messages).poll_next(&mut cx) { - self.record_temporal_released_event(&mut outgoing_msg_with_history.history); + log!( + "MessageEvent", + MessageEvent { + payload_id: match &msg { + BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg).id(), + BlendOutgoingMessage::FullyUnwrapped(payload) => + Payload::load(payload.clone()).id(), + }, + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::TemporalProcessorReleased + } + ); self.state.cur_num_temporal_scheduled -= 1; // Proceed the message - match outgoing_msg_with_history.outgoing_message { + match msg { BlendOutgoingMessage::Outbound(message) => { - self.schedule_persistent_transmission(BlendMessage { - message, - history: outgoing_msg_with_history.history, - }); + self.schedule_persistent_transmission(SimMessage(message)); } BlendOutgoingMessage::FullyUnwrapped(payload) => { let payload = Payload::load(payload); - self.log_message_fully_unwrapped(&payload, outgoing_msg_with_history.history); + log!( + "MessageEvent", + MessageEvent { + payload_id: payload.id(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::FullyUnwrapped + } + ); self.state.num_messages_fully_unwrapped += 1; } } @@ -456,14 +423,31 @@ impl Node for BlendNode { .crypto_processor .wrap_message(payload.as_bytes()) .unwrap(); - self.schedule_persistent_transmission(self.new_blend_message(message)); + log!( + "MessageEvent", + MessageEvent { + payload_id: payload.id(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::Created + } + ); + self.schedule_persistent_transmission(SimMessage(message)); } // Proceed persistent transmission - if let Poll::Ready(Some(mut msg)) = + if let Poll::Ready(Some(msg)) = pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) { - self.record_persistent_released_event(&mut msg.history); + log!( + "MessageEvent", + MessageEvent { + payload_id: Self::parse_payload(&msg.0).id(), + step_id: self.state.step_id, + node_id: self.id, + event_type: MessageEventType::PersistentTransmissionReleased + } + ); self.state.cur_num_persistent_scheduled -= 1; self.forward(msg, None); } @@ -481,18 +465,3 @@ impl Node for BlendNode { } } } - -#[derive(Debug, Serialize)] -struct MessageLog { - payload_id: PayloadId, - step_id: usize, - node_id: usize, -} - -#[derive(Debug, Serialize)] -struct MessageWithHistoryLog { - message: MessageLog, - history: MessageHistory, - #[serde(serialize_with = "duration_as_millis")] - total_duration: Duration, -}