// std use std::{ collections::HashMap, ops::Add, str::FromStr, sync::{ atomic::{AtomicU32, Ordering}, Arc, }, time::{Duration, Instant}, }; // crates use crossbeam::channel::{self, Receiver, Sender}; use parking_lot::Mutex; use rand::{rngs::SmallRng, Rng, SeedableRng}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; // internal use crate::node::NodeId; pub mod behaviour; pub mod regions; type NetworkTime = Instant; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct NetworkBehaviourKey { pub from: regions::Region, pub to: regions::Region, } impl NetworkBehaviourKey { pub fn new(from: regions::Region, to: regions::Region) -> Self { Self { from, to } } } impl Serialize for NetworkBehaviourKey { fn serialize(&self, serializer: S) -> Result { let s = format!("{}:{}", self.from, self.to); serializer.serialize_str(&s) } } impl<'de> Deserialize<'de> for NetworkBehaviourKey { fn deserialize>(deserializer: D) -> Result { let s = String::deserialize(deserializer)?; let mut split = s.split(':'); let from = split.next().ok_or(serde::de::Error::custom( "NetworkBehaviourKey should be in the form of `from_region:to_region`", ))?; let to = split.next().ok_or(serde::de::Error::custom( "NetworkBehaviourKey should be in the form of `from_region:to_region`", ))?; Ok(Self::new( regions::Region::from_str(from).map_err(serde::de::Error::custom)?, regions::Region::from_str(to).map_err(serde::de::Error::custom)?, )) } } #[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct NetworkSettings { #[serde(with = "network_behaviors_serde")] pub network_behaviors: HashMap, /// Represents node distribution in the simulated regions. /// The sum of distributions should be 1. pub regions: HashMap, } /// Ser/Deser `HashMap` to humantime format. mod network_behaviors_serde { use super::{Deserialize, Duration, HashMap, NetworkBehaviourKey}; /// Have to implement this manually because of the `serde_json` will panic if the key of map /// is not a string. pub fn serialize( vals: &HashMap, serializer: S, ) -> Result where S: serde::Serializer, { use serde::ser::SerializeMap; let mut ser = serializer.serialize_map(Some(vals.len()))?; for (k, v) in vals { ser.serialize_key(&k)?; ser.serialize_value(&humantime::format_duration(*v).to_string())?; } ser.end() } pub fn deserialize<'de, D>( deserializer: D, ) -> Result, D::Error> where D: serde::Deserializer<'de>, { let map = HashMap::::deserialize(deserializer)?; map.into_iter() .map(|(k, v)| { let v = humantime::parse_duration(&v).map_err(serde::de::Error::custom)?; Ok((k, v)) }) .collect::, _>>() } } /// Represents node network capacity and current load in bytes. #[derive(Debug)] struct NodeNetworkCapacity { capacity_bps: Option, current_load: Mutex, load_to_flush: AtomicU32, } impl NodeNetworkCapacity { fn new(capacity_bps: Option) -> Self { Self { capacity_bps, current_load: Mutex::new(0), load_to_flush: AtomicU32::new(0), } } fn increase_load(&self, load: u32) -> bool { if let Some(capacity_bps) = self.capacity_bps { let mut current_load = self.current_load.lock(); if *current_load + load <= capacity_bps { *current_load += load; true } else { false } } else { let mut current_load = self.current_load.lock(); *current_load += load; true } } fn decrease_load(&self, load: u32) { self.load_to_flush.fetch_add(load, Ordering::Relaxed); } fn flush_load(&self) -> u32 { let mut s = self.current_load.lock(); let previous_load = *s; *s -= self.load_to_flush.load(Ordering::Relaxed); self.load_to_flush.store(0, Ordering::Relaxed); previous_load } } #[derive(Debug)] pub struct Network where M: std::fmt::Debug + PayloadSize, { pub regions: regions::RegionsData, network_time: NetworkTime, messages: Vec<(NetworkTime, NetworkMessage)>, node_network_capacity: HashMap, from_node_receivers: HashMap>>, from_node_broadcast_receivers: HashMap>>, to_node_senders: HashMap>>, state: NetworkState, seed: u64, } #[derive(Serialize, Deserialize, Default, Debug, Clone)] pub struct NetworkState { pub total_outbound_bandwidth: u64, pub total_inbound_bandwidth: u64, } impl Network where M: std::fmt::Debug + PayloadSize + Send + Sync + Clone, { pub fn new(regions: regions::RegionsData, seed: u64) -> Self { Self { regions, network_time: Instant::now(), messages: Vec::new(), node_network_capacity: HashMap::new(), from_node_receivers: HashMap::new(), from_node_broadcast_receivers: HashMap::new(), to_node_senders: HashMap::new(), state: NetworkState::default(), seed, } } pub fn bandwidth_results(&self) -> NetworkState { self.state.clone() } fn send_message_cost( &self, rng: &mut R, node_a: NodeId, node_b: NodeId, ) -> Option { let network_behaviour = self.regions.network_behaviour(node_a, node_b); (!network_behaviour.should_drop(rng)) // TODO: use a delay range .then(|| network_behaviour.delay()) } pub fn connect( &mut self, node_id: NodeId, capacity_bps: Option, node_message_receiver: Receiver>, node_message_broadcast_receiver: Receiver>, ) -> Receiver> { self.node_network_capacity .insert(node_id, NodeNetworkCapacity::new(capacity_bps)); let (to_node_sender, from_network_receiver) = channel::unbounded(); self.from_node_receivers .insert(node_id, node_message_receiver); self.from_node_broadcast_receivers .insert(node_id, node_message_broadcast_receiver); self.to_node_senders.insert(node_id, to_node_sender); from_network_receiver } /// Collects and dispatches messages to connected interfaces. pub fn step(&mut self, time_passed: Duration) { self.collect_messages(); self.dispatch_after(time_passed); } /// Receive and store all messages from nodes. pub fn collect_messages(&mut self) { let mut total_step_outbound_bandwidth = 0u64; let mut adhoc_messages: Vec<(Instant, NetworkMessage)> = self .from_node_receivers .par_iter() .flat_map(|(_, from_node)| { from_node .try_iter() .map(|msg| (self.network_time, msg)) .collect::>() }) .collect(); total_step_outbound_bandwidth += adhoc_messages .iter() .map(|(_, m)| m.payload().size_bytes() as u64) .sum::(); self.messages.append(&mut adhoc_messages); let mut broadcast_messages = self .from_node_broadcast_receivers .iter() .flat_map(|(_, from_node)| { from_node.try_iter().flat_map(|msg| { self.to_node_senders.keys().map(move |recipient| { let mut m = msg.clone(); m.to = Some(*recipient); m }) }) }) .map(|m| (self.network_time, m)) .collect::>(); total_step_outbound_bandwidth += broadcast_messages .iter() .map(|(_, m)| m.payload().size_bytes() as u64) .sum::(); self.messages.append(&mut broadcast_messages); self.state.total_outbound_bandwidth += total_step_outbound_bandwidth; } /// Reiterate all messages and send to appropriate nodes if simulated /// delay has passed. pub fn dispatch_after(&mut self, time_passed: Duration) { self.network_time += time_passed; let delayed = self .messages .par_iter() .filter(|(network_time, message)| { let mut rng = SmallRng::seed_from_u64(self.seed); self.send_or_drop_message(&mut rng, network_time, message) }) .cloned() .collect(); self.messages = delayed; let mut total_step_inbound_bandwidth = 0u64; for (_, c) in self.node_network_capacity.iter() { total_step_inbound_bandwidth += c.flush_load() as u64; } self.state.total_inbound_bandwidth += total_step_inbound_bandwidth; } /// Returns true if message needs to be delayed and be dispatched in future. fn send_or_drop_message( &self, rng: &mut R, network_time: &NetworkTime, message: &NetworkMessage, ) -> bool { let to = message.to.expect("adhoc message has recipient"); if let Some(delay) = self.send_message_cost(rng, message.from, to) { let node_capacity = self.node_network_capacity.get(&to).unwrap(); let should_send = network_time.add(delay) <= self.network_time; let remaining_size = message.remaining_size(); if should_send && node_capacity.increase_load(remaining_size) { let to_node = self.to_node_senders.get(&to).unwrap(); to_node .send(message.clone()) .expect("node should have connection"); node_capacity.decrease_load(remaining_size); return false; } else { // if we do not need to delay, then we should check if the msg is too large // if so, we mock the partial sending message behavior if should_send { // if remaining is 0, we should send without delay return self.try_partial_send(node_capacity, message, &to) != 0; } return true; } } false } /// Try to apply partial send logic, returns the remaining size of the message fn try_partial_send( &self, node_capacity: &NodeNetworkCapacity, message: &NetworkMessage, to: &NodeId, ) -> u32 { if let Some(capacity_bps) = node_capacity.capacity_bps { let mut cap = node_capacity.current_load.lock(); let sent = capacity_bps - *cap; *cap = capacity_bps; let remaining = message.partial_send(sent); // Message is partially sent, the node capacity needs to be flushed at the end of step even // if the whole message is not sent. node_capacity.decrease_load(sent); if remaining == 0 { let to_node = self.to_node_senders.get(to).unwrap(); to_node .send(message.clone()) .expect("node should have connection"); } remaining } else { node_capacity.decrease_load(message.remaining_size()); 0 } } } #[derive(Clone, Debug)] pub struct NetworkMessage { pub from: NodeId, pub to: Option, pub payload: M, pub remaining: Arc, } impl NetworkMessage { pub fn new(from: NodeId, to: Option, payload: M, size_bytes: u32) -> Self { Self { from, to, payload, remaining: Arc::new(AtomicU32::new(size_bytes)), } } pub fn payload(&self) -> &M { &self.payload } pub fn into_payload(self) -> M { self.payload } fn remaining_size(&self) -> u32 { self.remaining.load(Ordering::SeqCst) } /// Mock the partial sending of a message behavior, returning the remaining message size. fn partial_send(&self, size: u32) -> u32 { self.remaining .fetch_sub(size, Ordering::SeqCst) .saturating_sub(size) } } pub trait PayloadSize { fn size_bytes(&self) -> u32; } pub trait NetworkInterface { type Payload; fn broadcast(&self, message: Self::Payload); fn send_message(&self, address: NodeId, message: Self::Payload); fn receive_messages(&self) -> Vec>; } pub struct InMemoryNetworkInterface { id: NodeId, broadcast: Sender>, sender: Sender>, receiver: Receiver>, } impl InMemoryNetworkInterface { pub fn new( id: NodeId, broadcast: Sender>, sender: Sender>, receiver: Receiver>, ) -> Self { Self { id, broadcast, sender, receiver, } } } impl NetworkInterface for InMemoryNetworkInterface { type Payload = M; fn broadcast(&self, message: Self::Payload) { let size = message.size_bytes(); let message = NetworkMessage::new(self.id, None, message, size); self.broadcast.send(message).unwrap(); } fn send_message(&self, address: NodeId, message: Self::Payload) { let size = message.size_bytes(); let message = NetworkMessage::new(self.id, Some(address), message, size); self.sender.send(message).unwrap(); } fn receive_messages(&self) -> Vec> { self.receiver.try_iter().collect() } } #[cfg(test)] mod tests { use super::{ behaviour::NetworkBehaviour, regions::{Region, RegionsData}, Network, NetworkInterface, NetworkMessage, PayloadSize, }; use crate::{ network::NetworkBehaviourKey, node::{NodeId, NodeIdExt}, }; use crossbeam::channel::{self, Receiver, Sender}; use std::{collections::HashMap, time::Duration}; struct MockNetworkInterface { id: NodeId, broadcast: Sender>, sender: Sender>, receiver: Receiver>, message_size: u32, } impl MockNetworkInterface { pub fn new( id: NodeId, broadcast: Sender>, sender: Sender>, receiver: Receiver>, message_size: u32, ) -> Self { Self { id, broadcast, sender, receiver, message_size, } } } impl PayloadSize for () { fn size_bytes(&self) -> u32 { todo!() } } impl NetworkInterface for MockNetworkInterface { type Payload = (); fn broadcast(&self, message: Self::Payload) { let message = NetworkMessage::new(self.id, None, message, self.message_size); self.broadcast.send(message).unwrap(); } fn send_message(&self, address: NodeId, message: Self::Payload) { let message = NetworkMessage::new(self.id, Some(address), message, self.message_size); self.sender.send(message).unwrap(); } fn receive_messages(&self) -> Vec> { self.receiver.try_iter().collect() } } #[test] fn send_receive_messages() { let node_a = NodeId::from_index(0); let node_b = NodeId::from_index(1); let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]); let behaviour = HashMap::from([( NetworkBehaviourKey::new(Region::Europe, Region::Europe), NetworkBehaviour::new(Duration::from_millis(100), 0.0), )]); let regions_data = RegionsData::new(regions, behaviour); let mut network = Network::new(regions_data, 0); let (from_a_sender, from_a_receiver) = channel::unbounded(); let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded(); let to_a_receiver = network.connect(node_a, Some(3), from_a_receiver, from_a_broadcast_receiver); let a = MockNetworkInterface::new( node_a, from_a_broadcast_sender, from_a_sender, to_a_receiver, 1, ); let (from_b_sender, from_b_receiver) = channel::unbounded(); let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded(); let to_b_receiver = network.connect(node_b, Some(3), from_b_receiver, from_b_broadcast_receiver); let b = MockNetworkInterface::new( node_b, from_b_broadcast_sender, from_b_sender, to_b_receiver, 1, ); a.send_message(node_b, ()); network.collect_messages(); assert_eq!(a.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 0); network.step(Duration::from_millis(0)); assert_eq!(a.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 0); network.step(Duration::from_millis(100)); assert_eq!(a.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 1); network.step(Duration::from_millis(100)); assert_eq!(a.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 0); b.send_message(node_a, ()); b.send_message(node_a, ()); b.send_message(node_a, ()); network.collect_messages(); network.dispatch_after(Duration::from_millis(100)); assert_eq!(a.receive_messages().len(), 3); assert_eq!(b.receive_messages().len(), 0); } #[test] fn regions_send_receive_messages() { let node_a = NodeId::from_index(0); let node_b = NodeId::from_index(1); let node_c = NodeId::from_index(2); let regions = HashMap::from([ (Region::Asia, vec![node_a, node_b]), (Region::Europe, vec![node_c]), ]); let behaviour = HashMap::from([ ( NetworkBehaviourKey::new(Region::Asia, Region::Asia), NetworkBehaviour::new(Duration::from_millis(100), 0.0), ), ( NetworkBehaviourKey::new(Region::Asia, Region::Europe), NetworkBehaviour::new(Duration::from_millis(500), 0.0), ), ( NetworkBehaviourKey::new(Region::Europe, Region::Europe), NetworkBehaviour::new(Duration::from_millis(100), 0.0), ), ]); let regions_data = RegionsData::new(regions, behaviour); let mut network = Network::new(regions_data, 0); let (from_a_sender, from_a_receiver) = channel::unbounded(); let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded(); let to_a_receiver = network.connect(node_a, Some(2), from_a_receiver, from_a_broadcast_receiver); let a = MockNetworkInterface::new( node_a, from_a_broadcast_sender, from_a_sender, to_a_receiver, 1, ); let (from_b_sender, from_b_receiver) = channel::unbounded(); let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded(); let to_b_receiver = network.connect(node_b, Some(2), from_b_receiver, from_b_broadcast_receiver); let b = MockNetworkInterface::new( node_b, from_b_broadcast_sender, from_b_sender, to_b_receiver, 1, ); let (from_c_sender, from_c_receiver) = channel::unbounded(); let (from_c_broadcast_sender, from_c_broadcast_receiver) = channel::unbounded(); let to_c_receiver = network.connect(node_c, Some(2), from_c_receiver, from_c_broadcast_receiver); let c = MockNetworkInterface::new( node_c, from_c_broadcast_sender, from_c_sender, to_c_receiver, 1, ); a.send_message(node_b, ()); a.send_message(node_c, ()); network.collect_messages(); b.send_message(node_a, ()); b.send_message(node_c, ()); network.collect_messages(); c.send_message(node_a, ()); c.send_message(node_b, ()); network.collect_messages(); network.dispatch_after(Duration::from_millis(100)); assert_eq!(a.receive_messages().len(), 1); assert_eq!(b.receive_messages().len(), 1); assert_eq!(c.receive_messages().len(), 0); a.send_message(node_b, ()); b.send_message(node_c, ()); network.collect_messages(); network.dispatch_after(Duration::from_millis(400)); assert_eq!(a.receive_messages().len(), 1); // c to a assert_eq!(b.receive_messages().len(), 2); // c to b && a to b assert_eq!(c.receive_messages().len(), 2); // a to c && b to c network.dispatch_after(Duration::from_millis(100)); assert_eq!(a.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 0); assert_eq!(c.receive_messages().len(), 1); // b to c } #[test] fn node_network_capacity_limit() { let node_a = NodeId::from_index(0); let node_b = NodeId::from_index(1); let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]); let behaviour = HashMap::from([( NetworkBehaviourKey::new(Region::Europe, Region::Europe), NetworkBehaviour::new(Duration::from_millis(100), 0.0), )]); let regions_data = RegionsData::new(regions, behaviour); let mut network = Network::new(regions_data, 0); let (from_a_sender, from_a_receiver) = channel::unbounded(); let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded(); let to_a_receiver = network.connect(node_a, Some(3), from_a_receiver, from_a_broadcast_receiver); let a = MockNetworkInterface::new( node_a, from_a_broadcast_sender, from_a_sender, to_a_receiver, 1, ); let (from_b_sender, from_b_receiver) = channel::unbounded(); let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded(); let to_b_receiver = network.connect(node_b, Some(2), from_b_receiver, from_b_broadcast_receiver); let b = MockNetworkInterface::new( node_b, from_b_broadcast_sender, from_b_sender, to_b_receiver, 1, ); for _ in 0..6 { a.send_message(node_b, ()); b.send_message(node_a, ()); } network.step(Duration::from_millis(100)); assert_eq!(a.receive_messages().len(), 3); assert_eq!(b.receive_messages().len(), 2); network.step(Duration::from_millis(100)); assert_eq!(a.receive_messages().len(), 3); assert_eq!(b.receive_messages().len(), 2); network.step(Duration::from_millis(100)); assert_eq!(a.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 2); } #[test] fn node_network_capacity_no_limit() { let node_a = NodeId::from_index(0); let node_b = NodeId::from_index(1); let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]); let behaviour = HashMap::from([( NetworkBehaviourKey::new(Region::Europe, Region::Europe), NetworkBehaviour::new(Duration::from_millis(100), 0.0), )]); let regions_data = RegionsData::new(regions, behaviour); let mut network = Network::new(regions_data, 0); let (from_a_sender, from_a_receiver) = channel::unbounded(); let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded(); let to_a_receiver = network.connect(node_a, None, from_a_receiver, from_a_broadcast_receiver); let a = MockNetworkInterface::new( node_a, from_a_broadcast_sender, from_a_sender, to_a_receiver, 1000, ); let (from_b_sender, from_b_receiver) = channel::unbounded(); let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded(); let to_b_receiver = network.connect(node_b, None, from_b_receiver, from_b_broadcast_receiver); let b = MockNetworkInterface::new( node_b, from_b_broadcast_sender, from_b_sender, to_b_receiver, 100, ); for _ in 0..6 { a.send_message(node_b, ()); b.send_message(node_a, ()); } network.step(Duration::from_millis(100)); assert_eq!(a.receive_messages().len(), 6); assert_eq!(b.receive_messages().len(), 6); } #[test] fn node_network_message_partial_send() { let node_a = NodeId::from_index(0); let node_b = NodeId::from_index(1); let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]); let behaviour = HashMap::from([( NetworkBehaviourKey::new(Region::Europe, Region::Europe), NetworkBehaviour::new(Duration::from_millis(100), 0.0), )]); let regions_data = RegionsData::new(regions, behaviour); let mut network = Network::new(regions_data, 0); let (from_a_sender, from_a_receiver) = channel::unbounded(); let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded(); // Node A is connected to the network with throuput of 5. let to_a_receiver = network.connect(node_a, Some(5), from_a_receiver, from_a_broadcast_receiver); // Every message sent **from** Node A will be of size 15. let a = MockNetworkInterface::new( node_a, from_a_broadcast_sender, from_a_sender, to_a_receiver, 2, ); let (from_b_sender, from_b_receiver) = channel::unbounded(); let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded(); // Node B is connected to the network with throuput of 1. let to_b_receiver = network.connect(node_b, Some(1), from_b_receiver, from_b_broadcast_receiver); // Every message sent **from** Node B will be of size 2. let b = MockNetworkInterface::new( node_b, from_b_broadcast_sender, from_b_sender, to_b_receiver, 15, ); // Node A sends message of size 2 to Node B. a.send_message(node_b, ()); // Node B sends message of size 15 to Node A. b.send_message(node_a, ()); // Step duration matches the latency between nodes, thus Node A can receive 5 units of a // message, Node B - 1 unit of a message during the step. network.step(Duration::from_millis(100)); assert_eq!(a.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 0); // Node B should receive a message during the second step, because it's throughput during the // step is 1, but the message size it receives is 2. network.step(Duration::from_millis(100)); assert_eq!(a.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 1); // Node A should receive a message during the third step, because it's throughput during the // step is 5, but the message it recieves is of size 15. network.step(Duration::from_millis(100)); assert_eq!(a.receive_messages().len(), 1); assert_eq!(b.receive_messages().len(), 0); } }