fix: use message_hash for network tracking

This commit is contained in:
Youngjoon Lee 2025-02-10 13:08:27 +09:00
parent 3444f0d380
commit 10c942b154
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
5 changed files with 181 additions and 83 deletions

View File

@ -29,3 +29,4 @@ tracing-appender = "0.2"
cached = "0.54.0" cached = "0.54.0"
polars = "0.46.0" polars = "0.46.0"
humantime-serde = "1.1.1" humantime-serde = "1.1.1"
serde_with = { version = "3.12.0", features = ["hex"] }

View File

@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize};
use crate::node::blend::log::TopicLog; use crate::node::blend::log::TopicLog;
use crate::node::blend::message::{MessageEvent, MessageEventType, PayloadId}; use crate::node::blend::message::{MessageEvent, MessageEventType, PayloadId};
use crate::node::blend::Sha256Hash;
pub fn analyze_latency(log_file: PathBuf, step_duration: Duration) -> Result<(), Box<dyn Error>> { pub fn analyze_latency(log_file: PathBuf, step_duration: Duration) -> Result<(), Box<dyn Error>> {
let output = Output { let output = Output {
@ -118,7 +119,7 @@ fn analyze_connection_latency(
let file = File::open(log_file)?; let file = File::open(log_file)?;
let reader = BufReader::new(file); let reader = BufReader::new(file);
let mut sent_events: HashMap<(PayloadId, NodeId, NodeId), usize> = HashMap::new(); let mut sent_events: HashMap<(Sha256Hash, NodeId, NodeId), usize> = HashMap::new();
let mut latencies_ms: Vec<i64> = Vec::new(); let mut latencies_ms: Vec<i64> = Vec::new();
for line in reader.lines() { for line in reader.lines() {
@ -127,14 +128,18 @@ fn analyze_connection_latency(
assert_eq!(topic_log.topic, "MessageEvent"); assert_eq!(topic_log.topic, "MessageEvent");
let event = topic_log.message; let event = topic_log.message;
match event.event_type { match event.event_type {
MessageEventType::NetworkSent { to } => { MessageEventType::NetworkSent { to, message_hash } => {
sent_events sent_events
.entry((event.payload_id, event.node_id, to)) .entry((message_hash, event.node_id, to))
.or_insert(event.step_id); .or_insert(event.step_id);
} }
MessageEventType::NetworkReceived { from } => { MessageEventType::NetworkReceived {
from,
message_hash,
duplicate: false,
} => {
if let Some(sent_step_id) = if let Some(sent_step_id) =
sent_events.remove(&(event.payload_id, from, event.node_id)) sent_events.remove(&(message_hash, from, event.node_id))
{ {
let latency = step_duration let latency = step_duration
.mul((event.step_id - sent_step_id).try_into().unwrap()) .mul((event.step_id - sent_step_id).try_into().unwrap())

View File

@ -2,8 +2,10 @@ use std::{
error::Error, error::Error,
fs::File, fs::File,
io::{BufRead, BufReader}, io::{BufRead, BufReader},
iter::Rev,
ops::{Add, Mul}, ops::{Add, Mul},
path::PathBuf, path::PathBuf,
slice::Iter,
time::Duration, time::Duration,
}; };
@ -24,80 +26,130 @@ pub fn analyze_message_history(
let reader = BufReader::new(file); let reader = BufReader::new(file);
let mut history = Vec::new(); let mut history = Vec::new();
let mut target_node_id: Option<NodeId> = None;
let mut target_event: Option<MessageEventType> = None;
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?; let lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;
for line in lines.iter().rev() { let mut rev_iter = lines.iter().rev();
if let Ok(topic_log) = serde_json::from_str::<TopicLog<MessageEvent>>(line) {
assert_eq!(topic_log.topic, "MessageEvent"); let event = find_event(
let event = topic_log.message; &payload_id,
if event.payload_id == payload_id None,
&& (target_node_id.is_none() || target_node_id.unwrap() == event.node_id) |event_type| matches!(event_type, MessageEventType::FullyUnwrapped),
&& (target_event.is_none() || target_event.as_ref().unwrap() == &event.event_type) &mut rev_iter,
{ )
match event.event_type { .unwrap();
MessageEventType::FullyUnwrapped => { history.push(event);
assert!(history.is_empty()); loop {
assert!(target_node_id.is_none()); let last_event = history.last().unwrap();
target_node_id = Some(event.node_id); let event = match &last_event.event_type {
history.push(event); MessageEventType::Created => break,
} MessageEventType::PersistentTransmissionScheduled { .. } => find_event(
MessageEventType::Created => { &payload_id,
assert!(!history.is_empty()); Some(&last_event.node_id),
assert!(target_node_id.is_some()); |event_type| {
history.push(event); matches!(
} event_type,
MessageEventType::PersistentTransmissionScheduled { .. } => { MessageEventType::Created | MessageEventType::TemporalProcessorReleased
assert!(target_node_id.is_some()); )
assert!(matches!( },
history.last().unwrap().event_type, &mut rev_iter,
MessageEventType::PersistentTransmissionReleased { .. } )
)); .unwrap(),
history.push(event); MessageEventType::PersistentTransmissionReleased => find_event(
} &payload_id,
MessageEventType::PersistentTransmissionReleased => { Some(&last_event.node_id),
assert!(target_node_id.is_some()); |event_type| {
history.push(event); matches!(
} event_type,
MessageEventType::TemporalProcessorScheduled { .. } => { MessageEventType::PersistentTransmissionScheduled { .. }
assert!(target_node_id.is_some()); )
assert!(matches!( },
history.last().unwrap().event_type, &mut rev_iter,
MessageEventType::TemporalProcessorReleased { .. } )
)); .unwrap(),
history.push(event); MessageEventType::TemporalProcessorScheduled { .. } => find_event(
} &payload_id,
MessageEventType::TemporalProcessorReleased => { Some(&last_event.node_id),
assert!(target_node_id.is_some()); |event_type| {
history.push(event); matches!(
} event_type,
MessageEventType::NetworkReceived { from } => { MessageEventType::NetworkReceived {
if history.is_empty() { duplicate: false,
continue; ..
} }
assert!(target_node_id.is_some()); )
assert_ne!(target_node_id.unwrap(), from); },
target_node_id = Some(from); &mut rev_iter,
target_event = Some(MessageEventType::NetworkSent { to: event.node_id }); )
history.push(event); .unwrap(),
} MessageEventType::TemporalProcessorReleased => find_event(
MessageEventType::NetworkSent { .. } => { &payload_id,
if history.is_empty() { Some(&last_event.node_id),
continue; |event_type| {
matches!(
event_type,
MessageEventType::TemporalProcessorScheduled { .. }
)
},
&mut rev_iter,
)
.unwrap(),
MessageEventType::NetworkSent {
message_hash: target_message_hash,
..
} => find_event(
&payload_id,
Some(&last_event.node_id),
|event_type| match event_type {
MessageEventType::NetworkReceived {
message_hash,
duplicate: false,
..
} => message_hash == target_message_hash,
MessageEventType::PersistentTransmissionReleased => true,
_ => false,
},
&mut rev_iter,
)
.unwrap(),
MessageEventType::NetworkReceived {
from,
message_hash: target_message_hash,
duplicate: false,
} => {
let to_node = last_event.node_id;
match find_event(
&payload_id,
Some(from),
|event_type: &MessageEventType| match event_type {
MessageEventType::NetworkSent { to, message_hash } => {
to == &to_node && target_message_hash == message_hash
} }
assert!(target_node_id.is_some()); _ => false,
if target_event.is_none() },
|| target_event.as_ref().unwrap() != &event.event_type &mut rev_iter,
{ ) {
continue; Some(ev) => ev,
} None => {
target_event = None; panic!(
history.push(event); "No matching NetworkSent event found for NetworkReceived event: {:?}",
last_event
);
} }
} }
} }
} MessageEventType::FullyUnwrapped => find_event(
&payload_id,
Some(&last_event.node_id),
|event_type| matches!(event_type, MessageEventType::TemporalProcessorReleased),
&mut rev_iter,
)
.unwrap(),
event_type => {
panic!("Unexpected event type: {:?}", event_type);
}
};
history.push(event);
} }
let mut history_with_durations: Vec<MessageEventWithDuration> = Vec::new(); let mut history_with_durations: Vec<MessageEventWithDuration> = Vec::new();
@ -125,6 +177,30 @@ pub fn analyze_message_history(
Ok(()) Ok(())
} }
fn find_event<F>(
payload_id: &PayloadId,
node_id: Option<&NodeId>,
match_event: F,
rev_iter: &mut Rev<Iter<'_, String>>,
) -> Option<MessageEvent>
where
F: Fn(&MessageEventType) -> bool,
{
for line in rev_iter {
if let Ok(topic_log) = serde_json::from_str::<TopicLog<MessageEvent>>(line) {
assert_eq!(topic_log.topic, "MessageEvent");
let event = topic_log.message;
if &event.payload_id == payload_id
&& node_id.map_or(true, |node_id| &event.node_id == node_id)
&& match_event(&event.event_type)
{
return Some(event);
}
}
}
None
}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct Output { struct Output {
history: Vec<MessageEventWithDuration>, history: Vec<MessageEventWithDuration>,

View File

@ -1,8 +1,11 @@
use netrunner::node::NodeId; use netrunner::node::NodeId;
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use serde_with::{hex::Hex, serde_as};
use uuid::Uuid; use uuid::Uuid;
use super::Sha256Hash;
pub type PayloadId = String; pub type PayloadId = String;
pub struct Payload(Uuid); pub struct Payload(Uuid);
@ -35,6 +38,7 @@ pub struct MessageEvent {
pub event_type: MessageEventType, pub event_type: MessageEventType,
} }
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum MessageEventType { pub enum MessageEventType {
Created, Created,
@ -49,10 +53,15 @@ pub enum MessageEventType {
NetworkSent { NetworkSent {
#[serde(with = "node_id_serde")] #[serde(with = "node_id_serde")]
to: NodeId, to: NodeId,
#[serde_as(as = "Hex")]
message_hash: Sha256Hash,
}, },
NetworkReceived { NetworkReceived {
#[serde(with = "node_id_serde")] #[serde(with = "node_id_serde")]
from: NodeId, from: NodeId,
#[serde_as(as = "Hex")]
message_hash: Sha256Hash,
duplicate: bool,
}, },
FullyUnwrapped, FullyUnwrapped,
} }

View File

@ -73,7 +73,7 @@ pub struct BlendnodeSettings {
>, >,
} }
type Sha256Hash = [u8; 32]; pub type Sha256Hash = [u8; 32];
/// This node implementation only used for testing different streaming implementation purposes. /// This node implementation only used for testing different streaming implementation purposes.
pub struct BlendNode { pub struct BlendNode {
@ -207,6 +207,9 @@ impl BlendNode {
} }
fn forward(&mut self, message: SimMessage, exclude_node: Option<NodeId>) { fn forward(&mut self, message: SimMessage, exclude_node: Option<NodeId>) {
let message_hash = Self::sha256(&message.0);
self.message_cache.cache_set(message_hash, ());
let payload_id = Self::parse_payload(&message.0).id(); let payload_id = Self::parse_payload(&message.0).id();
for node_id in self for node_id in self
.settings .settings
@ -220,35 +223,39 @@ impl BlendNode {
payload_id: payload_id.clone(), payload_id: payload_id.clone(),
step_id: self.state.step_id, step_id: self.state.step_id,
node_id: self.id, node_id: self.id,
event_type: MessageEventType::NetworkSent { to: *node_id } event_type: MessageEventType::NetworkSent {
to: *node_id,
message_hash
}
} }
); );
self.network_interface self.network_interface
.send_message(*node_id, message.clone()) .send_message(*node_id, message.clone())
} }
self.message_cache.cache_set(Self::sha256(&message.0), ());
} }
fn receive(&mut self) -> Vec<NetworkMessage<SimMessage>> { fn receive(&mut self) -> Vec<NetworkMessage<SimMessage>> {
self.network_interface self.network_interface
.receive_messages() .receive_messages()
.into_iter() .into_iter()
.inspect(|msg| { // Retain only messages that have not been seen before
.filter(|msg| {
let message_hash = Self::sha256(&msg.payload().0);
let duplicate = self.message_cache.cache_set(message_hash, ()).is_some();
log!( log!(
"MessageEvent", "MessageEvent",
MessageEvent { MessageEvent {
payload_id: Self::parse_payload(&msg.payload().0).id(), payload_id: Self::parse_payload(&msg.payload().0).id(),
step_id: self.state.step_id, step_id: self.state.step_id,
node_id: self.id, node_id: self.id,
event_type: MessageEventType::NetworkReceived { from: msg.from } event_type: MessageEventType::NetworkReceived {
from: msg.from,
message_hash,
duplicate,
}
} }
); );
}) !duplicate
// Retain only messages that have not been seen before
.filter(|msg| {
self.message_cache
.cache_set(Self::sha256(&msg.payload().0), ())
.is_none()
}) })
.collect() .collect()
} }