diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index bb122eb..9cc7dda 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -191,6 +191,7 @@ fn create_boxed_blendnode( Box::new(BlendNode::new( node_id, blendnode_settings, + simulation_settings.step_time, network_interface, regions_data, )) diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index f16091b..e2e026a 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -39,20 +39,21 @@ use scheduler::{Interval, TemporalScheduler}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use state::BlendnodeState; +use std::ops::Mul; use std::{pin::pin, task::Poll, time::Duration}; use stream_wrapper::CrossbeamReceiverStream; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BlendMessage { message: Vec, - network_route: Vec<(NodeId, Duration)>, + tracks: Vec, } impl BlendMessage { - pub fn new(message: Vec, origin: NodeId) -> Self { + pub fn new(message: Vec) -> Self { Self { message, - network_route: vec![(origin, Duration::ZERO)], + tracks: Vec::new(), } } } @@ -63,9 +64,48 @@ impl PayloadSize for BlendMessage { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +enum MessageTrack { + PersistentTransmissionScheduled { + #[serde(with = "node_id_serde")] + node_id: NodeId, + step_id: usize, + index: usize, + }, + PersistentTransmissionReleased { + #[serde(with = "node_id_serde")] + node_id: NodeId, + step_id: usize, + #[serde(with = "duration_ms_serde")] + duration: Duration, + }, + TemporalProcessorScheduled { + #[serde(with = "node_id_serde")] + node_id: NodeId, + step_id: usize, + index: usize, + }, + TemporalProcessorReleased { + #[serde(with = "node_id_serde")] + node_id: NodeId, + step_id: usize, + #[serde(with = "duration_ms_serde")] + duration: Duration, + }, + NetworkReceived { + #[serde(with = "node_id_serde")] + from_node_id: NodeId, + #[serde(with = "node_id_serde")] + to_node_id: NodeId, + #[serde(with = "duration_ms_serde")] + delay: Duration, + step_id: usize, + }, +} + struct BlendOutgoingMessageWithRoute { outgoing_message: BlendOutgoingMessage, - network_route: Vec<(NodeId, Duration)>, + tracks: Vec, } #[derive(Deserialize)] @@ -89,6 +129,7 @@ pub struct BlendNode { id: NodeId, state: BlendnodeState, settings: BlendnodeSettings, + step_time: Duration, network_interface: InMemoryNetworkInterface, regions_data: RegionsData, message_cache: TimedCache, @@ -105,11 +146,15 @@ pub struct BlendNode { MockBlendMessage, Interval, >, + num_persistent_transmission_scheduled: usize, + crypto_processor: CryptographicProcessor, temporal_sender: channel::Sender, temporal_update_time_sender: channel::Sender, temporal_processor_messages: TemporalStream, TemporalScheduler>, + num_temporal_processor_scheduled: usize, + epoch_update_sender: channel::Sender, slot_update_sender: channel::Sender, cover_traffic: CoverTraffic, @@ -119,6 +164,7 @@ impl BlendNode { pub fn new( id: NodeId, settings: BlendnodeSettings, + step_time: Duration, network_interface: InMemoryNetworkInterface, regions_data: RegionsData, ) -> Self { @@ -197,6 +243,7 @@ impl BlendNode { Self { id, + step_time, network_interface, regions_data, // We're not coupling this lifespan with the steps now, but it's okay @@ -214,10 +261,12 @@ impl BlendNode { persistent_sender, persistent_update_time_sender, persistent_transmission_messages, + num_persistent_transmission_scheduled: 0, crypto_processor, temporal_sender, temporal_update_time_sender, temporal_processor_messages, + num_temporal_processor_scheduled: 0, epoch_update_sender, slot_update_sender, cover_traffic, @@ -268,14 +317,22 @@ impl BlendNode { hasher.finalize().into() } - fn schedule_persistent_transmission(&mut self, message: BlendMessage) { + fn schedule_persistent_transmission(&mut self, mut message: BlendMessage) { self.log_message( "PersistentTransmissionScheduled", &Self::parse_payload(&message.message), ); + message + .tracks + .push(MessageTrack::PersistentTransmissionScheduled { + node_id: self.id, + step_id: self.state.step_id, + index: self.num_persistent_transmission_scheduled, + }); self.persistent_sender .send(bincode::serialize(&message).unwrap()) .unwrap(); + self.num_persistent_transmission_scheduled += 1; } fn handle_incoming_message(&mut self, message: BlendMessage) { @@ -289,7 +346,7 @@ impl BlendNode { self.schedule_temporal_processor(BlendOutgoingMessageWithRoute { outgoing_message: temporal_message, - network_route: message.network_route, + tracks: message.tracks, }); } Err(e) => { @@ -298,13 +355,21 @@ impl BlendNode { } } - fn schedule_temporal_processor(&mut self, message: BlendOutgoingMessageWithRoute) { + fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithRoute) { let payload = match &message.outgoing_message { BlendOutgoingMessage::FullyUnwrapped(payload) => Payload::load(payload.clone()), BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg), }; self.log_message("TemporalProcessorScheduled", &payload); + message + .tracks + .push(MessageTrack::TemporalProcessorScheduled { + node_id: self.id, + step_id: self.state.step_id, + index: self.num_temporal_processor_scheduled, + }); self.temporal_sender.send(message).unwrap(); + self.num_temporal_processor_scheduled += 1; } fn parse_payload(message: &[u8]) -> Payload { @@ -329,23 +394,16 @@ impl BlendNode { self.log_message(format!("MessageReleasedFrom{}", from).as_str(), payload); } - fn log_message_fully_unwrapped( - &self, - payload: &Payload, - network_route: Vec<(NodeId, Duration)>, - ) { + fn log_message_fully_unwrapped(&self, payload: &Payload, tracks: Vec) { log!( "MessageFullyUnwrapped", - MessageWithRouteLog { + MessageWithTracks { message: MessageLog { payload_id: payload.id(), step_id: self.state.step_id, node_id: self.id.index(), }, - network_route: network_route - .into_iter() - .map(|(id, delay)| (id.index(), delay.as_millis())) - .collect(), + tracks, } ); } @@ -372,6 +430,11 @@ impl BlendNode { node_id: self.id.index(), } } + + fn duration_between(&self, from_step: usize, to_step: usize) -> Duration { + self.step_time + .mul((to_step - from_step).try_into().unwrap()) + } } impl Node for BlendNode { @@ -401,22 +464,24 @@ impl Node for BlendNode { .crypto_processor .wrap_message(payload.as_bytes()) .unwrap(); - self.schedule_persistent_transmission(BlendMessage::new(message, self.id)); + self.schedule_persistent_transmission(BlendMessage::new(message)); } } // Handle incoming messages for mut network_message in self.receive() { - assert_eq!( - network_message.payload().network_route.last().unwrap().0, - network_message.from - ); - network_message.payload.network_route.push(( - self.id, - self.regions_data - .network_behaviour(network_message.from, self.id) - .delay(), - )); + network_message + .payload + .tracks + .push(MessageTrack::NetworkReceived { + from_node_id: network_message.from, + to_node_id: self.id, + delay: self + .regions_data + .network_behaviour(network_message.from, self.id) + .delay(), + step_id: self.state.step_id, + }); self.forward( network_message.payload().clone(), Some(network_message.from), @@ -426,9 +491,28 @@ impl Node for BlendNode { } // Proceed temporal processor - if let Poll::Ready(Some(outgoing_msg_with_route)) = + if let Poll::Ready(Some(mut outgoing_msg_with_route)) = pin!(&mut self.temporal_processor_messages).poll_next(&mut cx) { + // Add a TemporalProcessorReleased track + match outgoing_msg_with_route.tracks.last().unwrap() { + MessageTrack::TemporalProcessorScheduled { + node_id, step_id, .. + } => { + assert_eq!(*node_id, self.id); + outgoing_msg_with_route + .tracks + .push(MessageTrack::TemporalProcessorReleased { + node_id: self.id, + step_id: self.state.step_id, + duration: self.duration_between(*step_id, self.state.step_id), + }); + self.num_temporal_processor_scheduled -= 1; + } + track => panic!("Unexpected message track: {:?}", track), + } + + // Proceed the message match outgoing_msg_with_route.outgoing_message { BlendOutgoingMessage::Outbound(message) => { self.log_message_released_from( @@ -437,16 +521,13 @@ impl Node for BlendNode { ); self.schedule_persistent_transmission(BlendMessage { message, - network_route: outgoing_msg_with_route.network_route, + tracks: outgoing_msg_with_route.tracks, }); } BlendOutgoingMessage::FullyUnwrapped(payload) => { let payload = Payload::load(payload); self.log_message_released_from("TemporalProcessor", &payload); - self.log_message_fully_unwrapped( - &payload, - outgoing_msg_with_route.network_route, - ); + self.log_message_fully_unwrapped(&payload, outgoing_msg_with_route.tracks); self.state.num_messages_fully_unwrapped += 1; //TODO: create a tracing event } @@ -461,14 +542,30 @@ impl Node for BlendNode { .crypto_processor .wrap_message(payload.as_bytes()) .unwrap(); - self.schedule_persistent_transmission(BlendMessage::new(message, self.id)); + self.schedule_persistent_transmission(BlendMessage::new(message)); } // Proceed persistent transmission if let Poll::Ready(Some(msg)) = pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) { - let msg: BlendMessage = bincode::deserialize(&msg).unwrap(); + let mut msg: BlendMessage = bincode::deserialize(&msg).unwrap(); + // Add a PersistentTransmissionReleased track + match msg.tracks.last().unwrap() { + MessageTrack::PersistentTransmissionScheduled { + node_id, step_id, .. + } => { + assert_eq!(*node_id, self.id); + msg.tracks + .push(MessageTrack::PersistentTransmissionReleased { + node_id: self.id, + step_id: self.state.step_id, + duration: self.duration_between(*step_id, self.state.step_id), + }); + self.num_persistent_transmission_scheduled -= 1; + } + track => panic!("Unexpected message track: {:?}", track), + } self.log_message_released_from( "PersistentTransmission", &Self::parse_payload(&msg.message), @@ -498,9 +595,9 @@ struct MessageLog { } #[derive(Debug, Serialize, Deserialize)] -struct MessageWithRouteLog { +struct MessageWithTracks { message: MessageLog, - network_route: Vec<(usize, u128)>, + tracks: Vec, } #[derive(Debug, Serialize, Deserialize)] @@ -509,3 +606,45 @@ struct EmissionLog { step_id: usize, node_id: usize, } + +mod node_id_serde { + use super::NodeId; + use netrunner::node::NodeIdExt; + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(node_id: &NodeId, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_u64(node_id.index() as u64) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let index = u64::deserialize(deserializer)?; + Ok(NodeId::from_index(index as usize)) + } +} + +mod duration_ms_serde { + use std::time::Duration; + + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(duration: &Duration, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_u64(duration.as_millis() as u64) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let millis = u64::deserialize(deserializer)?; + Ok(Duration::from_millis(millis)) + } +}