diff --git a/simlib/blendnet-sims/Cargo.toml b/simlib/blendnet-sims/Cargo.toml index 9729a7a..9e90789 100644 --- a/simlib/blendnet-sims/Cargo.toml +++ b/simlib/blendnet-sims/Cargo.toml @@ -27,3 +27,4 @@ sha2 = "0.10" uuid = { version = "1", features = ["fast-rng", "v4"] } tracing-appender = "0.2" cached = "0.54.0" +bincode = "1.3.3" diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index 9738768..bb122eb 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -114,6 +114,7 @@ impl SimulationApp { create_boxed_blendnode( node_id, &mut network, + regions_data.clone(), settings.simulation_settings.clone(), no_netcap, BlendnodeSettings { @@ -155,6 +156,7 @@ impl SimulationApp { fn create_boxed_blendnode( node_id: NodeId, network: &mut Network, + regions_data: RegionsData, simulation_settings: SimulationSettings, no_netcap: bool, blendnode_settings: BlendnodeSettings, @@ -190,6 +192,7 @@ fn create_boxed_blendnode( node_id, blendnode_settings, network_interface, + regions_data, )) } diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index 73cf31a..f16091b 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -15,6 +15,7 @@ use futures::Stream; use lottery::StakeLottery; use message::{Payload, PayloadId}; use multiaddr::Multiaddr; +use netrunner::network::regions::RegionsData; use netrunner::network::NetworkMessage; use netrunner::node::{Node, NodeId, NodeIdExt}; use netrunner::{ @@ -41,8 +42,20 @@ use state::BlendnodeState; use std::{pin::pin, task::Poll, time::Duration}; use stream_wrapper::CrossbeamReceiverStream; -#[derive(Debug, Clone)] -pub struct BlendMessage(Vec); +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BlendMessage { + message: Vec, + network_route: Vec<(NodeId, Duration)>, +} + +impl BlendMessage { + pub fn new(message: Vec, origin: NodeId) -> Self { + Self { + message, + network_route: vec![(origin, Duration::ZERO)], + } + } +} impl PayloadSize for BlendMessage { fn size_bytes(&self) -> u32 { @@ -50,6 +63,11 @@ impl PayloadSize for BlendMessage { } } +struct BlendOutgoingMessageWithRoute { + outgoing_message: BlendOutgoingMessage, + network_route: Vec<(NodeId, Duration)>, +} + #[derive(Deserialize)] pub struct BlendnodeSettings { pub connected_peers: Vec, @@ -72,6 +90,7 @@ pub struct BlendNode { state: BlendnodeState, settings: BlendnodeSettings, network_interface: InMemoryNetworkInterface, + regions_data: RegionsData, message_cache: TimedCache, data_msg_lottery_update_time_sender: channel::Sender, @@ -87,10 +106,10 @@ pub struct BlendNode { Interval, >, crypto_processor: CryptographicProcessor, - temporal_sender: channel::Sender, + temporal_sender: channel::Sender, temporal_update_time_sender: channel::Sender, temporal_processor_messages: - TemporalStream, TemporalScheduler>, + TemporalStream, TemporalScheduler>, epoch_update_sender: channel::Sender, slot_update_sender: channel::Sender, cover_traffic: CoverTraffic, @@ -101,6 +120,7 @@ impl BlendNode { id: NodeId, settings: BlendnodeSettings, network_interface: InMemoryNetworkInterface, + regions_data: RegionsData, ) -> Self { let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed); @@ -178,6 +198,7 @@ impl BlendNode { Self { id, network_interface, + regions_data, // We're not coupling this lifespan with the steps now, but it's okay // We expected that a message will be delivered to most of nodes within 60s. message_cache: TimedCache::with_lifespan(60), @@ -224,7 +245,8 @@ impl BlendNode { self.network_interface .send_message(*node_id, message.clone()) } - self.message_cache.cache_set(Self::sha256(&message.0), ()); + self.message_cache + .cache_set(Self::sha256(&message.message), ()); } fn receive(&mut self) -> Vec> { @@ -234,7 +256,7 @@ impl BlendNode { // Retain only messages that have not been seen before .filter(|msg| { self.message_cache - .cache_set(Self::sha256(&msg.payload().0), ()) + .cache_set(Self::sha256(&msg.payload().message), ()) .is_none() }) .collect() @@ -246,23 +268,29 @@ impl BlendNode { hasher.finalize().into() } - fn schedule_persistent_transmission(&mut self, message: Vec) { + fn schedule_persistent_transmission(&mut self, message: BlendMessage) { self.log_message( "PersistentTransmissionScheduled", - &Self::parse_payload(&message), + &Self::parse_payload(&message.message), ); - self.persistent_sender.send(message).unwrap(); + self.persistent_sender + .send(bincode::serialize(&message).unwrap()) + .unwrap(); } - fn handle_incoming_message(&mut self, message: Vec) { - match self.crypto_processor.unwrap_message(&message) { + fn handle_incoming_message(&mut self, message: BlendMessage) { + match self.crypto_processor.unwrap_message(&message.message) { Ok((unwrapped_message, fully_unwrapped)) => { let temporal_message = if fully_unwrapped { BlendOutgoingMessage::FullyUnwrapped(unwrapped_message) } else { BlendOutgoingMessage::Outbound(unwrapped_message) }; - self.schedule_temporal_processor(temporal_message); + + self.schedule_temporal_processor(BlendOutgoingMessageWithRoute { + outgoing_message: temporal_message, + network_route: message.network_route, + }); } Err(e) => { tracing::debug!("Failed to unwrap message: {:?}", e); @@ -270,8 +298,8 @@ impl BlendNode { } } - fn schedule_temporal_processor(&mut self, message: BlendOutgoingMessage) { - let payload = match &message { + fn schedule_temporal_processor(&mut self, message: BlendOutgoingMessageWithRoute) { + let payload = match &message.outgoing_message { BlendOutgoingMessage::FullyUnwrapped(payload) => Payload::load(payload.clone()), BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg), }; @@ -301,8 +329,25 @@ impl BlendNode { self.log_message(format!("MessageReleasedFrom{}", from).as_str(), payload); } - fn log_message_fully_unwrapped(&self, payload: &Payload) { - self.log_message("MessageFullyUnwrapped", payload); + fn log_message_fully_unwrapped( + &self, + payload: &Payload, + network_route: Vec<(NodeId, Duration)>, + ) { + log!( + "MessageFullyUnwrapped", + MessageWithRouteLog { + message: MessageLog { + payload_id: payload.id(), + step_id: self.state.step_id, + node_id: self.id.index(), + }, + network_route: network_route + .into_iter() + .map(|(id, delay)| (id.index(), delay.as_millis())) + .collect(), + } + ); } fn log_message(&self, topic: &str, payload: &Payload) { @@ -356,33 +401,52 @@ impl Node for BlendNode { .crypto_processor .wrap_message(payload.as_bytes()) .unwrap(); - self.schedule_persistent_transmission(message); + self.schedule_persistent_transmission(BlendMessage::new(message, self.id)); } } // Handle incoming messages - for network_message in self.receive() { + for mut network_message in self.receive() { + assert_eq!( + network_message.payload().network_route.last().unwrap().0, + network_message.from + ); + network_message.payload.network_route.push(( + self.id, + self.regions_data + .network_behaviour(network_message.from, self.id) + .delay(), + )); self.forward( network_message.payload().clone(), Some(network_message.from), None, ); - self.handle_incoming_message(network_message.into_payload().0); + self.handle_incoming_message(network_message.into_payload()); } // Proceed temporal processor - if let Poll::Ready(Some(msg)) = + if let Poll::Ready(Some(outgoing_msg_with_route)) = pin!(&mut self.temporal_processor_messages).poll_next(&mut cx) { - match msg { - BlendOutgoingMessage::Outbound(msg) => { - self.log_message_released_from("TemporalProcessor", &Self::parse_payload(&msg)); - self.schedule_persistent_transmission(msg); + match outgoing_msg_with_route.outgoing_message { + BlendOutgoingMessage::Outbound(message) => { + self.log_message_released_from( + "TemporalProcessor", + &Self::parse_payload(&message), + ); + self.schedule_persistent_transmission(BlendMessage { + message, + network_route: outgoing_msg_with_route.network_route, + }); } BlendOutgoingMessage::FullyUnwrapped(payload) => { let payload = Payload::load(payload); self.log_message_released_from("TemporalProcessor", &payload); - self.log_message_fully_unwrapped(&payload); + self.log_message_fully_unwrapped( + &payload, + outgoing_msg_with_route.network_route, + ); self.state.num_messages_fully_unwrapped += 1; //TODO: create a tracing event } @@ -397,19 +461,19 @@ impl Node for BlendNode { .crypto_processor .wrap_message(payload.as_bytes()) .unwrap(); - self.schedule_persistent_transmission(message); + self.schedule_persistent_transmission(BlendMessage::new(message, self.id)); } // Proceed persistent transmission if let Poll::Ready(Some(msg)) = pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) { - self.log_message_released_from("PersistentTransmission", &Self::parse_payload(&msg)); - self.forward( - BlendMessage(msg), - None, - Some(self.new_emission_log("FromPersistent")), + let msg: BlendMessage = bincode::deserialize(&msg).unwrap(); + self.log_message_released_from( + "PersistentTransmission", + &Self::parse_payload(&msg.message), ); + self.forward(msg, None, Some(self.new_emission_log("FromPersistent"))); } self.state.step_id += 1; @@ -433,6 +497,12 @@ struct MessageLog { node_id: usize, } +#[derive(Debug, Serialize, Deserialize)] +struct MessageWithRouteLog { + message: MessageLog, + network_route: Vec<(usize, u128)>, +} + #[derive(Debug, Serialize, Deserialize)] struct EmissionLog { emission_type: String,