fix large msg sending logic (#274)

* fix large msg sending logic
This commit is contained in:
Al Liu 2023-08-03 20:05:43 +08:00 committed by GitHub
parent fa8e1025f5
commit c16b794517
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 80 additions and 29 deletions

View File

@ -92,7 +92,10 @@ impl Tree {
let Some(base) = self let Some(base) = self
.committee_id_to_index .committee_id_to_index
.get(committee_id) .get(committee_id)
.map(|&idx| idx * 2) else { return (None, None); }; .map(|&idx| idx * 2)
else {
return (None, None);
};
let first_child = base + 1; let first_child = base + 1;
let second_child = base + 2; let second_child = base + 2;
( (

View File

@ -227,12 +227,15 @@ where
let network_adapter = adapter.clone(); let network_adapter = adapter.clone();
task_manager.push(genesis_block.view.next(), async move { task_manager.push(genesis_block.view.next(), async move {
let Event::Approve { qc, .. } = Self::gather_votes( let Event::Approve { qc, .. } = Self::gather_votes(
network_adapter, network_adapter,
leader_committee.clone(), leader_committee.clone(),
genesis_block, genesis_block,
leader_tally_settings.clone(), leader_tally_settings.clone(),
) )
.await else { unreachable!() }; .await
else {
unreachable!()
};
Event::ProposeBlock { qc } Event::ProposeBlock { qc }
}); });
} }
@ -442,13 +445,12 @@ where
if carnot.is_next_leader() { if carnot.is_next_leader() {
task_manager.push(block.view, async move { task_manager.push(block.view, async move {
let Event::Approve { qc, .. } = Self::gather_votes( let Event::Approve { qc, .. } =
adapter, Self::gather_votes(adapter, leader_committee, block, leader_tally_settings)
leader_committee, .await
block, else {
leader_tally_settings, unreachable!()
) };
.await else { unreachable!() };
Event::ProposeBlock { qc } Event::ProposeBlock { qc }
}); });
} }

View File

@ -82,12 +82,13 @@ impl SimulationApp {
let (node_message_sender, node_message_receiver) = channel::unbounded(); let (node_message_sender, node_message_receiver) = channel::unbounded();
// Dividing milliseconds in second by milliseconds in the step. // Dividing milliseconds in second by milliseconds in the step.
let step_time_as_second_fraction = let step_time_as_second_fraction =
1_000_000 / simulation_settings.step_time.subsec_millis(); simulation_settings.step_time.subsec_millis() as f32 / 1_000_000_f32;
let capacity_bps = simulation_settings.node_settings.network_capacity_kbps * 1024 let capacity_bps = simulation_settings.node_settings.network_capacity_kbps as f32
/ step_time_as_second_fraction; * 1024.0
* step_time_as_second_fraction;
let network_message_receiver = network.connect( let network_message_receiver = network.connect(
node_id, node_id,
capacity_bps, capacity_bps as u32,
node_message_receiver, node_message_receiver,
node_message_broadcast_receiver, node_message_broadcast_receiver,
); );

View File

@ -3,7 +3,10 @@ use std::{
collections::HashMap, collections::HashMap,
ops::Add, ops::Add,
str::FromStr, str::FromStr,
sync::atomic::{AtomicU32, Ordering}, sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
// crates // crates
@ -267,21 +270,48 @@ where
let to = message.to.expect("adhoc message has recipient"); let to = message.to.expect("adhoc message has recipient");
if let Some(delay) = self.send_message_cost(rng, message.from, to) { if let Some(delay) = self.send_message_cost(rng, message.from, to) {
let node_capacity = self.node_network_capacity.get(&to).unwrap(); let node_capacity = self.node_network_capacity.get(&to).unwrap();
if network_time.add(delay) <= self.network_time let should_delay = network_time.add(delay) <= self.network_time;
&& node_capacity.increase_load(message.size_bytes) let remaining_size = message.remaining_size();
{ if should_delay && node_capacity.increase_load(remaining_size) {
let to_node = self.to_node_senders.get(&to).unwrap(); let to_node = self.to_node_senders.get(&to).unwrap();
to_node to_node
.send(message.clone()) .send(message.clone())
.expect("node should have connection"); .expect("node should have connection");
node_capacity.decrease_load(message.size_bytes); node_capacity.decrease_load(remaining_size);
return false; return false;
} else { } else {
// if we do not need to delay, then we should check if the msg is too large
// if so, we mock the partial sending message behavior
if should_delay {
// if remaining is 0, we should send without delay
return self.try_partial_send(node_capacity, message, &to) != 0;
}
return true; return true;
} }
} }
false false
} }
/// Simulates sending as much of `message` as the receiving node can still take.
///
/// All spare capacity of `node_capacity` (`capacity_bps` minus the current load) is
/// claimed and charged against the message via `NetworkMessage::partial_send`.
/// When nothing is left to send, the message is handed to the sender registered
/// for `to` and `decrease_load` is called with the amount that was claimed.
///
/// Returns the number of bytes of the message that still remain to be sent;
/// `0` means the message was delivered by this call.
///
/// # Panics
///
/// Panics if no sender is registered for `to`, or if the send itself fails.
fn try_partial_send(
    &self,
    node_capacity: &NodeNetworkCapacity,
    message: &NetworkMessage<M>,
    to: &NodeId,
) -> u32 {
    // The guard lives until the function returns, exactly as the load bookkeeping requires.
    // NOTE(review): this assumes `decrease_load` does not lock `current_load` itself — confirm.
    let mut load = node_capacity.current_load.lock();

    // Claim every byte of capacity that is still free and mark the node as saturated.
    // NOTE(review): assumes the current load never exceeds `capacity_bps` — confirm.
    let budget = node_capacity.capacity_bps - *load;
    *load = node_capacity.capacity_bps;

    let left = message.partial_send(budget);
    if left != 0 {
        // Still bytes outstanding: the caller keeps the message for a later attempt.
        return left;
    }

    // Fully accounted for: deliver the message and hand the claimed budget back.
    self.to_node_senders
        .get(to)
        .unwrap()
        .send(message.clone())
        .expect("node should have connection");
    node_capacity.decrease_load(budget);
    left
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -289,7 +319,7 @@ pub struct NetworkMessage<M> {
pub from: NodeId, pub from: NodeId,
pub to: Option<NodeId>, pub to: Option<NodeId>,
pub payload: M, pub payload: M,
pub size_bytes: u32, pub remaining: Arc<AtomicU32>,
} }
impl<M> NetworkMessage<M> { impl<M> NetworkMessage<M> {
@ -298,13 +328,28 @@ impl<M> NetworkMessage<M> {
from, from,
to, to,
payload, payload,
size_bytes, remaining: Arc::new(AtomicU32::new(size_bytes)),
} }
} }
pub fn get_payload(self) -> M { pub fn payload(&self) -> &M {
&self.payload
}
pub fn into_payload(self) -> M {
self.payload self.payload
} }
fn remaining_size(&self) -> u32 {
self.remaining.load(Ordering::SeqCst)
}
/// Mocks sending `size` bytes of this message and returns how many bytes remain.
///
/// The shared `remaining` counter is reduced by `size`, saturating at zero. A plain
/// `fetch_sub` would wrap around when `size` exceeds what is left, leaving the
/// counter near `u32::MAX` so that a later `remaining_size()` reports a bogus value.
/// The returned value is the same as before: the previous count minus `size`,
/// clamped to `0`.
fn partial_send(&self, size: u32) -> u32 {
    let previous = self
        .remaining
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |left| {
            Some(left.saturating_sub(size))
        })
        // The closure always returns `Some`, so the update cannot fail; the `Err`
        // arm only exists to satisfy the type and carries the unchanged value.
        .unwrap_or_else(|unchanged| unchanged);
    previous.saturating_sub(size)
}
} }
pub trait PayloadSize { pub trait PayloadSize {

View File

@ -208,7 +208,7 @@ mod tests {
.map(NodeId::from_index) .map(NodeId::from_index)
.collect::<Vec<NodeId>>(); .collect::<Vec<NodeId>>();
let available_regions = vec![ let available_regions = [
Region::NorthAmerica, Region::NorthAmerica,
Region::Europe, Region::Europe,
Region::Asia, Region::Asia,

View File

@ -1,6 +1,6 @@
use crate::node::carnot::messages::CarnotMessage; use crate::node::carnot::messages::CarnotMessage;
use consensus_engine::View; use consensus_engine::View;
use polars::export::ahash::HashMap; use std::collections::HashMap;
pub(crate) struct MessageCache { pub(crate) struct MessageCache {
cache: HashMap<View, Vec<CarnotMessage>>, cache: HashMap<View, Vec<CarnotMessage>>,

View File

@ -505,7 +505,7 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
.network_interface .network_interface
.receive_messages() .receive_messages()
.into_iter() .into_iter()
.map(NetworkMessage::get_payload) .map(NetworkMessage::into_payload)
.partition(|m| { .partition(|m| {
m.view() == self.engine.current_view() m.view() == self.engine.current_view()
|| matches!(m, CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_)) || matches!(m, CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_))