From 9f71dbb24c1e83f0c47f0c842fd50a3d0eb5ebeb Mon Sep 17 00:00:00 2001 From: gusto Date: Tue, 18 Jul 2023 15:01:29 +0300 Subject: [PATCH] Simulation network broadcast fix (#262) * Replace network broadcast msg type to a dedicated channel * Update tests with broadcast chan * Replace threadrng with seedable smallrng * Simplify the broadcast loop --- simulations/src/bin/app/main.rs | 11 +- simulations/src/network/mod.rs | 164 ++++++++++-------- simulations/src/node/carnot/mod.rs | 8 +- simulations/src/node/carnot/tally.rs | 2 +- simulations/src/node/dummy.rs | 25 +-- simulations/src/runner/sync_runner.rs | 11 +- simulations/src/streaming/io.rs | 103 +++++------ simulations/src/streaming/naive.rs | 103 +++++------ .../src/streaming/runtime_subscriber.rs | 103 +++++------ .../src/streaming/settings_subscriber.rs | 103 +++++------ 10 files changed, 337 insertions(+), 296 deletions(-) diff --git a/simulations/src/bin/app/main.rs b/simulations/src/bin/app/main.rs index 9a77c161..fa106c69 100644 --- a/simulations/src/bin/app/main.rs +++ b/simulations/src/bin/app/main.rs @@ -72,15 +72,22 @@ impl SimulationApp { let regions_data = RegionsData::new(regions, behaviours); let ids = node_ids.clone(); - let mut network = Network::new(regions_data); + let mut network = Network::new(regions_data, seed); let nodes: Vec> = node_ids .iter() .copied() .map(|node_id| { + let (node_message_broadcast_sender, node_message_broadcast_receiver) = + channel::unbounded(); let (node_message_sender, node_message_receiver) = channel::unbounded(); - let network_message_receiver = network.connect(node_id, node_message_receiver); + let network_message_receiver = network.connect( + node_id, + node_message_receiver, + node_message_broadcast_receiver, + ); let network_interface = InMemoryNetworkInterface::new( node_id, + node_message_broadcast_sender, node_message_sender, network_message_receiver, ); diff --git a/simulations/src/network/mod.rs b/simulations/src/network/mod.rs index ea7a45d4..d9c4352f 100644 --- a/simulations/src/network/mod.rs +++ b/simulations/src/network/mod.rs @@ -7,7 +7,7 @@ use std::{ }; // crates use crossbeam::channel::{self, Receiver, Sender}; -use rand::{rngs::ThreadRng, Rng}; +use rand::{rngs::SmallRng, Rng, SeedableRng}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; // internal @@ -106,20 +106,24 @@ pub struct Network { network_time: NetworkTime, messages: Vec<(NetworkTime, NetworkMessage)>, from_node_receivers: HashMap>>, + from_node_broadcast_receivers: HashMap>>, to_node_senders: HashMap>>, + seed: u64, } impl Network where M: Send + Sync + Clone, { - pub fn new(regions: regions::RegionsData) -> Self { + pub fn new(regions: regions::RegionsData, seed: u64) -> Self { Self { regions, network_time: Instant::now(), messages: Vec::new(), from_node_receivers: HashMap::new(), + from_node_broadcast_receivers: HashMap::new(), to_node_senders: HashMap::new(), + seed, } } @@ -139,10 +143,13 @@ where &mut self, node_id: NodeId, node_message_receiver: Receiver>, + node_message_broadcast_receiver: Receiver>, ) -> Receiver> { let (to_node_sender, from_network_receiver) = channel::unbounded(); self.from_node_receivers .insert(node_id, node_message_receiver); + self.from_node_broadcast_receivers + .insert(node_id, node_message_broadcast_receiver); self.to_node_senders.insert(node_id, to_node_sender); from_network_receiver } @@ -155,7 +162,7 @@ where /// Receive and store all messages from nodes. pub fn collect_messages(&mut self) { - let mut new_messages = self + let mut adhoc_messages = self .from_node_receivers .par_iter() .flat_map(|(_, from_node)| { @@ -165,8 +172,23 @@ where .collect::>() }) .collect(); + self.messages.append(&mut adhoc_messages); - self.messages.append(&mut new_messages); + let mut broadcast_messages = self + .from_node_broadcast_receivers + .iter() + .flat_map(|(_, from_node)| { + from_node.try_iter().flat_map(|msg| { + self.to_node_senders.keys().map(move |recipient| { + let mut m = msg.clone(); + m.to = Some(*recipient); + m + }) + }) + }) + .map(|m| (self.network_time, m)) + .collect::>(); + self.messages.append(&mut broadcast_messages); } /// Reiterate all messages and send to appropriate nodes if simulated @@ -178,7 +200,7 @@ where .messages .par_iter() .filter(|(network_time, message)| { - let mut rng = ThreadRng::default(); + let mut rng = SmallRng::seed_from_u64(self.seed); self.send_or_drop_message(&mut rng, network_time, message) }) .cloned() @@ -194,35 +216,12 @@ where network_time: &NetworkTime, message: &NetworkMessage, ) -> bool { - match message { - NetworkMessage::Adhoc(msg) => { - let recipient = msg.to.expect("Adhoc message has recipient"); - let to_node = self.to_node_senders.get(&recipient).unwrap(); - self.send_delayed(rng, recipient, to_node, network_time, msg) - } - NetworkMessage::Broadcast(msg) => { - let mut adhoc = msg.clone(); - for (recipient, to_node) in self.to_node_senders.iter() { - adhoc.to = Some(*recipient); - self.send_delayed(rng, *recipient, to_node, network_time, &adhoc); - } - false - } - } - } - - fn send_delayed( - &self, - rng: &mut R, - to: NodeId, - to_node: &Sender>, - network_time: &NetworkTime, - msg: &AdhocMessage, - ) -> bool { - if let Some(delay) = self.send_message_cost(rng, msg.from, to) { + let to = message.to.expect("adhoc message has recipient"); + if let Some(delay) = self.send_message_cost(rng, message.from, to) { if network_time.add(delay) <= self.network_time { + let to_node = self.to_node_senders.get(&to).unwrap(); to_node - .send(NetworkMessage::Adhoc(msg.clone())) + .send(message.clone()) .expect("Node should have connection"); return false; } else { @@ -234,40 +233,19 @@ where } #[derive(Clone, Debug)] -pub struct AdhocMessage { +pub struct NetworkMessage { pub from: NodeId, pub to: Option, pub payload: M, } -#[derive(Clone, Debug)] -pub enum NetworkMessage { - Adhoc(AdhocMessage), - Broadcast(AdhocMessage), -} - impl NetworkMessage { - pub fn adhoc(from: NodeId, to: NodeId, payload: M) -> Self { - Self::Adhoc(AdhocMessage { - from, - to: Some(to), - payload, - }) - } - - pub fn broadcast(from: NodeId, payload: M) -> Self { - Self::Broadcast(AdhocMessage { - from, - to: None, - payload, - }) + pub fn new(from: NodeId, to: Option, payload: M) -> Self { + Self { from, to, payload } } pub fn get_payload(self) -> M { - match self { - NetworkMessage::Adhoc(AdhocMessage { payload, .. }) => payload, - NetworkMessage::Broadcast(AdhocMessage { payload, .. }) => payload, - } + self.payload } } @@ -281,6 +259,7 @@ pub trait NetworkInterface { pub struct InMemoryNetworkInterface { id: NodeId, + broadcast: Sender>, sender: Sender>, receiver: Receiver>, } @@ -288,11 +267,13 @@ pub struct InMemoryNetworkInterface { impl InMemoryNetworkInterface { pub fn new( id: NodeId, + broadcast: Sender>, sender: Sender>, receiver: Receiver>, ) -> Self { Self { id, + broadcast, sender, receiver, } @@ -303,12 +284,12 @@ impl NetworkInterface for InMemoryNetworkInterface { type Payload = M; fn broadcast(&self, message: Self::Payload) { - let message = NetworkMessage::broadcast(self.id, message); - self.sender.send(message).unwrap(); + let message = NetworkMessage::new(self.id, None, message); + self.broadcast.send(message).unwrap(); } fn send_message(&self, address: NodeId, message: Self::Payload) { - let message = NetworkMessage::adhoc(self.id, address, message); + let message = NetworkMessage::new(self.id, Some(address), message); self.sender.send(message).unwrap(); } @@ -333,6 +314,7 @@ mod tests { struct MockNetworkInterface { id: NodeId, + broadcast: Sender>, sender: Sender>, receiver: Receiver>, } @@ -340,11 +322,13 @@ mod tests { impl MockNetworkInterface { pub fn new( id: NodeId, + broadcast: Sender>, sender: Sender>, receiver: Receiver>, ) -> Self { Self { id, + broadcast, sender, receiver, } @@ -355,12 +339,12 @@ mod tests { type Payload = (); fn broadcast(&self, message: Self::Payload) { - let message = NetworkMessage::broadcast(self.id, message); - self.sender.send(message).unwrap(); + let message = NetworkMessage::new(self.id, None, message); + self.broadcast.send(message).unwrap(); } fn send_message(&self, address: NodeId, message: Self::Payload) { - let message = NetworkMessage::adhoc(self.id, address, message); + let message = NetworkMessage::new(self.id, Some(address), message); self.sender.send(message).unwrap(); } @@ -380,15 +364,27 @@ mod tests { NetworkBehaviour::new(Duration::from_millis(100), 0.0), )]); let regions_data = RegionsData::new(regions, behaviour); - let mut network = Network::new(regions_data); + let mut network = Network::new(regions_data, 0); let (from_a_sender, from_a_receiver) = channel::unbounded(); - let to_a_receiver = network.connect(node_a, from_a_receiver); - let a = MockNetworkInterface::new(node_a, from_a_sender, to_a_receiver); + let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded(); + let to_a_receiver = network.connect(node_a, from_a_receiver, from_a_broadcast_receiver); + let a = MockNetworkInterface::new( + node_a, + from_a_broadcast_sender, + from_a_sender, + to_a_receiver, + ); let (from_b_sender, from_b_receiver) = channel::unbounded(); - let to_b_receiver = network.connect(node_b, from_b_receiver); - let b = MockNetworkInterface::new(node_b, from_b_sender, to_b_receiver); + let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded(); + let to_b_receiver = network.connect(node_b, from_b_receiver, from_b_broadcast_receiver); + let b = MockNetworkInterface::new( + node_b, + from_b_broadcast_sender, + from_b_sender, + to_b_receiver, + ); a.send_message(node_b, ()); network.collect_messages(); @@ -443,19 +439,37 @@ mod tests { ), ]); let regions_data = RegionsData::new(regions, behaviour); - let mut network = Network::new(regions_data); + let mut network = Network::new(regions_data, 0); let (from_a_sender, from_a_receiver) = channel::unbounded(); - let to_a_receiver = network.connect(node_a, from_a_receiver); - let a = MockNetworkInterface::new(node_a, from_a_sender, to_a_receiver); + let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded(); + let to_a_receiver = network.connect(node_a, from_a_receiver, from_a_broadcast_receiver); + let a = MockNetworkInterface::new( + node_a, + from_a_broadcast_sender, + from_a_sender, + to_a_receiver, + ); let (from_b_sender, from_b_receiver) = channel::unbounded(); - let to_b_receiver = network.connect(node_b, from_b_receiver); - let b = MockNetworkInterface::new(node_b, from_b_sender, to_b_receiver); + let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded(); + let to_b_receiver = network.connect(node_b, from_b_receiver, from_b_broadcast_receiver); + let b = MockNetworkInterface::new( + node_b, + from_b_broadcast_sender, + from_b_sender, + to_b_receiver, + ); let (from_c_sender, from_c_receiver) = channel::unbounded(); - let to_c_receiver = network.connect(node_c, from_c_receiver); - let c = MockNetworkInterface::new(node_c, from_c_sender, to_c_receiver); + let (from_c_broadcast_sender, from_c_broadcast_receiver) = channel::unbounded(); + let to_c_receiver = network.connect(node_c, from_c_receiver, from_c_broadcast_receiver); + let c = MockNetworkInterface::new( + node_c, + from_c_broadcast_sender, + from_c_sender, + to_c_receiver, + ); a.send_message(node_b, ()); a.send_message(node_c, ()); diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index 035ec25b..41eee861 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -291,13 +291,11 @@ impl CarnotNode { } } Output::BroadcastTimeoutQc { timeout_qc } => { - self.network_interface.send_message( - self.id, - CarnotMessage::TimeoutQc(TimeoutQcMsg { + self.network_interface + .broadcast(CarnotMessage::TimeoutQc(TimeoutQcMsg { source: self.id, qc: timeout_qc, - }), - ); + })); } Output::BroadcastProposal { proposal } => { self.network_interface diff --git a/simulations/src/node/carnot/tally.rs b/simulations/src/node/carnot/tally.rs index 16b50eee..00bc5324 100644 --- a/simulations/src/node/carnot/tally.rs +++ b/simulations/src/node/carnot/tally.rs @@ -8,7 +8,7 @@ pub(crate) struct Tally { impl Default for Tally { fn default() -> Self { - Self::new(0) + Self::new(2) } } diff --git a/simulations/src/node/dummy.rs b/simulations/src/node/dummy.rs index 20b39e4e..a3cdbbee 100644 --- a/simulations/src/node/dummy.rs +++ b/simulations/src/node/dummy.rs @@ -326,13 +326,9 @@ impl DummyNode { } fn handle_message(&mut self, message: &NetworkMessage) { - let payload = match message { - NetworkMessage::Adhoc(m) => m.payload.clone(), - NetworkMessage::Broadcast(m) => m.payload.clone(), - }; // The view can change on any message, node needs to change its position // and roles if the view changes during the message processing. - if let DummyMessage::Proposal(block) = &payload { + if let DummyMessage::Proposal(block) = &message.payload { if block.view > self.current_view() { self.update_view(block.view); } @@ -341,10 +337,10 @@ impl DummyNode { for role in roles.iter() { match role { - DummyRole::Leader => self.handle_leader(&payload), - DummyRole::Root => self.handle_root(&payload), - DummyRole::Internal => self.handle_internal(&payload), - DummyRole::Leaf => self.handle_leaf(&payload), + DummyRole::Leader => self.handle_leader(&message.payload), + DummyRole::Root => self.handle_root(&message.payload), + DummyRole::Internal => self.handle_internal(&message.payload), + DummyRole::Leaf => self.handle_leaf(&message.payload), DummyRole::Unknown => (), } } @@ -461,7 +457,7 @@ mod tests { NetworkBehaviour::new(Duration::from_millis(100), 0.0), )]); let regions_data = RegionsData::new(regions, behaviour); - Network::new(regions_data) + Network::new(regions_data, 0) } fn init_dummy_nodes( @@ -473,9 +469,16 @@ mod tests { .iter() .map(|node_id| { let (node_message_sender, node_message_receiver) = channel::unbounded(); - let network_message_receiver = network.connect(*node_id, node_message_receiver); + let (node_message_broadcast_sender, node_message_broadcast_receiver) = + channel::unbounded(); + let network_message_receiver = network.connect( + *node_id, + node_message_receiver, + node_message_broadcast_receiver, + ); let network_interface = InMemoryNetworkInterface::new( *node_id, + node_message_broadcast_sender, node_message_sender, network_message_receiver, ); diff --git a/simulations/src/runner/sync_runner.rs b/simulations/src/runner/sync_runner.rs index b1e471a1..1814ee25 100644 --- a/simulations/src/runner/sync_runner.rs +++ b/simulations/src/runner/sync_runner.rs @@ -99,7 +99,7 @@ mod tests { NetworkBehaviour::new(Duration::from_millis(100), 0.0), )]); let regions_data = RegionsData::new(regions, behaviour); - Network::new(regions_data) + Network::new(regions_data, 0) } fn init_dummy_nodes( @@ -111,9 +111,16 @@ mod tests { .iter() .map(|node_id| { let (node_message_sender, node_message_receiver) = channel::unbounded(); - let network_message_receiver = network.connect(*node_id, node_message_receiver); + let (node_message_broadcast_sender, node_message_broadcast_receiver) = + channel::unbounded(); + let network_message_receiver = network.connect( + *node_id, + node_message_receiver, + node_message_broadcast_receiver, + ); let network_interface = InMemoryNetworkInterface::new( *node_id, + node_message_broadcast_sender, node_message_sender, network_message_receiver, ); diff --git a/simulations/src/streaming/io.rs b/simulations/src/streaming/io.rs index 87db5cc8..028fec64 100644 --- a/simulations/src/streaming/io.rs +++ b/simulations/src/streaming/io.rs @@ -164,56 +164,59 @@ mod tests { > }) .collect::>(); - let network = Network::new(RegionsData { - regions: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - (region, vec![NodeId::from_index(idx)]) - }) - .collect(), - node_region: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - (NodeId::from_index(idx), region) - }) - .collect(), - region_network_behaviour: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - ( - NetworkBehaviourKey::new(region, region), - NetworkBehaviour { - delay: Duration::from_millis(100), - drop: 0.0, - }, - ) - }) - .collect(), - }); + let network = Network::new( + RegionsData { + regions: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + (region, vec![NodeId::from_index(idx)]) + }) + .collect(), + node_region: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + (NodeId::from_index(idx), region) + }) + .collect(), + region_network_behaviour: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + ( + NetworkBehaviourKey::new(region, region), + NetworkBehaviour { + delay: Duration::from_millis(100), + drop: 0.0, + }, + ) + }) + .collect(), + }, + 0, + ); let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> = SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); simulation_runner diff --git a/simulations/src/streaming/naive.rs b/simulations/src/streaming/naive.rs index d37d22c0..d4847b4a 100644 --- a/simulations/src/streaming/naive.rs +++ b/simulations/src/streaming/naive.rs @@ -169,56 +169,59 @@ mod tests { > }) .collect::>(); - let network = Network::new(RegionsData { - regions: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - (region, vec![NodeId::from_index(idx)]) - }) - .collect(), - node_region: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - (NodeId::from_index(idx), region) - }) - .collect(), - region_network_behaviour: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - ( - NetworkBehaviourKey::new(region, region), - NetworkBehaviour { - delay: Duration::from_millis(100), - drop: 0.0, - }, - ) - }) - .collect(), - }); + let network = Network::new( + RegionsData { + regions: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + (region, vec![NodeId::from_index(idx)]) + }) + .collect(), + node_region: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + (NodeId::from_index(idx), region) + }) + .collect(), + region_network_behaviour: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + ( + NetworkBehaviourKey::new(region, region), + NetworkBehaviour { + delay: Duration::from_millis(100), + drop: 0.0, + }, + ) + }) + .collect(), + }, + 0, + ); let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> = SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); simulation_runner.simulate().unwrap(); diff --git a/simulations/src/streaming/runtime_subscriber.rs b/simulations/src/streaming/runtime_subscriber.rs index 4d69c2c7..1d3f00ef 100644 --- a/simulations/src/streaming/runtime_subscriber.rs +++ b/simulations/src/streaming/runtime_subscriber.rs @@ -154,56 +154,59 @@ mod tests { > }) .collect::>(); - let network = Network::new(RegionsData { - regions: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - (region, vec![NodeId::from_index(idx)]) - }) - .collect(), - node_region: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - (NodeId::from_index(idx), region) - }) - .collect(), - region_network_behaviour: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - ( - NetworkBehaviourKey::new(region, region), - NetworkBehaviour { - delay: Duration::from_millis(100), - drop: 0.0, - }, - ) - }) - .collect(), - }); + let network = Network::new( + RegionsData { + regions: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + (region, vec![NodeId::from_index(idx)]) + }) + .collect(), + node_region: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + (NodeId::from_index(idx), region) + }) + .collect(), + region_network_behaviour: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + ( + NetworkBehaviourKey::new(region, region), + NetworkBehaviour { + delay: Duration::from_millis(100), + drop: 0.0, + }, + ) + }) + .collect(), + }, + 0, + ); let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> = SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); simulation_runner.simulate().unwrap(); diff --git a/simulations/src/streaming/settings_subscriber.rs b/simulations/src/streaming/settings_subscriber.rs index f76a8c37..37b2fc66 100644 --- a/simulations/src/streaming/settings_subscriber.rs +++ b/simulations/src/streaming/settings_subscriber.rs @@ -154,56 +154,59 @@ mod tests { > }) .collect::>(); - let network = Network::new(RegionsData { - regions: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - (region, vec![NodeId::from_index(idx)]) - }) - .collect(), - node_region: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - (NodeId::from_index(idx), region) - }) - .collect(), - region_network_behaviour: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - ( - NetworkBehaviourKey::new(region, region), - NetworkBehaviour { - delay: Duration::from_millis(100), - drop: 0.0, - }, - ) - }) - .collect(), - }); + let network = Network::new( + RegionsData { + regions: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + (region, vec![NodeId::from_index(idx)]) + }) + .collect(), + node_region: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + (NodeId::from_index(idx), region) + }) + .collect(), + region_network_behaviour: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + ( + NetworkBehaviourKey::new(region, region), + NetworkBehaviour { + delay: Duration::from_millis(100), + drop: 0.0, + }, + ) + }) + .collect(), + }, + 0, + ); let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> = SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); simulation_runner.simulate().unwrap();