From 10c942b154ddcf3a1c17df2ab78ad8d4d84c5c96 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Mon, 10 Feb 2025 13:08:27 +0900 Subject: [PATCH] fix: use message_hash for network tracking --- simlib/blendnet-sims/Cargo.toml | 1 + simlib/blendnet-sims/src/analysis/latency.rs | 15 +- .../src/analysis/message_history.rs | 210 ++++++++++++------ .../blendnet-sims/src/node/blend/message.rs | 9 + simlib/blendnet-sims/src/node/blend/mod.rs | 29 ++- 5 files changed, 181 insertions(+), 83 deletions(-) diff --git a/simlib/blendnet-sims/Cargo.toml b/simlib/blendnet-sims/Cargo.toml index 6c3e8a8..cde7ce9 100644 --- a/simlib/blendnet-sims/Cargo.toml +++ b/simlib/blendnet-sims/Cargo.toml @@ -29,3 +29,4 @@ tracing-appender = "0.2" cached = "0.54.0" polars = "0.46.0" humantime-serde = "1.1.1" +serde_with = { version = "3.12.0", features = ["hex"] } diff --git a/simlib/blendnet-sims/src/analysis/latency.rs b/simlib/blendnet-sims/src/analysis/latency.rs index 51ba261..53b07d7 100644 --- a/simlib/blendnet-sims/src/analysis/latency.rs +++ b/simlib/blendnet-sims/src/analysis/latency.rs @@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize}; use crate::node::blend::log::TopicLog; use crate::node::blend::message::{MessageEvent, MessageEventType, PayloadId}; +use crate::node::blend::Sha256Hash; pub fn analyze_latency(log_file: PathBuf, step_duration: Duration) -> Result<(), Box> { let output = Output { @@ -118,7 +119,7 @@ fn analyze_connection_latency( let file = File::open(log_file)?; let reader = BufReader::new(file); - let mut sent_events: HashMap<(PayloadId, NodeId, NodeId), usize> = HashMap::new(); + let mut sent_events: HashMap<(Sha256Hash, NodeId, NodeId), usize> = HashMap::new(); let mut latencies_ms: Vec = Vec::new(); for line in reader.lines() { @@ -127,14 +128,18 @@ fn analyze_connection_latency( assert_eq!(topic_log.topic, "MessageEvent"); let event = topic_log.message; match event.event_type { - MessageEventType::NetworkSent { to } => { + MessageEventType::NetworkSent { to, message_hash } => { sent_events - .entry((event.payload_id, event.node_id, to)) + .entry((message_hash, event.node_id, to)) .or_insert(event.step_id); } - MessageEventType::NetworkReceived { from } => { + MessageEventType::NetworkReceived { + from, + message_hash, + duplicate: false, + } => { if let Some(sent_step_id) = - sent_events.remove(&(event.payload_id, from, event.node_id)) + sent_events.remove(&(message_hash, from, event.node_id)) { let latency = step_duration .mul((event.step_id - sent_step_id).try_into().unwrap()) diff --git a/simlib/blendnet-sims/src/analysis/message_history.rs b/simlib/blendnet-sims/src/analysis/message_history.rs index 17a51ec..3f51a7e 100644 --- a/simlib/blendnet-sims/src/analysis/message_history.rs +++ b/simlib/blendnet-sims/src/analysis/message_history.rs @@ -2,8 +2,10 @@ use std::{ error::Error, fs::File, io::{BufRead, BufReader}, + iter::Rev, ops::{Add, Mul}, path::PathBuf, + slice::Iter, time::Duration, }; @@ -24,80 +26,130 @@ pub fn analyze_message_history( let reader = BufReader::new(file); let mut history = Vec::new(); - let mut target_node_id: Option = None; - let mut target_event: Option = None; let lines: Vec = reader.lines().collect::>()?; - for line in lines.iter().rev() { - if let Ok(topic_log) = serde_json::from_str::>(line) { - assert_eq!(topic_log.topic, "MessageEvent"); - let event = topic_log.message; - if event.payload_id == payload_id - && (target_node_id.is_none() || target_node_id.unwrap() == event.node_id) - && (target_event.is_none() || target_event.as_ref().unwrap() == &event.event_type) - { - match event.event_type { - MessageEventType::FullyUnwrapped => { - assert!(history.is_empty()); - assert!(target_node_id.is_none()); - target_node_id = Some(event.node_id); - history.push(event); - } - MessageEventType::Created => { - assert!(!history.is_empty()); - assert!(target_node_id.is_some()); - history.push(event); - } - MessageEventType::PersistentTransmissionScheduled { .. } => { - assert!(target_node_id.is_some()); - assert!(matches!( - history.last().unwrap().event_type, - MessageEventType::PersistentTransmissionReleased { .. } - )); - history.push(event); - } - MessageEventType::PersistentTransmissionReleased => { - assert!(target_node_id.is_some()); - history.push(event); - } - MessageEventType::TemporalProcessorScheduled { .. } => { - assert!(target_node_id.is_some()); - assert!(matches!( - history.last().unwrap().event_type, - MessageEventType::TemporalProcessorReleased { .. } - )); - history.push(event); - } - MessageEventType::TemporalProcessorReleased => { - assert!(target_node_id.is_some()); - history.push(event); - } - MessageEventType::NetworkReceived { from } => { - if history.is_empty() { - continue; + let mut rev_iter = lines.iter().rev(); + + let event = find_event( + &payload_id, + None, + |event_type| matches!(event_type, MessageEventType::FullyUnwrapped), + &mut rev_iter, + ) + .unwrap(); + history.push(event); + loop { + let last_event = history.last().unwrap(); + let event = match &last_event.event_type { + MessageEventType::Created => break, + MessageEventType::PersistentTransmissionScheduled { .. } => find_event( + &payload_id, + Some(&last_event.node_id), + |event_type| { + matches!( + event_type, + MessageEventType::Created | MessageEventType::TemporalProcessorReleased + ) + }, + &mut rev_iter, + ) + .unwrap(), + MessageEventType::PersistentTransmissionReleased => find_event( + &payload_id, + Some(&last_event.node_id), + |event_type| { + matches!( + event_type, + MessageEventType::PersistentTransmissionScheduled { .. } + ) + }, + &mut rev_iter, + ) + .unwrap(), + MessageEventType::TemporalProcessorScheduled { .. } => find_event( + &payload_id, + Some(&last_event.node_id), + |event_type| { + matches!( + event_type, + MessageEventType::NetworkReceived { + duplicate: false, + .. } - assert!(target_node_id.is_some()); - assert_ne!(target_node_id.unwrap(), from); - target_node_id = Some(from); - target_event = Some(MessageEventType::NetworkSent { to: event.node_id }); - history.push(event); - } - MessageEventType::NetworkSent { .. } => { - if history.is_empty() { - continue; + ) + }, + &mut rev_iter, + ) + .unwrap(), + MessageEventType::TemporalProcessorReleased => find_event( + &payload_id, + Some(&last_event.node_id), + |event_type| { + matches!( + event_type, + MessageEventType::TemporalProcessorScheduled { .. } + ) + }, + &mut rev_iter, + ) + .unwrap(), + MessageEventType::NetworkSent { + message_hash: target_message_hash, + .. + } => find_event( + &payload_id, + Some(&last_event.node_id), + |event_type| match event_type { + MessageEventType::NetworkReceived { + message_hash, + duplicate: false, + .. + } => message_hash == target_message_hash, + MessageEventType::PersistentTransmissionReleased => true, + _ => false, + }, + &mut rev_iter, + ) + .unwrap(), + MessageEventType::NetworkReceived { + from, + message_hash: target_message_hash, + duplicate: false, + } => { + let to_node = last_event.node_id; + match find_event( + &payload_id, + Some(from), + |event_type: &MessageEventType| match event_type { + MessageEventType::NetworkSent { to, message_hash } => { + to == &to_node && target_message_hash == message_hash } - assert!(target_node_id.is_some()); - if target_event.is_none() - || target_event.as_ref().unwrap() != &event.event_type - { - continue; - } - target_event = None; - history.push(event); + _ => false, + }, + &mut rev_iter, + ) { + Some(ev) => ev, + None => { + panic!( + "No matching NetworkSent event found for NetworkReceived event: {:?}", + last_event + ); } } } - } + MessageEventType::FullyUnwrapped => find_event( + &payload_id, + Some(&last_event.node_id), + |event_type| matches!(event_type, MessageEventType::TemporalProcessorReleased), + &mut rev_iter, + ) + .unwrap(), + event_type => { + panic!("Unexpected event type: {:?}", event_type); + } + }; + + history.push(event); } let mut history_with_durations: Vec = Vec::new(); @@ -125,6 +177,30 @@ pub fn analyze_message_history( Ok(()) } +fn find_event( + payload_id: &PayloadId, + node_id: Option<&NodeId>, + match_event: F, + rev_iter: &mut Rev>, +) -> Option +where + F: Fn(&MessageEventType) -> bool, +{ + for line in rev_iter { + if let Ok(topic_log) = serde_json::from_str::>(line) { + assert_eq!(topic_log.topic, "MessageEvent"); + let event = topic_log.message; + if &event.payload_id == payload_id + && node_id.map_or(true, |node_id| &event.node_id == node_id) + && match_event(&event.event_type) + { + return Some(event); + } + } + } + None +} + #[derive(Serialize, Deserialize)] struct Output { history: Vec, diff --git a/simlib/blendnet-sims/src/node/blend/message.rs b/simlib/blendnet-sims/src/node/blend/message.rs index fb760f3..762ebcf 100644 --- a/simlib/blendnet-sims/src/node/blend/message.rs +++ b/simlib/blendnet-sims/src/node/blend/message.rs @@ -1,8 +1,11 @@ use netrunner::node::NodeId; use serde::Deserialize; use serde::Serialize; +use serde_with::{hex::Hex, serde_as}; use uuid::Uuid; +use super::Sha256Hash; + pub type PayloadId = String; pub struct Payload(Uuid); @@ -35,6 +38,7 @@ pub struct MessageEvent { pub event_type: MessageEventType, } +#[serde_as] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum MessageEventType { Created, @@ -49,10 +53,15 @@ pub enum MessageEventType { NetworkSent { #[serde(with = "node_id_serde")] to: NodeId, + #[serde_as(as = "Hex")] + message_hash: Sha256Hash, }, NetworkReceived { #[serde(with = "node_id_serde")] from: NodeId, + #[serde_as(as = "Hex")] + message_hash: Sha256Hash, + duplicate: bool, }, FullyUnwrapped, } diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index d6c204b..e3b1043 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -73,7 +73,7 @@ pub struct BlendnodeSettings { >, } -type Sha256Hash = [u8; 32]; +pub type Sha256Hash = [u8; 32]; /// This node implementation only used for testing different streaming implementation purposes. pub struct BlendNode { @@ -207,6 +207,9 @@ impl BlendNode { } fn forward(&mut self, message: SimMessage, exclude_node: Option) { + let message_hash = Self::sha256(&message.0); + self.message_cache.cache_set(message_hash, ()); + let payload_id = Self::parse_payload(&message.0).id(); for node_id in self .settings @@ -220,35 +223,39 @@ impl BlendNode { payload_id: payload_id.clone(), step_id: self.state.step_id, node_id: self.id, - event_type: MessageEventType::NetworkSent { to: *node_id } + event_type: MessageEventType::NetworkSent { + to: *node_id, + message_hash + } } ); self.network_interface .send_message(*node_id, message.clone()) } - self.message_cache.cache_set(Self::sha256(&message.0), ()); } fn receive(&mut self) -> Vec> { self.network_interface .receive_messages() .into_iter() - .inspect(|msg| { + // Retain only messages that have not been seen before + .filter(|msg| { + let message_hash = Self::sha256(&msg.payload().0); + let duplicate = self.message_cache.cache_set(message_hash, ()).is_some(); log!( "MessageEvent", MessageEvent { payload_id: Self::parse_payload(&msg.payload().0).id(), step_id: self.state.step_id, node_id: self.id, - event_type: MessageEventType::NetworkReceived { from: msg.from } + event_type: MessageEventType::NetworkReceived { + from: msg.from, + message_hash, + duplicate, + } } ); - }) - // Retain only messages that have not been seen before - .filter(|msg| { - self.message_cache - .cache_set(Self::sha256(&msg.payload().0), ()) - .is_none() + !duplicate }) .collect() }