Optional network capacity in simulations (#483)

* Make simulation net capacity optional

* Add cli flag to disable netcap
This commit is contained in:
gusto 2023-10-30 05:26:45 +01:00 committed by GitHub
parent 61ff62cb29
commit d479721efd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 113 additions and 37 deletions

View File

@ -46,6 +46,8 @@ pub struct SimulationApp {
log_to: log::LogOutput,
#[clap(long)]
dump_overlay_info: bool,
#[clap(long)]
no_netcap: bool,
}
impl SimulationApp {
@ -56,6 +58,7 @@ impl SimulationApp {
log_format: _,
log_to: _,
dump_overlay_info,
no_netcap,
} = self;
let simulation_settings: SimulationSettings = load_json_from_file(&input_settings)?;
@ -99,14 +102,19 @@ impl SimulationApp {
// Dividing milliseconds in second by milliseconds in the step.
let step_time_as_second_fraction =
simulation_settings.step_time.subsec_millis() as f32 / 1_000_000_f32;
let capacity_bps = simulation_settings.node_settings.network_capacity_kbps as f32
* 1024.0
* step_time_as_second_fraction;
let capacity_bps = if no_netcap {
None
} else {
simulation_settings
.node_settings
.network_capacity_kbps
.map(|c| (c as f32 * 1024.0 * step_time_as_second_fraction) as u32)
};
let network_message_receiver = {
let mut network = network.lock();
network.connect(
node_id,
capacity_bps as u32,
capacity_bps,
node_message_receiver,
node_message_broadcast_receiver,
)

View File

@ -109,13 +109,13 @@ mod network_behaviors_serde {
/// Represents node network capacity and current load in bytes.
#[derive(Debug)]
struct NodeNetworkCapacity {
capacity_bps: u32,
capacity_bps: Option<u32>,
current_load: Mutex<u32>,
load_to_flush: AtomicU32,
}
impl NodeNetworkCapacity {
fn new(capacity_bps: u32) -> Self {
fn new(capacity_bps: Option<u32>) -> Self {
Self {
capacity_bps,
current_load: Mutex::new(0),
@ -124,12 +124,16 @@ impl NodeNetworkCapacity {
}
fn increase_load(&self, load: u32) -> bool {
let mut current_load = self.current_load.lock();
if *current_load + load <= self.capacity_bps {
*current_load += load;
true
if let Some(capacity_bps) = self.capacity_bps {
let mut current_load = self.current_load.lock();
if *current_load + load <= capacity_bps {
*current_load += load;
true
} else {
false
}
} else {
false
true
}
}
@ -138,6 +142,10 @@ impl NodeNetworkCapacity {
}
fn flush_load(&self) {
if self.capacity_bps.is_none() {
return;
}
let mut s = self.current_load.lock();
*s -= self.load_to_flush.load(Ordering::Relaxed);
self.load_to_flush.store(0, Ordering::Relaxed);
@ -188,7 +196,7 @@ where
pub fn connect(
&mut self,
node_id: NodeId,
capacity_bps: u32,
capacity_bps: Option<u32>,
node_message_receiver: Receiver<NetworkMessage<M>>,
node_message_broadcast_receiver: Receiver<NetworkMessage<M>>,
) -> Receiver<NetworkMessage<M>> {
@ -301,20 +309,24 @@ where
message: &NetworkMessage<M>,
to: &NodeId,
) -> u32 {
let mut cap = node_capacity.current_load.lock();
let sent = node_capacity.capacity_bps - *cap;
*cap = node_capacity.capacity_bps;
let remaining = message.partial_send(sent);
// Message is partially sent, the node capacity needs to be flushed at the end of step even
// if the whole message is not sent.
node_capacity.decrease_load(sent);
if remaining == 0 {
let to_node = self.to_node_senders.get(to).unwrap();
to_node
.send(message.clone())
.expect("node should have connection");
if let Some(capacity_bps) = node_capacity.capacity_bps {
let mut cap = node_capacity.current_load.lock();
let sent = capacity_bps - *cap;
*cap = capacity_bps;
let remaining = message.partial_send(sent);
// Message is partially sent, the node capacity needs to be flushed at the end of step even
// if the whole message is not sent.
node_capacity.decrease_load(sent);
if remaining == 0 {
let to_node = self.to_node_senders.get(to).unwrap();
to_node
.send(message.clone())
.expect("node should have connection");
}
remaining
} else {
0
}
remaining
}
}
@ -484,7 +496,8 @@ mod tests {
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver = network.connect(node_a, 3, from_a_receiver, from_a_broadcast_receiver);
let to_a_receiver =
network.connect(node_a, Some(3), from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
@ -495,7 +508,8 @@ mod tests {
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver = network.connect(node_b, 3, from_b_receiver, from_b_broadcast_receiver);
let to_b_receiver =
network.connect(node_b, Some(3), from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
@ -561,7 +575,8 @@ mod tests {
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver = network.connect(node_a, 2, from_a_receiver, from_a_broadcast_receiver);
let to_a_receiver =
network.connect(node_a, Some(2), from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
@ -572,7 +587,8 @@ mod tests {
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver = network.connect(node_b, 2, from_b_receiver, from_b_broadcast_receiver);
let to_b_receiver =
network.connect(node_b, Some(2), from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
@ -583,7 +599,8 @@ mod tests {
let (from_c_sender, from_c_receiver) = channel::unbounded();
let (from_c_broadcast_sender, from_c_broadcast_receiver) = channel::unbounded();
let to_c_receiver = network.connect(node_c, 2, from_c_receiver, from_c_broadcast_receiver);
let to_c_receiver =
network.connect(node_c, Some(2), from_c_receiver, from_c_broadcast_receiver);
let c = MockNetworkInterface::new(
node_c,
from_c_broadcast_sender,
@ -639,7 +656,8 @@ mod tests {
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver = network.connect(node_a, 3, from_a_receiver, from_a_broadcast_receiver);
let to_a_receiver =
network.connect(node_a, Some(3), from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
@ -650,7 +668,8 @@ mod tests {
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver = network.connect(node_b, 2, from_b_receiver, from_b_broadcast_receiver);
let to_b_receiver =
network.connect(node_b, Some(2), from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
@ -677,6 +696,53 @@ mod tests {
assert_eq!(b.receive_messages().len(), 2);
}
#[test]
fn node_network_capacity_no_limit() {
let node_a = NodeId::from_index(0);
let node_b = NodeId::from_index(1);
let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]);
let behaviour = HashMap::from([(
NetworkBehaviourKey::new(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
)]);
let regions_data = RegionsData::new(regions, behaviour);
let mut network = Network::new(regions_data, 0);
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver =
network.connect(node_a, None, from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
from_a_sender,
to_a_receiver,
1000,
);
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver =
network.connect(node_b, None, from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
from_b_sender,
to_b_receiver,
100,
);
for _ in 0..6 {
a.send_message(node_b, ());
b.send_message(node_a, ());
}
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 6);
assert_eq!(b.receive_messages().len(), 6);
}
#[test]
fn node_network_message_partial_send() {
let node_a = NodeId::from_index(0);
@ -694,7 +760,8 @@ mod tests {
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
// Node A is connected to the network with throuput of 5.
let to_a_receiver = network.connect(node_a, 5, from_a_receiver, from_a_broadcast_receiver);
let to_a_receiver =
network.connect(node_a, Some(5), from_a_receiver, from_a_broadcast_receiver);
// Every message sent **from** Node A will be of size 15.
let a = MockNetworkInterface::new(
@ -709,7 +776,8 @@ mod tests {
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
// Node B is connected to the network with throuput of 1.
let to_b_receiver = network.connect(node_b, 1, from_b_receiver, from_b_broadcast_receiver);
let to_b_receiver =
network.connect(node_b, Some(1), from_b_receiver, from_b_broadcast_receiver);
// Every message sent **from** Node B will be of size 2.
let b = MockNetworkInterface::new(

View File

@ -482,7 +482,7 @@ mod tests {
channel::unbounded();
let network_message_receiver = network.connect(
*node_id,
0,
None,
node_message_receiver,
node_message_broadcast_receiver,
);

View File

@ -116,7 +116,7 @@ mod tests {
channel::unbounded();
let network_message_receiver = network.connect(
*node_id,
1,
Some(1),
node_message_receiver,
node_message_broadcast_receiver,
);

View File

@ -43,7 +43,7 @@ pub struct BranchSettings {
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct NodeSettings {
pub network_capacity_kbps: u32,
pub network_capacity_kbps: Option<u32>,
#[serde(with = "humantime_serde")]
pub timeout: std::time::Duration,
}