diff --git a/simulations/src/bin/app/main.rs b/simulations/src/bin/app/main.rs index fab0188c..9f78854e 100644 --- a/simulations/src/bin/app/main.rs +++ b/simulations/src/bin/app/main.rs @@ -80,8 +80,14 @@ impl SimulationApp { let (node_message_broadcast_sender, node_message_broadcast_receiver) = channel::unbounded(); let (node_message_sender, node_message_receiver) = channel::unbounded(); + // Dividing milliseconds in second by milliseconds in the step. + let step_time_as_second_fraction = + 1_000_000 / simulation_settings.step_time.subsec_millis(); + let capacity_bps = simulation_settings.node_settings.network_capacity_kbps * 1024 + / step_time_as_second_fraction; let network_message_receiver = network.connect( node_id, + capacity_bps, node_message_receiver, node_message_broadcast_receiver, ); diff --git a/simulations/src/network/mod.rs b/simulations/src/network/mod.rs index f0d13552..d3d55b49 100644 --- a/simulations/src/network/mod.rs +++ b/simulations/src/network/mod.rs @@ -3,10 +3,12 @@ use std::{ collections::HashMap, ops::Add, str::FromStr, + sync::atomic::{AtomicU32, Ordering}, time::{Duration, Instant}, }; // crates use crossbeam::channel::{self, Receiver, Sender}; +use parking_lot::Mutex; use rand::{rngs::SmallRng, Rng, SeedableRng}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; @@ -101,10 +103,48 @@ mod network_behaviors_serde { } } +/// Represents node network capacity and current load in bytes. +struct NodeNetworkCapacity { + capacity_bps: u32, + current_load: Mutex, + load_to_flush: AtomicU32, +} + +impl NodeNetworkCapacity { + fn new(capacity_bps: u32) -> Self { + Self { + capacity_bps, + current_load: Mutex::new(0), + load_to_flush: AtomicU32::new(0), + } + } + + fn increase_load(&self, load: u32) -> bool { + let mut current_load = self.current_load.lock(); + if *current_load + load <= self.capacity_bps { + *current_load += load; + true + } else { + false + } + } + + fn decrease_load(&self, load: u32) { + self.load_to_flush.fetch_add(load, Ordering::Relaxed); + } + + fn flush_load(&self) { + let mut s = self.current_load.lock(); + *s -= self.load_to_flush.load(Ordering::Relaxed); + self.load_to_flush.store(0, Ordering::Relaxed); + } +} + pub struct Network { pub regions: regions::RegionsData, network_time: NetworkTime, messages: Vec<(NetworkTime, NetworkMessage)>, + node_network_capacity: HashMap, from_node_receivers: HashMap>>, from_node_broadcast_receivers: HashMap>>, to_node_senders: HashMap>>, @@ -120,6 +160,7 @@ where regions, network_time: Instant::now(), messages: Vec::new(), + node_network_capacity: HashMap::new(), from_node_receivers: HashMap::new(), from_node_broadcast_receivers: HashMap::new(), to_node_senders: HashMap::new(), @@ -142,9 +183,12 @@ where pub fn connect( &mut self, node_id: NodeId, + capacity_bps: u32, node_message_receiver: Receiver>, node_message_broadcast_receiver: Receiver>, ) -> Receiver> { + self.node_network_capacity + .insert(node_id, NodeNetworkCapacity::new(capacity_bps)); let (to_node_sender, from_network_receiver) = channel::unbounded(); self.from_node_receivers .insert(node_id, node_message_receiver); @@ -206,6 +250,10 @@ where .cloned() .collect(); + for (_, c) in self.node_network_capacity.iter() { + c.flush_load(); + } + self.messages = delayed; } @@ -218,11 +266,15 @@ where ) -> bool { let to = message.to.expect("adhoc message has recipient"); if let Some(delay) = self.send_message_cost(rng, message.from, to) { - if network_time.add(delay) <= self.network_time { + let node_capacity = self.node_network_capacity.get(&to).unwrap(); + if network_time.add(delay) <= self.network_time + && node_capacity.increase_load(message.size_bytes) + { let to_node = self.to_node_senders.get(&to).unwrap(); to_node .send(message.clone()) - .expect("Node should have connection"); + .expect("node should have connection"); + node_capacity.decrease_load(message.size_bytes); return false; } else { return true; @@ -237,11 +289,17 @@ pub struct NetworkMessage { pub from: NodeId, pub to: Option, pub payload: M, + pub size_bytes: u32, } impl NetworkMessage { - pub fn new(from: NodeId, to: Option, payload: M) -> Self { - Self { from, to, payload } + pub fn new(from: NodeId, to: Option, payload: M, size_bytes: u32) -> Self { + Self { + from, + to, + payload, + size_bytes, + } } pub fn get_payload(self) -> M { @@ -249,6 +307,10 @@ impl NetworkMessage { } } +pub trait PayloadSize { + fn size_bytes(&self) -> u32; +} + pub trait NetworkInterface { type Payload; @@ -280,16 +342,18 @@ impl InMemoryNetworkInterface { } } -impl NetworkInterface for InMemoryNetworkInterface { +impl NetworkInterface for InMemoryNetworkInterface { type Payload = M; fn broadcast(&self, message: Self::Payload) { - let message = NetworkMessage::new(self.id, None, message); + let size = message.size_bytes(); + let message = NetworkMessage::new(self.id, None, message, size); self.broadcast.send(message).unwrap(); } fn send_message(&self, address: NodeId, message: Self::Payload) { - let message = NetworkMessage::new(self.id, Some(address), message); + let size = message.size_bytes(); + let message = NetworkMessage::new(self.id, Some(address), message, size); self.sender.send(message).unwrap(); } @@ -339,12 +403,12 @@ mod tests { type Payload = (); fn broadcast(&self, message: Self::Payload) { - let message = NetworkMessage::new(self.id, None, message); + let message = NetworkMessage::new(self.id, None, message, 1); self.broadcast.send(message).unwrap(); } fn send_message(&self, address: NodeId, message: Self::Payload) { - let message = NetworkMessage::new(self.id, Some(address), message); + let message = NetworkMessage::new(self.id, Some(address), message, 1); self.sender.send(message).unwrap(); } @@ -368,7 +432,7 @@ mod tests { let (from_a_sender, from_a_receiver) = channel::unbounded(); let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded(); - let to_a_receiver = network.connect(node_a, from_a_receiver, from_a_broadcast_receiver); + let to_a_receiver = network.connect(node_a, 3, from_a_receiver, from_a_broadcast_receiver); let a = MockNetworkInterface::new( node_a, from_a_broadcast_sender, @@ -378,7 +442,7 @@ mod tests { let (from_b_sender, from_b_receiver) = channel::unbounded(); let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded(); - let to_b_receiver = network.connect(node_b, from_b_receiver, from_b_broadcast_receiver); + let to_b_receiver = network.connect(node_b, 3, from_b_receiver, from_b_broadcast_receiver); let b = MockNetworkInterface::new( node_b, from_b_broadcast_sender, @@ -443,7 +507,7 @@ mod tests { let (from_a_sender, from_a_receiver) = channel::unbounded(); let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded(); - let to_a_receiver = network.connect(node_a, from_a_receiver, from_a_broadcast_receiver); + let to_a_receiver = network.connect(node_a, 2, from_a_receiver, from_a_broadcast_receiver); let a = MockNetworkInterface::new( node_a, from_a_broadcast_sender, @@ -453,7 +517,7 @@ mod tests { let (from_b_sender, from_b_receiver) = channel::unbounded(); let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded(); - let to_b_receiver = network.connect(node_b, from_b_receiver, from_b_broadcast_receiver); + let to_b_receiver = network.connect(node_b, 2, from_b_receiver, from_b_broadcast_receiver); let b = MockNetworkInterface::new( node_b, from_b_broadcast_sender, @@ -463,7 +527,7 @@ mod tests { let (from_c_sender, from_c_receiver) = channel::unbounded(); let (from_c_broadcast_sender, from_c_broadcast_receiver) = channel::unbounded(); - let to_c_receiver = network.connect(node_c, from_c_receiver, from_c_broadcast_receiver); + let to_c_receiver = network.connect(node_c, 2, from_c_receiver, from_c_broadcast_receiver); let c = MockNetworkInterface::new( node_c, from_c_broadcast_sender, @@ -502,4 +566,55 @@ mod tests { assert_eq!(b.receive_messages().len(), 0); assert_eq!(c.receive_messages().len(), 1); // b to c } + + #[test] + fn node_network_capacity_limit() { + let node_a = NodeId::from_index(0); + let node_b = NodeId::from_index(1); + + let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]); + let behaviour = HashMap::from([( + NetworkBehaviourKey::new(Region::Europe, Region::Europe), + NetworkBehaviour::new(Duration::from_millis(100), 0.0), + )]); + let regions_data = RegionsData::new(regions, behaviour); + let mut network = Network::new(regions_data, 0); + + let (from_a_sender, from_a_receiver) = channel::unbounded(); + let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded(); + let to_a_receiver = network.connect(node_a, 3, from_a_receiver, from_a_broadcast_receiver); + let a = MockNetworkInterface::new( + node_a, + from_a_broadcast_sender, + from_a_sender, + to_a_receiver, + ); + + let (from_b_sender, from_b_receiver) = channel::unbounded(); + let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded(); + let to_b_receiver = network.connect(node_b, 2, from_b_receiver, from_b_broadcast_receiver); + let b = MockNetworkInterface::new( + node_b, + from_b_broadcast_sender, + from_b_sender, + to_b_receiver, + ); + + for _ in 0..6 { + a.send_message(node_b, ()); + b.send_message(node_a, ()); + } + + network.step(Duration::from_millis(100)); + assert_eq!(a.receive_messages().len(), 3); + assert_eq!(b.receive_messages().len(), 2); + + network.step(Duration::from_millis(100)); + assert_eq!(a.receive_messages().len(), 3); + assert_eq!(b.receive_messages().len(), 2); + + network.step(Duration::from_millis(100)); + assert_eq!(a.receive_messages().len(), 0); + assert_eq!(b.receive_messages().len(), 2); + } } diff --git a/simulations/src/node/carnot/messages.rs b/simulations/src/node/carnot/messages.rs index 226e9ee8..6873feb0 100644 --- a/simulations/src/node/carnot/messages.rs +++ b/simulations/src/node/carnot/messages.rs @@ -3,6 +3,8 @@ use nomos_consensus::network::messages::{ NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, }; +use crate::network::PayloadSize; + #[derive(Debug, Eq, PartialEq, Hash, Clone)] pub enum CarnotMessage { Proposal(ProposalChunkMsg), @@ -23,3 +25,17 @@ impl CarnotMessage { } } } + +impl PayloadSize for CarnotMessage { + fn size_bytes(&self) -> u32 { + match self { + CarnotMessage::Proposal(p) => { + (std::mem::size_of::() + p.chunk.len()) as u32 + } + CarnotMessage::Vote(_) => std::mem::size_of::() as u32, + CarnotMessage::TimeoutQc(_) => std::mem::size_of::() as u32, + CarnotMessage::Timeout(_) => std::mem::size_of::() as u32, + CarnotMessage::NewView(_) => std::mem::size_of::() as u32, + } + } +} diff --git a/simulations/src/node/dummy.rs b/simulations/src/node/dummy.rs index dfe34aa6..1f88a1de 100644 --- a/simulations/src/node/dummy.rs +++ b/simulations/src/node/dummy.rs @@ -3,6 +3,7 @@ use consensus_engine::View; use std::collections::{BTreeMap, BTreeSet}; use std::time::Duration; // crates +use crate::network::PayloadSize; use serde::{Deserialize, Serialize}; // internal use crate::{ @@ -90,6 +91,12 @@ pub enum DummyMessage { Proposal(Block), } +impl PayloadSize for DummyMessage { + fn size_bytes(&self) -> u32 { + 0 + } +} + struct LocalView { pub next_view_leaders: Vec, pub current_roots: Option>, @@ -475,6 +482,7 @@ mod tests { channel::unbounded(); let network_message_receiver = network.connect( *node_id, + 0, node_message_receiver, node_message_broadcast_receiver, ); diff --git a/simulations/src/runner/async_runner.rs b/simulations/src/runner/async_runner.rs index 940b6b2c..50bcc022 100644 --- a/simulations/src/runner/async_runner.rs +++ b/simulations/src/runner/async_runner.rs @@ -16,6 +16,7 @@ use super::SimulationRunnerHandle; pub fn simulate( runner: SimulationRunner, chunk_size: usize, + step_time: Duration, ) -> anyhow::Result> where M: std::fmt::Debug + Clone + Send + Sync + 'static, @@ -38,7 +39,6 @@ where let (stop_tx, stop_rx) = bounded(1); let p = runner.producer.clone(); let p1 = runner.producer; - let elapsed = Duration::from_millis(100); let handle = std::thread::spawn(move || { loop { select! { @@ -53,7 +53,7 @@ where .write() .par_iter_mut() .filter(|n| ids.contains(&n.id())) - .for_each(|node|node.step(elapsed)); + .for_each(|node|node.step(step_time)); p.send(R::try_from( &simulation_state, diff --git a/simulations/src/runner/glauber_runner.rs b/simulations/src/runner/glauber_runner.rs index 38dfb096..1beb4694 100644 --- a/simulations/src/runner/glauber_runner.rs +++ b/simulations/src/runner/glauber_runner.rs @@ -19,6 +19,7 @@ pub fn simulate( runner: SimulationRunner, update_rate: usize, maximum_iterations: usize, + step_time: Duration, ) -> anyhow::Result> where M: std::fmt::Debug + Send + Sync + Clone + 'static, @@ -42,7 +43,6 @@ where let (stop_tx, stop_rx) = bounded(1); let p = runner.producer.clone(); let p1 = runner.producer; - let elapsed = Duration::from_millis(100); let handle = std::thread::spawn(move || { 'main: for chunk in iterations.chunks(update_rate) { select! { @@ -62,7 +62,7 @@ where let node: &mut dyn Node = &mut **shared_nodes .get_mut(node_id.index()) .expect("Node should be present"); - node.step(elapsed); + node.step(step_time); } // check if any condition makes the simulation stop diff --git a/simulations/src/runner/layered_runner.rs b/simulations/src/runner/layered_runner.rs index b0bb0277..1dfb13ca 100644 --- a/simulations/src/runner/layered_runner.rs +++ b/simulations/src/runner/layered_runner.rs @@ -52,6 +52,7 @@ pub fn simulate( runner: SimulationRunner, gap: usize, distribution: Option>, + step_time: Duration, ) -> anyhow::Result> where M: std::fmt::Debug + Send + Sync + Clone + 'static, @@ -79,7 +80,6 @@ where let (stop_tx, stop_rx) = bounded(1); let p = runner.producer.clone(); let p1 = runner.producer; - let elapsed = Duration::from_millis(100); let handle = std::thread::spawn(move || { loop { select! { @@ -99,7 +99,7 @@ where .get_mut(node_id.index()) .expect("Node should be present"); let prev_view = node.current_view(); - node.step(elapsed); + node.step(step_time); let after_view = node.current_view(); if after_view > prev_view { // pass node to next step group diff --git a/simulations/src/runner/mod.rs b/simulations/src/runner/mod.rs index 3e8b576d..a70b0d7f 100644 --- a/simulations/src/runner/mod.rs +++ b/simulations/src/runner/mod.rs @@ -93,6 +93,7 @@ pub struct SimulationRunner { nodes: Arc>>>, runner_settings: RunnerSettings, producer: StreamProducer, + step_time: Duration, } impl SimulationRunner @@ -136,7 +137,7 @@ where views_count: _, leaders_count: _, network_settings: _, - step_time: _, + step_time, record_settings: _, } = settings; Ok(Self { @@ -148,24 +149,26 @@ where }, nodes, producer, + step_time, }) } pub fn simulate(self) -> anyhow::Result> { // init the start time let _ = *crate::START_TIME; + let step_time = self.step_time; match self.runner_settings.clone() { - RunnerSettings::Sync => sync_runner::simulate(self), - RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks), + RunnerSettings::Sync => sync_runner::simulate(self, step_time), + RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks, step_time), RunnerSettings::Glauber { maximum_iterations, update_rate, - } => glauber_runner::simulate(self, update_rate, maximum_iterations), + } => glauber_runner::simulate(self, update_rate, maximum_iterations, step_time), RunnerSettings::Layered { rounds_gap, distribution, - } => layered_runner::simulate(self, rounds_gap, distribution), + } => layered_runner::simulate(self, rounds_gap, distribution, step_time), } } } diff --git a/simulations/src/runner/sync_runner.rs b/simulations/src/runner/sync_runner.rs index 085b0477..5bd7863a 100644 --- a/simulations/src/runner/sync_runner.rs +++ b/simulations/src/runner/sync_runner.rs @@ -8,6 +8,7 @@ use std::time::Duration; /// Simulate with sending the network state to any subscriber pub fn simulate( runner: SimulationRunner, + step_time: Duration, ) -> anyhow::Result> where M: std::fmt::Debug + Send + Sync + Clone + 'static, @@ -29,7 +30,6 @@ where let (stop_tx, stop_rx) = bounded(1); let p = runner.producer.clone(); let p1 = runner.producer; - let elapsed = Duration::from_millis(100); let handle = std::thread::spawn(move || { p.send(R::try_from(&state)?)?; loop { @@ -43,7 +43,7 @@ where // then dead lock will occur { let mut nodes = nodes.write(); - inner_runner.step(&mut nodes, elapsed); + inner_runner.step(&mut nodes, step_time); } p.send(R::try_from(&state)?)?; @@ -116,6 +116,7 @@ mod tests { channel::unbounded(); let network_message_receiver = network.connect( *node_id, + 1, node_message_receiver, node_message_broadcast_receiver, ); diff --git a/simulations/src/settings.rs b/simulations/src/settings.rs index 030d9197..90d99cd5 100644 --- a/simulations/src/settings.rs +++ b/simulations/src/settings.rs @@ -37,6 +37,7 @@ pub struct TreeSettings { #[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct NodeSettings { + pub network_capacity_kbps: u32, #[serde(with = "humantime_serde")] pub timeout: std::time::Duration, }