This commit is contained in:
Youngjoon Lee 2025-02-06 13:39:38 +09:00
parent 551ba09900
commit 2aa1fecab2
No known key found for this signature in database
GPG Key ID: D94003D91DE12141
2 changed files with 135 additions and 148 deletions

View File

@ -1,3 +1,8 @@
use std::{ops::Mul, time::Duration};
use netrunner::node::serialize_node_id_as_index;
use netrunner::node::NodeId;
use serde::Serialize;
use uuid::Uuid; use uuid::Uuid;
pub type PayloadId = String; pub type PayloadId = String;
@ -23,6 +28,79 @@ impl Payload {
} }
} }
#[derive(Debug, Clone, Serialize)]
pub struct MessageHistory(Vec<MessageEvent>);
impl MessageHistory {
pub fn new() -> Self {
Self(Vec::new())
}
pub fn add(
&mut self,
node_id: NodeId,
step_id: usize,
step_time: Duration,
event_type: MessageEventType,
) {
let duration_from_prev = self.0.last().map_or(Duration::ZERO, |prev_event| {
step_time.mul((step_id - prev_event.step_id).try_into().unwrap())
});
self.0.push(MessageEvent {
node_id,
step_id,
duration_from_prev,
event_type,
});
}
pub fn last_event_type(&self) -> Option<&MessageEventType> {
self.0.last().map(|event| &event.event_type)
}
pub fn total_duration(&self) -> Duration {
self.0.iter().map(|event| event.duration_from_prev).sum()
}
}
#[derive(Debug, Clone, Serialize)]
struct MessageEvent {
#[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId,
step_id: usize,
#[serde(serialize_with = "duration_as_millis")]
duration_from_prev: Duration,
event_type: MessageEventType,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum MessageEventType {
Created,
PersistentTransmissionScheduled {
index: usize,
},
PersistentTransmissionReleased,
TemporalProcessorScheduled {
index: usize,
},
TemporalProcessorReleased,
NetworkSent {
#[serde(serialize_with = "serialize_node_id_as_index")]
to: NodeId,
},
NetworkReceived {
#[serde(serialize_with = "serialize_node_id_as_index")]
from: NodeId,
},
}
pub fn duration_as_millis<S>(duration: &Duration, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
s.serialize_u64(duration.as_millis().try_into().unwrap())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::Payload; use super::Payload;

View File

@ -13,9 +13,9 @@ use cached::{Cached, TimedCache};
use crossbeam::channel; use crossbeam::channel;
use futures::Stream; use futures::Stream;
use lottery::StakeLottery; use lottery::StakeLottery;
use message::{Payload, PayloadId}; use message::{duration_as_millis, MessageEventType, MessageHistory, Payload, PayloadId};
use netrunner::network::NetworkMessage; use netrunner::network::NetworkMessage;
use netrunner::node::{serialize_node_id_as_index, Node, NodeId, NodeIdExt}; use netrunner::node::{Node, NodeId, NodeIdExt};
use netrunner::{ use netrunner::{
network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize},
warding::WardCondition, warding::WardCondition,
@ -37,28 +37,26 @@ use scheduler::{Interval, TemporalScheduler};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use state::BlendnodeState; use state::BlendnodeState;
use std::ops::Mul;
use std::{pin::pin, task::Poll, time::Duration}; use std::{pin::pin, task::Poll, time::Duration};
use stream_wrapper::CrossbeamReceiverStream; use stream_wrapper::CrossbeamReceiverStream;
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct BlendMessage { pub struct BlendMessage {
message: Vec<u8>, message: Vec<u8>,
history: Vec<MessageEvent>, history: MessageHistory,
} }
impl BlendMessage { impl BlendMessage {
fn new(message: Vec<u8>, node_id: NodeId, step_id: usize) -> Self { fn new(message: Vec<u8>, node_id: NodeId, step_id: usize, step_time: Duration) -> Self {
Self { let mut history = MessageHistory::new();
message, history.add(node_id, step_id, step_time, MessageEventType::Created);
history: vec![MessageEvent::Created { node_id, step_id }], Self { message, history }
}
} }
fn new_drop() -> Self { fn new_drop() -> Self {
Self { Self {
message: Vec::new(), message: Vec::new(),
history: vec![], history: MessageHistory::new(),
} }
} }
@ -73,60 +71,9 @@ impl PayloadSize for BlendMessage {
} }
} }
#[derive(Debug, Clone, Serialize)]
enum MessageEvent {
Created {
#[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId,
step_id: usize,
},
PersistentTransmissionScheduled {
#[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId,
step_id: usize,
index: usize,
},
PersistentTransmissionReleased {
#[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId,
step_id: usize,
#[serde(serialize_with = "serialize_duration_as_millis")]
duration: Duration,
},
TemporalProcessorScheduled {
#[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId,
step_id: usize,
index: usize,
},
TemporalProcessorReleased {
#[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId,
step_id: usize,
#[serde(serialize_with = "serialize_duration_as_millis")]
duration: Duration,
},
NetworkSent {
#[serde(serialize_with = "serialize_node_id_as_index")]
from_node_id: NodeId,
#[serde(serialize_with = "serialize_node_id_as_index")]
to_node_id: NodeId,
step_id: usize,
},
NetworkReceived {
#[serde(serialize_with = "serialize_node_id_as_index")]
from_node_id: NodeId,
#[serde(serialize_with = "serialize_node_id_as_index")]
to_node_id: NodeId,
step_id: usize,
#[serde(serialize_with = "serialize_duration_as_millis")]
latency: Duration,
},
}
struct BlendOutgoingMessageWithHistory { struct BlendOutgoingMessageWithHistory {
outgoing_message: BlendOutgoingMessage, outgoing_message: BlendOutgoingMessage,
history: Vec<MessageEvent>, history: MessageHistory,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -356,7 +303,8 @@ impl BlendNode {
self.slot_update_sender.send(elapsed).unwrap(); self.slot_update_sender.send(elapsed).unwrap();
} }
fn log_message_fully_unwrapped(&self, payload: &Payload, history: Vec<MessageEvent>) { fn log_message_fully_unwrapped(&self, payload: &Payload, history: MessageHistory) {
let total_duration = history.total_duration();
log!( log!(
"MessageFullyUnwrapped", "MessageFullyUnwrapped",
MessageWithHistoryLog { MessageWithHistoryLog {
@ -366,89 +314,63 @@ impl BlendNode {
node_id: self.id.index(), node_id: self.id.index(),
}, },
history, history,
total_duration,
} }
); );
} }
fn record_network_sent_event(&self, history: &mut Vec<MessageEvent>, to: NodeId) { fn new_blend_message(&self, message: Vec<u8>) -> BlendMessage {
history.push(MessageEvent::NetworkSent { BlendMessage::new(message, self.id, self.state.step_id, self.step_time)
from_node_id: self.id,
to_node_id: to,
step_id: self.state.step_id,
})
} }
fn record_network_received_event(&self, history: &mut Vec<MessageEvent>, from: NodeId) { fn record_network_sent_event(&self, history: &mut MessageHistory, to: NodeId) {
match history.last().unwrap() { self.record_message_event(history, MessageEventType::NetworkSent { to });
MessageEvent::NetworkSent {
from_node_id,
to_node_id,
step_id,
} => {
assert_eq!(*from_node_id, from);
assert_eq!(*to_node_id, self.id);
history.push(MessageEvent::NetworkReceived {
from_node_id: from,
to_node_id: self.id,
step_id: self.state.step_id,
latency: self.duration_between(*step_id, self.state.step_id),
});
}
event => panic!("Unexpected message event: {:?}", event),
}
} }
fn record_persistent_scheduled_event(&mut self, history: &mut Vec<MessageEvent>) { fn record_network_received_event(&self, history: &mut MessageHistory, from: NodeId) {
history.push(MessageEvent::PersistentTransmissionScheduled { assert_eq!(
node_id: self.id, history.last_event_type(),
step_id: self.state.step_id, Some(&MessageEventType::NetworkSent { to: self.id })
index: self.state.cur_num_persistent_scheduled, );
}) self.record_message_event(history, MessageEventType::NetworkReceived { from });
} }
fn record_persistent_released_event(&mut self, history: &mut Vec<MessageEvent>) { fn record_persistent_scheduled_event(&self, history: &mut MessageHistory) {
match history.last().unwrap() { self.record_message_event(
MessageEvent::PersistentTransmissionScheduled { history,
node_id, step_id, .. MessageEventType::PersistentTransmissionScheduled {
} => { index: self.state.cur_num_persistent_scheduled,
assert_eq!(*node_id, self.id); },
history.push(MessageEvent::PersistentTransmissionReleased { );
node_id: self.id,
step_id: self.state.step_id,
duration: self.duration_between(*step_id, self.state.step_id),
});
}
event => panic!("Unexpected message event: {:?}", event),
}
} }
fn record_temporal_scheduled_event(&mut self, history: &mut Vec<MessageEvent>) { fn record_persistent_released_event(&self, history: &mut MessageHistory) {
history.push(MessageEvent::TemporalProcessorScheduled { assert!(matches!(
node_id: self.id, history.last_event_type(),
step_id: self.state.step_id, Some(MessageEventType::PersistentTransmissionScheduled { .. })
index: self.state.cur_num_temporal_scheduled, ));
}) self.record_message_event(history, MessageEventType::PersistentTransmissionReleased);
} }
fn record_temporal_released_event(&mut self, history: &mut Vec<MessageEvent>) { fn record_temporal_scheduled_event(&self, history: &mut MessageHistory) {
match history.last().unwrap() { self.record_message_event(
MessageEvent::TemporalProcessorScheduled { history,
node_id, step_id, .. MessageEventType::TemporalProcessorScheduled {
} => { index: self.state.cur_num_temporal_scheduled,
assert_eq!(*node_id, self.id); },
history.push(MessageEvent::TemporalProcessorReleased { );
node_id: self.id,
step_id: self.state.step_id,
duration: self.duration_between(*step_id, self.state.step_id),
});
}
event => panic!("Unexpected message event: {:?}", event),
}
} }
fn duration_between(&self, from_step: usize, to_step: usize) -> Duration { fn record_temporal_released_event(&self, history: &mut MessageHistory) {
self.step_time assert!(matches!(
.mul((to_step - from_step).try_into().unwrap()) history.last_event_type(),
Some(MessageEventType::TemporalProcessorScheduled { .. })
));
self.record_message_event(history, MessageEventType::TemporalProcessorReleased);
}
fn record_message_event(&self, history: &mut MessageHistory, event_type: MessageEventType) {
history.add(self.id, self.state.step_id, self.step_time, event_type);
} }
} }
@ -478,11 +400,7 @@ impl Node for BlendNode {
.crypto_processor .crypto_processor
.wrap_message(payload.as_bytes()) .wrap_message(payload.as_bytes())
.unwrap(); .unwrap();
self.schedule_persistent_transmission(BlendMessage::new( self.schedule_persistent_transmission(self.new_blend_message(message));
message,
self.id,
self.state.step_id,
));
} }
} }
@ -534,11 +452,7 @@ impl Node for BlendNode {
.crypto_processor .crypto_processor
.wrap_message(payload.as_bytes()) .wrap_message(payload.as_bytes())
.unwrap(); .unwrap();
self.schedule_persistent_transmission(BlendMessage::new( self.schedule_persistent_transmission(self.new_blend_message(message));
message,
self.id,
self.state.step_id,
));
} }
// Proceed persistent transmission // Proceed persistent transmission
@ -574,12 +488,7 @@ struct MessageLog {
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
struct MessageWithHistoryLog { struct MessageWithHistoryLog {
message: MessageLog, message: MessageLog,
history: Vec<MessageEvent>, history: MessageHistory,
} #[serde(serialize_with = "duration_as_millis")]
total_duration: Duration,
pub fn serialize_duration_as_millis<S>(duration: &Duration, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
s.serialize_u64(duration.as_millis().try_into().unwrap())
} }