diff --git a/consensus-engine/src/overlay/tree_overlay/tree.rs b/consensus-engine/src/overlay/tree_overlay/tree.rs index 6210faf8..c11fee73 100644 --- a/consensus-engine/src/overlay/tree_overlay/tree.rs +++ b/consensus-engine/src/overlay/tree_overlay/tree.rs @@ -92,7 +92,10 @@ impl Tree { let Some(base) = self .committee_id_to_index .get(committee_id) - .map(|&idx| idx * 2) else { return (None, None); }; + .map(|&idx| idx * 2) + else { + return (None, None); + }; let first_child = base + 1; let second_child = base + 2; ( diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index bd9ab0b1..3f0b8a66 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -227,12 +227,15 @@ where let network_adapter = adapter.clone(); task_manager.push(genesis_block.view.next(), async move { let Event::Approve { qc, .. } = Self::gather_votes( - network_adapter, - leader_committee.clone(), - genesis_block, - leader_tally_settings.clone(), - ) - .await else { unreachable!() }; + network_adapter, + leader_committee.clone(), + genesis_block, + leader_tally_settings.clone(), + ) + .await + else { + unreachable!() + }; Event::ProposeBlock { qc } }); } @@ -442,13 +445,12 @@ where if carnot.is_next_leader() { task_manager.push(block.view, async move { - let Event::Approve { qc, .. } = Self::gather_votes( - adapter, - leader_committee, - block, - leader_tally_settings, - ) - .await else { unreachable!() }; + let Event::Approve { qc, .. } = + Self::gather_votes(adapter, leader_committee, block, leader_tally_settings) + .await + else { + unreachable!() + }; Event::ProposeBlock { qc } }); } diff --git a/simulations/src/bin/app/main.rs b/simulations/src/bin/app/main.rs index 9f78854e..508f0bb2 100644 --- a/simulations/src/bin/app/main.rs +++ b/simulations/src/bin/app/main.rs @@ -82,12 +82,13 @@ impl SimulationApp { let (node_message_sender, node_message_receiver) = channel::unbounded(); // Dividing milliseconds in second by milliseconds in the step. let step_time_as_second_fraction = - 1_000_000 / simulation_settings.step_time.subsec_millis(); - let capacity_bps = simulation_settings.node_settings.network_capacity_kbps * 1024 - / step_time_as_second_fraction; + simulation_settings.step_time.subsec_millis() as f32 / 1_000_000_f32; + let capacity_bps = simulation_settings.node_settings.network_capacity_kbps as f32 + * 1024.0 + * step_time_as_second_fraction; let network_message_receiver = network.connect( node_id, - capacity_bps, + capacity_bps as u32, node_message_receiver, node_message_broadcast_receiver, ); diff --git a/simulations/src/network/mod.rs b/simulations/src/network/mod.rs index d3d55b49..52fada45 100644 --- a/simulations/src/network/mod.rs +++ b/simulations/src/network/mod.rs @@ -3,7 +3,10 @@ use std::{ collections::HashMap, ops::Add, str::FromStr, - sync::atomic::{AtomicU32, Ordering}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, time::{Duration, Instant}, }; // crates @@ -267,21 +270,48 @@ where let to = message.to.expect("adhoc message has recipient"); if let Some(delay) = self.send_message_cost(rng, message.from, to) { let node_capacity = self.node_network_capacity.get(&to).unwrap(); - if network_time.add(delay) <= self.network_time - && node_capacity.increase_load(message.size_bytes) - { + let should_delay = network_time.add(delay) <= self.network_time; + let remaining_size = message.remaining_size(); + if should_delay && node_capacity.increase_load(remaining_size) { let to_node = self.to_node_senders.get(&to).unwrap(); to_node .send(message.clone()) .expect("node should have connection"); - node_capacity.decrease_load(message.size_bytes); + node_capacity.decrease_load(remaining_size); return false; } else { + // if we do not need to delay, then we should check if the msg is too large + // if so, we mock the partial sending message behavior + if should_delay { + // if remaining is 0, we should send without delay + return self.try_partial_send(node_capacity, message, &to) != 0; + } return true; } } false } + + /// Try to apply partial send logic, returns the remaining size of the message + fn try_partial_send( + &self, + node_capacity: &NodeNetworkCapacity, + message: &NetworkMessage, + to: &NodeId, + ) -> u32 { + let mut cap = node_capacity.current_load.lock(); + let sent = node_capacity.capacity_bps - *cap; + *cap = node_capacity.capacity_bps; + let remaining = message.partial_send(sent); + if remaining == 0 { + let to_node = self.to_node_senders.get(to).unwrap(); + to_node + .send(message.clone()) + .expect("node should have connection"); + node_capacity.decrease_load(sent); + } + remaining + } } #[derive(Clone, Debug)] @@ -289,7 +319,7 @@ pub struct NetworkMessage { pub from: NodeId, pub to: Option, pub payload: M, - pub size_bytes: u32, + pub remaining: Arc, } impl NetworkMessage { @@ -298,13 +328,28 @@ impl NetworkMessage { from, to, payload, - size_bytes, + remaining: Arc::new(AtomicU32::new(size_bytes)), } } - pub fn get_payload(self) -> M { + pub fn payload(&self) -> &M { + &self.payload + } + + pub fn into_payload(self) -> M { self.payload } + + fn remaining_size(&self) -> u32 { + self.remaining.load(Ordering::SeqCst) + } + + /// Mock the partial sending of a message behavior, returning the remaining message size. + fn partial_send(&self, size: u32) -> u32 { + self.remaining + .fetch_sub(size, Ordering::SeqCst) + .saturating_sub(size) + } } pub trait PayloadSize { diff --git a/simulations/src/network/regions.rs b/simulations/src/network/regions.rs index 0e12e06e..217c6eec 100644 --- a/simulations/src/network/regions.rs +++ b/simulations/src/network/regions.rs @@ -208,7 +208,7 @@ mod tests { .map(NodeId::from_index) .collect::>(); - let available_regions = vec![ + let available_regions = [ Region::NorthAmerica, Region::Europe, Region::Asia, diff --git a/simulations/src/node/carnot/message_cache.rs b/simulations/src/node/carnot/message_cache.rs index ac8eb5ee..602d110f 100644 --- a/simulations/src/node/carnot/message_cache.rs +++ b/simulations/src/node/carnot/message_cache.rs @@ -1,6 +1,6 @@ use crate::node::carnot::messages::CarnotMessage; use consensus_engine::View; -use polars::export::ahash::HashMap; +use std::collections::HashMap; pub(crate) struct MessageCache { cache: HashMap>, diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index 5b6484a5..89bb2264 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -505,7 +505,7 @@ impl> Node for Car .network_interface .receive_messages() .into_iter() - .map(NetworkMessage::get_payload) + .map(NetworkMessage::into_payload) .partition(|m| { m.view() == self.engine.current_view() || matches!(m, CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_))