diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index 9cc7dda..3ebbb9d 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -114,7 +114,6 @@ impl SimulationApp { create_boxed_blendnode( node_id, &mut network, - regions_data.clone(), settings.simulation_settings.clone(), no_netcap, BlendnodeSettings { @@ -156,7 +155,6 @@ impl SimulationApp { fn create_boxed_blendnode( node_id: NodeId, network: &mut Network, - regions_data: RegionsData, simulation_settings: SimulationSettings, no_netcap: bool, blendnode_settings: BlendnodeSettings, @@ -193,7 +191,6 @@ fn create_boxed_blendnode( blendnode_settings, simulation_settings.step_time, network_interface, - regions_data, )) } diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index 4e51691..c43685d 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -15,7 +15,6 @@ use futures::Stream; use lottery::StakeLottery; use message::{Payload, PayloadId}; use multiaddr::Multiaddr; -use netrunner::network::regions::RegionsData; use netrunner::network::NetworkMessage; use netrunner::node::{Node, NodeId, NodeIdExt}; use netrunner::{ @@ -97,14 +96,21 @@ enum MessageHistoryEvent { #[serde(with = "duration_ms_serde")] duration: Duration, }, + NetworkSent { + #[serde(with = "node_id_serde")] + from_node_id: NodeId, + #[serde(with = "node_id_serde")] + to_node_id: NodeId, + step_id: usize, + }, NetworkReceived { #[serde(with = "node_id_serde")] from_node_id: NodeId, #[serde(with = "node_id_serde")] to_node_id: NodeId, - #[serde(with = "duration_ms_serde")] - delay: Duration, step_id: usize, + #[serde(with = "duration_ms_serde")] + latency: Duration, }, } @@ -136,7 +142,6 @@ pub struct BlendNode { settings: BlendnodeSettings, step_time: Duration, network_interface: InMemoryNetworkInterface, - regions_data: RegionsData, message_cache: TimedCache, data_msg_lottery_update_time_sender: channel::Sender, @@ -151,14 +156,12 @@ pub struct BlendNode { MockBlendMessage, Interval, >, - num_persistent_transmission_scheduled: usize, crypto_processor: CryptographicProcessor, temporal_sender: channel::Sender, temporal_update_time_sender: channel::Sender, temporal_processor_messages: TemporalStream, TemporalScheduler>, - num_temporal_processor_scheduled: usize, epoch_update_sender: channel::Sender, slot_update_sender: channel::Sender, @@ -171,7 +174,6 @@ impl BlendNode { settings: BlendnodeSettings, step_time: Duration, network_interface: InMemoryNetworkInterface, - regions_data: RegionsData, ) -> Self { let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed); @@ -250,7 +252,6 @@ impl BlendNode { id, step_time, network_interface, - regions_data, // We're not coupling this lifespan with the steps now, but it's okay // We expected that a message will be delivered to most of nodes within 60s. message_cache: TimedCache::with_lifespan(60), @@ -259,6 +260,8 @@ impl BlendNode { node_id: id, step_id: 0, num_messages_fully_unwrapped: 0, + cur_num_persistent_transmission_scheduled: 0, + cur_num_temporal_processor_scheduled: 0, }, data_msg_lottery_update_time_sender, data_msg_lottery_interval, @@ -266,38 +269,30 @@ impl BlendNode { persistent_sender, persistent_update_time_sender, persistent_transmission_messages, - num_persistent_transmission_scheduled: 0, crypto_processor, temporal_sender, temporal_update_time_sender, temporal_processor_messages, - num_temporal_processor_scheduled: 0, epoch_update_sender, slot_update_sender, cover_traffic, } } - fn forward( - &mut self, - message: BlendMessage, - exclude_node: Option, - log: Option, - ) { - for (i, node_id) in self + fn forward(&mut self, message: BlendMessage, exclude_node: Option) { + for node_id in self .settings .connected_peers .iter() .filter(|&id| Some(*id) != exclude_node) - .enumerate() { - if i == 0 { - if let Some(log) = &log { - Self::log_emission(log); - } - } - self.network_interface - .send_message(*node_id, message.clone()) + let mut message = message.clone(); + message.history.push(MessageHistoryEvent::NetworkSent { + from_node_id: self.id, + to_node_id: *node_id, + step_id: self.state.step_id, + }); + self.network_interface.send_message(*node_id, message) } self.message_cache .cache_set(Self::sha256(&message.message), ()); @@ -323,21 +318,17 @@ impl BlendNode { } fn schedule_persistent_transmission(&mut self, mut message: BlendMessage) { - self.log_message( - "PersistentTransmissionScheduled", - &Self::parse_payload(&message.message), - ); message .history .push(MessageHistoryEvent::PersistentTransmissionScheduled { node_id: self.id, step_id: self.state.step_id, - index: self.num_persistent_transmission_scheduled, + index: self.state.cur_num_persistent_transmission_scheduled, }); self.persistent_sender .send(bincode::serialize(&message).unwrap()) .unwrap(); - self.num_persistent_transmission_scheduled += 1; + self.state.cur_num_persistent_transmission_scheduled += 1; } fn handle_incoming_message(&mut self, message: BlendMessage) { @@ -361,24 +352,15 @@ impl BlendNode { } fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithHistory) { - let payload = match &message.outgoing_message { - BlendOutgoingMessage::FullyUnwrapped(payload) => Payload::load(payload.clone()), - BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg), - }; - self.log_message("TemporalProcessorScheduled", &payload); message .history .push(MessageHistoryEvent::TemporalProcessorScheduled { node_id: self.id, step_id: self.state.step_id, - index: self.num_temporal_processor_scheduled, + index: self.state.cur_num_temporal_processor_scheduled, }); self.temporal_sender.send(message).unwrap(); - self.num_temporal_processor_scheduled += 1; - } - - fn parse_payload(message: &[u8]) -> Payload { - Payload::load(MockBlendMessage::payload(message).unwrap()) + self.state.cur_num_temporal_processor_scheduled += 1; } fn update_time(&mut self, elapsed: Duration) { @@ -391,18 +373,10 @@ impl BlendNode { self.slot_update_sender.send(elapsed).unwrap(); } - fn log_message_generated(&self, msg_type: &str, payload: &Payload) { - self.log_message(format!("{}MessageGenerated", msg_type).as_str(), payload); - } - - fn log_message_released_from(&self, from: &str, payload: &Payload) { - self.log_message(format!("MessageReleasedFrom{}", from).as_str(), payload); - } - fn log_message_fully_unwrapped(&self, payload: &Payload, history: Vec) { log!( "MessageFullyUnwrapped", - MessageWithHistory { + MessageWithHistoryLog { message: MessageLog { payload_id: payload.id(), step_id: self.state.step_id, @@ -413,29 +387,6 @@ impl BlendNode { ); } - fn log_message(&self, topic: &str, payload: &Payload) { - log!( - topic, - MessageLog { - payload_id: payload.id(), - step_id: self.state.step_id, - node_id: self.id.index(), - } - ); - } - - fn log_emission(log: &EmissionLog) { - log!("Emission", log); - } - - fn new_emission_log(&self, emission_type: &str) -> EmissionLog { - EmissionLog { - emission_type: emission_type.to_string(), - step_id: self.state.step_id, - node_id: self.id.index(), - } - } - fn duration_between(&self, from_step: usize, to_step: usize) -> Duration { self.step_time .mul((to_step - from_step).try_into().unwrap()) @@ -464,7 +415,6 @@ impl Node for BlendNode { if let Poll::Ready(Some(_)) = pin!(&mut self.data_msg_lottery_interval).poll_next(&mut cx) { if self.data_msg_lottery.run() { let payload = Payload::new(); - self.log_message_generated("Data", &payload); let message = self .crypto_processor .wrap_message(payload.as_bytes()) @@ -479,22 +429,29 @@ impl Node for BlendNode { // Handle incoming messages for mut network_message in self.receive() { - network_message - .payload - .history - .push(MessageHistoryEvent::NetworkReceived { - from_node_id: network_message.from, - to_node_id: self.id, - delay: self - .regions_data - .network_behaviour(network_message.from, self.id) - .delay(), - step_id: self.state.step_id, - }); + match network_message.payload.history.last().unwrap() { + MessageHistoryEvent::NetworkSent { + to_node_id, + step_id, + .. + } => { + assert_eq!(*to_node_id, self.id); + network_message + .payload + .history + .push(MessageHistoryEvent::NetworkReceived { + from_node_id: network_message.from, + to_node_id: self.id, + step_id: self.state.step_id, + latency: self.duration_between(*step_id, self.state.step_id), + }); + } + event => panic!("Unexpected message history event: {:?}", event), + } + self.forward( network_message.payload().clone(), Some(network_message.from), - None, ); self.handle_incoming_message(network_message.into_payload()); } @@ -503,7 +460,7 @@ impl Node for BlendNode { if let Poll::Ready(Some(mut outgoing_msg_with_route)) = pin!(&mut self.temporal_processor_messages).poll_next(&mut cx) { - // Add a TemporalProcessorReleased track + // Add a TemporalProcessorReleased history event match outgoing_msg_with_route.history.last().unwrap() { MessageHistoryEvent::TemporalProcessorScheduled { node_id, step_id, .. @@ -516,18 +473,14 @@ impl Node for BlendNode { duration: self.duration_between(*step_id, self.state.step_id), }, ); - self.num_temporal_processor_scheduled -= 1; + self.state.cur_num_temporal_processor_scheduled -= 1; } - track => panic!("Unexpected message track: {:?}", track), + event => panic!("Unexpected message history event: {:?}", event), } // Proceed the message match outgoing_msg_with_route.outgoing_message { BlendOutgoingMessage::Outbound(message) => { - self.log_message_released_from( - "TemporalProcessor", - &Self::parse_payload(&message), - ); self.schedule_persistent_transmission(BlendMessage { message, history: outgoing_msg_with_route.history, @@ -535,10 +488,8 @@ impl Node for BlendNode { } BlendOutgoingMessage::FullyUnwrapped(payload) => { let payload = Payload::load(payload); - self.log_message_released_from("TemporalProcessor", &payload); self.log_message_fully_unwrapped(&payload, outgoing_msg_with_route.history); self.state.num_messages_fully_unwrapped += 1; - //TODO: create a tracing event } } } @@ -546,7 +497,6 @@ impl Node for BlendNode { // Generate a cover message probabilistically if let Poll::Ready(Some(_)) = pin!(&mut self.cover_traffic).poll_next(&mut cx) { let payload = Payload::new(); - self.log_message_generated("Cover", &payload); let message = self .crypto_processor .wrap_message(payload.as_bytes()) @@ -563,7 +513,7 @@ impl Node for BlendNode { pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) { let mut msg: BlendMessage = bincode::deserialize(&msg).unwrap(); - // Add a PersistentTransmissionReleased track + // Add a PersistentTransmissionReleased history event match msg.history.last().unwrap() { MessageHistoryEvent::PersistentTransmissionScheduled { node_id, step_id, .. @@ -575,15 +525,11 @@ impl Node for BlendNode { step_id: self.state.step_id, duration: self.duration_between(*step_id, self.state.step_id), }); - self.num_persistent_transmission_scheduled -= 1; + self.state.cur_num_persistent_transmission_scheduled -= 1; } - track => panic!("Unexpected message track: {:?}", track), + event => panic!("Unexpected message history event: {:?}", event), } - self.log_message_released_from( - "PersistentTransmission", - &Self::parse_payload(&msg.message), - ); - self.forward(msg, None, Some(self.new_emission_log("FromPersistent"))); + self.forward(msg, None); } self.state.step_id += 1; @@ -600,26 +546,19 @@ impl Node for BlendNode { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize)] struct MessageLog { payload_id: PayloadId, step_id: usize, node_id: usize, } -#[derive(Debug, Serialize, Deserialize)] -struct MessageWithHistory { +#[derive(Debug, Serialize)] +struct MessageWithHistoryLog { message: MessageLog, history: Vec, } -#[derive(Debug, Serialize, Deserialize)] -struct EmissionLog { - emission_type: String, - step_id: usize, - node_id: usize, -} - mod node_id_serde { use super::NodeId; use netrunner::node::NodeIdExt; diff --git a/simlib/blendnet-sims/src/node/blend/state.rs b/simlib/blendnet-sims/src/node/blend/state.rs index 7fb4417..49d36f9 100644 --- a/simlib/blendnet-sims/src/node/blend/state.rs +++ b/simlib/blendnet-sims/src/node/blend/state.rs @@ -15,6 +15,8 @@ pub struct BlendnodeState { pub node_id: NodeId, pub step_id: usize, pub num_messages_fully_unwrapped: usize, + pub cur_num_persistent_transmission_scheduled: usize, + pub cur_num_temporal_processor_scheduled: usize, } #[derive(Serialize)]