From 901ebf415297e3c1c99e71af5a724ad8644818db Mon Sep 17 00:00:00 2001 From: gusto Date: Fri, 31 Mar 2023 12:48:06 +0300 Subject: [PATCH] Message sending between simulation nodes (#110) * Dummy node for simulations * Shared network state for nodes * Runner one step test * Beginning of network interface * Connect dummy node to network * Network step tests * Pop messages that are being sent * Regions send receive tests * Setup network in sync runner tests * Dispatch and collect node messages during sim step * Improve network interface receiver --- simulations/Cargo.toml | 1 + simulations/src/bin/app.rs | 10 +- simulations/src/network/behaviour.rs | 2 +- simulations/src/network/mod.rs | 266 ++++++++++++++++++++++- simulations/src/network/regions.rs | 2 +- simulations/src/node/carnot/mod.rs | 11 +- simulations/src/node/dummy.rs | 111 ++++++++++ simulations/src/node/mod.rs | 17 +- simulations/src/overlay/tree.rs | 13 +- simulations/src/runner/async_runner.rs | 5 +- simulations/src/runner/glauber_runner.rs | 5 +- simulations/src/runner/layered_runner.rs | 9 +- simulations/src/runner/mod.rs | 45 ++-- simulations/src/runner/sync_runner.rs | 90 +++++++- simulations/src/settings.rs | 2 +- 15 files changed, 525 insertions(+), 64 deletions(-) create mode 100644 simulations/src/node/dummy.rs diff --git a/simulations/Cargo.toml b/simulations/Cargo.toml index 1b941633..3a010183 100644 --- a/simulations/Cargo.toml +++ b/simulations/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] clap = { version = "4", features = ["derive"] } crc32fast = "1.3" +crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] } fixed-slice-deque = "0.1.0-beta2" nomos-core = { path = "../nomos-core" } polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"] } diff --git a/simulations/src/bin/app.rs b/simulations/src/bin/app.rs index b7ab6ca7..7f98ff81 100644 --- a/simulations/src/bin/app.rs +++ b/simulations/src/bin/app.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; // std use std::error::Error; use std::fmt::{Display, Formatter}; @@ -11,6 +12,8 @@ use polars::io::SerWriter; use polars::prelude::{DataFrame, JsonReader, SerReader}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use simulations::network::regions::RegionsData; +use simulations::network::Network; use simulations::overlay::tree::TreeOverlay; // internal use simulations::{ @@ -94,9 +97,12 @@ impl SimulationApp { output_format, } = self; let simulation_settings: SimulationSettings<_, _> = load_json_from_file(&input_settings)?; + let nodes = vec![]; // TODO: Initialize nodes of different types. + let regions_data = RegionsData::new(HashMap::new(), HashMap::new()); + let network = Network::new(regions_data); - let mut simulation_runner: SimulationRunner = - SimulationRunner::new(simulation_settings); + let mut simulation_runner: SimulationRunner<(), CarnotNode, TreeOverlay> = + SimulationRunner::new(network, nodes, simulation_settings); // build up series vector let mut out_data: Vec = Vec::new(); simulation_runner.simulate(Some(&mut out_data)); diff --git a/simulations/src/network/behaviour.rs b/simulations/src/network/behaviour.rs index aa99ca65..577b50f6 100644 --- a/simulations/src/network/behaviour.rs +++ b/simulations/src/network/behaviour.rs @@ -5,7 +5,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; // internal -#[derive(Default, Debug, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct NetworkBehaviour { pub delay: Duration, pub drop: f64, diff --git a/simulations/src/network/mod.rs b/simulations/src/network/mod.rs index 3f65cd1c..4a7e2d06 100644 --- a/simulations/src/network/mod.rs +++ b/simulations/src/network/mod.rs @@ -1,6 +1,11 @@ // std -use std::time::Duration; +use std::{ + collections::HashMap, + ops::Add, + time::{Duration, Instant}, +}; // crates +use crossbeam::channel::{self, Receiver, Sender}; use rand::Rng; // internal use crate::node::NodeId; @@ -8,17 +13,31 @@ use crate::node::NodeId; pub mod behaviour; pub mod regions; -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct Network { +pub type NetworkTime = Instant; + +pub struct Network { pub regions: regions::RegionsData, + network_time: NetworkTime, + messages: Vec<(NetworkTime, NetworkMessage)>, + from_node_receivers: HashMap>>, + to_node_senders: HashMap>>, } -impl Network { +impl Network +where + M: Clone, +{ pub fn new(regions: regions::RegionsData) -> Self { - Self { regions } + Self { + regions, + network_time: Instant::now(), + messages: Vec::new(), + from_node_receivers: HashMap::new(), + to_node_senders: HashMap::new(), + } } - pub fn send_message_cost( + fn send_message_cost( &self, rng: &mut R, node_a: NodeId, @@ -29,4 +48,239 @@ impl Network { // TODO: use a delay range .then(|| network_behaviour.delay()) } + + pub fn connect( + &mut self, + node_id: NodeId, + node_message_receiver: Receiver>, + ) -> Receiver> { + let (to_node_sender, from_network_receiver) = channel::unbounded(); + self.from_node_receivers + .insert(node_id, node_message_receiver); + self.to_node_senders.insert(node_id, to_node_sender); + from_network_receiver + } + + /// Collects and dispatches messages to connected interfaces. + pub fn step(&mut self, rng: &mut R, time_passed: Duration) { + self.collect_messages(); + self.dispatch_after(rng, time_passed); + } + + /// Receive and store all messages from nodes. + pub fn collect_messages(&mut self) { + self.from_node_receivers.iter().for_each(|(_, from_node)| { + while let Ok(message) = from_node.try_recv() { + self.messages.push((self.network_time, message)); + } + }); + } + + /// Reiterate all messages and send to appropriate nodes if simulated + /// delay has passed. + pub fn dispatch_after(&mut self, rng: &mut R, time_passed: Duration) { + self.network_time += time_passed; + let mut delayed = vec![]; + while let Some((network_time, message)) = self.messages.pop() { + // TODO: Handle message drops (remove unwrap). + let delay = self + .send_message_cost(rng, message.from, message.to) + .unwrap(); + if network_time.add(delay) <= self.network_time { + let to_node = self.to_node_senders.get(&message.to).unwrap(); + to_node.send(message).expect("Node should have connection"); + } else { + delayed.push((network_time, message)); + } + } + self.messages = delayed; + } +} + +#[derive(Clone, Debug)] +pub struct NetworkMessage { + pub from: NodeId, + pub to: NodeId, + pub payload: M, +} + +impl NetworkMessage { + pub fn new(from: NodeId, to: NodeId, payload: M) -> Self { + Self { from, to, payload } + } +} + +pub trait NetworkInterface { + type Payload; + + fn send_message(&self, address: NodeId, message: Self::Payload); + fn receive_messages(&self) -> Vec>; +} + +#[cfg(test)] +mod tests { + use super::{ + behaviour::NetworkBehaviour, + regions::{Region, RegionsData}, + Network, NetworkInterface, NetworkMessage, + }; + use crate::node::NodeId; + use crossbeam::channel::{self, Receiver, Sender}; + use rand::rngs::mock::StepRng; + use std::{collections::HashMap, time::Duration}; + + struct MockNetworkInterface { + id: NodeId, + sender: Sender>, + receiver: Receiver>, + } + + impl MockNetworkInterface { + pub fn new( + id: NodeId, + sender: Sender>, + receiver: Receiver>, + ) -> Self { + Self { + id, + sender, + receiver, + } + } + } + + impl NetworkInterface for MockNetworkInterface { + type Payload = (); + + fn send_message(&self, address: NodeId, message: Self::Payload) { + let message = NetworkMessage::new(self.id, address, message); + self.sender.send(message).unwrap(); + } + + fn receive_messages(&self) -> Vec> { + self.receiver.try_iter().collect() + } + } + + #[test] + fn send_receive_messages() { + let mut rng = StepRng::new(1, 0); + let node_a = 0.into(); + let node_b = 1.into(); + + let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]); + let behaviour = HashMap::from([( + (Region::Europe, Region::Europe), + NetworkBehaviour::new(Duration::from_millis(100), 0.0), + )]); + let regions_data = RegionsData::new(regions, behaviour); + let mut network = Network::new(regions_data); + + let (from_a_sender, from_a_receiver) = channel::unbounded(); + let to_a_receiver = network.connect(node_a, from_a_receiver); + let a = MockNetworkInterface::new(node_a, from_a_sender, to_a_receiver); + + let (from_b_sender, from_b_receiver) = channel::unbounded(); + let to_b_receiver = network.connect(node_b, from_b_receiver); + let b = MockNetworkInterface::new(node_b, from_b_sender, to_b_receiver); + + a.send_message(node_b, ()); + network.collect_messages(); + + assert_eq!(a.receive_messages().len(), 0); + assert_eq!(b.receive_messages().len(), 0); + + network.step(&mut rng, Duration::from_millis(0)); + assert_eq!(a.receive_messages().len(), 0); + assert_eq!(b.receive_messages().len(), 0); + + network.step(&mut rng, Duration::from_millis(100)); + assert_eq!(a.receive_messages().len(), 0); + assert_eq!(b.receive_messages().len(), 1); + + network.step(&mut rng, Duration::from_millis(100)); + assert_eq!(a.receive_messages().len(), 0); + assert_eq!(b.receive_messages().len(), 0); + + b.send_message(node_a, ()); + b.send_message(node_a, ()); + b.send_message(node_a, ()); + network.collect_messages(); + + network.dispatch_after(&mut rng, Duration::from_millis(100)); + assert_eq!(a.receive_messages().len(), 3); + assert_eq!(b.receive_messages().len(), 0); + } + + #[test] + fn regions_send_receive_messages() { + let mut rng = StepRng::new(1, 0); + let node_a = 0.into(); + let node_b = 1.into(); + let node_c = 2.into(); + + let regions = HashMap::from([ + (Region::Asia, vec![node_a, node_b]), + (Region::Europe, vec![node_c]), + ]); + let behaviour = HashMap::from([ + ( + (Region::Asia, Region::Asia), + NetworkBehaviour::new(Duration::from_millis(100), 0.0), + ), + ( + (Region::Asia, Region::Europe), + NetworkBehaviour::new(Duration::from_millis(500), 0.0), + ), + ( + (Region::Europe, Region::Europe), + NetworkBehaviour::new(Duration::from_millis(100), 0.0), + ), + ]); + let regions_data = RegionsData::new(regions, behaviour); + let mut network = Network::new(regions_data); + + let (from_a_sender, from_a_receiver) = channel::unbounded(); + let to_a_receiver = network.connect(node_a, from_a_receiver); + let a = MockNetworkInterface::new(node_a, from_a_sender, to_a_receiver); + + let (from_b_sender, from_b_receiver) = channel::unbounded(); + let to_b_receiver = network.connect(node_b, from_b_receiver); + let b = MockNetworkInterface::new(node_b, from_b_sender, to_b_receiver); + + let (from_c_sender, from_c_receiver) = channel::unbounded(); + let to_c_receiver = network.connect(node_c, from_c_receiver); + let c = MockNetworkInterface::new(node_c, from_c_sender, to_c_receiver); + + a.send_message(node_b, ()); + a.send_message(node_c, ()); + network.collect_messages(); + + b.send_message(node_a, ()); + b.send_message(node_c, ()); + network.collect_messages(); + + c.send_message(node_a, ()); + c.send_message(node_b, ()); + network.collect_messages(); + + network.dispatch_after(&mut rng, Duration::from_millis(100)); + assert_eq!(a.receive_messages().len(), 1); + assert_eq!(b.receive_messages().len(), 1); + assert_eq!(c.receive_messages().len(), 0); + + a.send_message(node_b, ()); + b.send_message(node_c, ()); + network.collect_messages(); + + network.dispatch_after(&mut rng, Duration::from_millis(400)); + assert_eq!(a.receive_messages().len(), 1); // c to a + assert_eq!(b.receive_messages().len(), 2); // c to b && a to b + assert_eq!(c.receive_messages().len(), 2); // a to c && b to c + + network.dispatch_after(&mut rng, Duration::from_millis(100)); + assert_eq!(a.receive_messages().len(), 0); + assert_eq!(b.receive_messages().len(), 0); + assert_eq!(c.receive_messages().len(), 1); // b to c + } } diff --git a/simulations/src/network/regions.rs b/simulations/src/network/regions.rs index 5f9fc120..54e093f6 100644 --- a/simulations/src/network/regions.rs +++ b/simulations/src/network/regions.rs @@ -15,7 +15,7 @@ pub enum Region { Australia, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct RegionsData { pub regions: HashMap>, #[serde(skip)] diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index 7c594f95..df1d8b4f 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -1,9 +1,8 @@ // std // crates -use rand::Rng; use serde::Deserialize; // internal -use crate::node::{Node, NodeId}; +use super::{Node, NodeId}; #[derive(Default)] pub struct CarnotState {} @@ -22,14 +21,6 @@ impl Node for CarnotNode { type Settings = CarnotSettings; type State = CarnotState; - fn new(_rng: &mut R, id: NodeId, settings: Self::Settings) -> Self { - Self { - id, - state: Default::default(), - settings, - } - } - fn id(&self) -> NodeId { self.id } diff --git a/simulations/src/node/dummy.rs b/simulations/src/node/dummy.rs new file mode 100644 index 00000000..0c74ede9 --- /dev/null +++ b/simulations/src/node/dummy.rs @@ -0,0 +1,111 @@ +// std +// crates +use crossbeam::channel::{Receiver, Sender}; +use serde::Deserialize; +// internal +use crate::{ + network::{NetworkInterface, NetworkMessage}, + node::{Node, NodeId}, +}; + +use super::{NetworkState, SharedState}; + +#[derive(Debug, Default)] +pub struct DummyState { + pub current_view: usize, +} + +#[derive(Clone, Default, Deserialize)] +pub struct DummySettings {} + +#[derive(Clone)] +pub enum DummyMessage { + EventOne(usize), + EventTwo(usize), +} + +pub struct DummyNode { + node_id: NodeId, + state: DummyState, + _settings: DummySettings, + _network_state: SharedState, + network_interface: DummyNetworkInterface, +} + +impl DummyNode { + pub fn new( + node_id: NodeId, + _network_state: SharedState, + network_interface: DummyNetworkInterface, + ) -> Self { + Self { + node_id, + state: DummyState { current_view: 0 }, + _settings: DummySettings {}, + _network_state, + network_interface, + } + } +} + +impl Node for DummyNode { + type Settings = DummySettings; + type State = DummyState; + + fn id(&self) -> NodeId { + self.node_id + } + + fn current_view(&self) -> usize { + self.state.current_view + } + + fn state(&self) -> &DummyState { + &self.state + } + + fn step(&mut self) { + let incoming_messages = self.network_interface.receive_messages(); + self.state.current_view += 1; + + for message in incoming_messages { + match message.payload { + DummyMessage::EventOne(_) => todo!(), + DummyMessage::EventTwo(_) => todo!(), + } + } + } +} + +pub struct DummyNetworkInterface { + id: NodeId, + sender: Sender>, + receiver: Receiver>, +} + +impl DummyNetworkInterface { + pub fn new( + id: NodeId, + sender: Sender>, + receiver: Receiver>, + ) -> Self { + Self { + id, + sender, + receiver, + } + } +} + +impl NetworkInterface for DummyNetworkInterface { + type Payload = DummyMessage; + + fn send_message(&self, address: NodeId, message: Self::Payload) { + let message = NetworkMessage::new(self.id, address, message); + self.sender.send(message).unwrap(); + } + + fn receive_messages(&self) -> Vec> { + self.receiver.try_iter().collect() + } +} diff --git a/simulations/src/node/mod.rs b/simulations/src/node/mod.rs index 9d27296c..05e4ab99 100644 --- a/simulations/src/node/mod.rs +++ b/simulations/src/node/mod.rs @@ -1,14 +1,16 @@ pub mod carnot; +pub mod dummy; // std use std::{ ops::{Deref, DerefMut}, + sync::{Arc, RwLock}, time::Duration, }; // crates -use rand::Rng; use serde::{Deserialize, Serialize}; // internal +use crate::overlay::Layout; #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[serde(transparent)] @@ -114,10 +116,17 @@ impl core::iter::Sum for Duration { } } +/// A state that represents how nodes are interconnected in the network. +pub struct NetworkState { + pub layout: Layout, +} + +pub type SharedState = Arc>; + pub trait Node { type Settings; type State; - fn new(rng: &mut R, id: NodeId, settings: Self::Settings) -> Self; + fn id(&self) -> NodeId; // TODO: View must be view whenever we integrate consensus engine fn current_view(&self) -> usize; @@ -130,10 +139,6 @@ impl Node for usize { type Settings = (); type State = Self; - fn new(_rng: &mut R, id: NodeId, _settings: Self::Settings) -> Self { - id.inner() - } - fn id(&self) -> NodeId { (*self).into() } diff --git a/simulations/src/overlay/tree.rs b/simulations/src/overlay/tree.rs index da3e3ff5..e9ddbcdf 100644 --- a/simulations/src/overlay/tree.rs +++ b/simulations/src/overlay/tree.rs @@ -7,8 +7,9 @@ use serde::Deserialize; use super::{Committee, Layout, Overlay}; use crate::node::{CommitteeId, NodeId}; -#[derive(Clone, Deserialize)] +#[derive(Clone, Default, Deserialize)] pub enum TreeType { + #[default] FullBinaryTree, } @@ -19,6 +20,16 @@ pub struct TreeSettings { pub depth: usize, } +impl Default for TreeSettings { + fn default() -> Self { + Self { + tree_type: TreeType::default(), + committee_size: 1, + depth: 1, + } + } +} + pub struct TreeOverlay { settings: TreeSettings, } diff --git a/simulations/src/runner/async_runner.rs b/simulations/src/runner/async_runner.rs index f9660b7a..e48cd074 100644 --- a/simulations/src/runner/async_runner.rs +++ b/simulations/src/runner/async_runner.rs @@ -8,11 +8,12 @@ use rayon::prelude::*; use std::collections::HashSet; use std::sync::Arc; -pub fn simulate( - runner: &mut SimulationRunner, +pub fn simulate( + runner: &mut SimulationRunner, chunk_size: usize, mut out_data: Option<&mut Vec>, ) where + M: Clone, N::Settings: Clone, N: Send + Sync, O::Settings: Clone, diff --git a/simulations/src/runner/glauber_runner.rs b/simulations/src/runner/glauber_runner.rs index 1cb5bb4b..863d0e01 100644 --- a/simulations/src/runner/glauber_runner.rs +++ b/simulations/src/runner/glauber_runner.rs @@ -8,12 +8,13 @@ use std::collections::BTreeSet; use std::sync::Arc; /// [Glauber dynamics simulation](https://en.wikipedia.org/wiki/Glauber_dynamics) -pub fn simulate( - runner: &mut SimulationRunner, +pub fn simulate( + runner: &mut SimulationRunner, update_rate: usize, maximum_iterations: usize, mut out_data: Option<&mut Vec>, ) where + M: Clone, N: Send + Sync, N::Settings: Clone, O::Settings: Clone, diff --git a/simulations/src/runner/layered_runner.rs b/simulations/src/runner/layered_runner.rs index 482bed7f..99d98e1a 100644 --- a/simulations/src/runner/layered_runner.rs +++ b/simulations/src/runner/layered_runner.rs @@ -42,12 +42,13 @@ use crate::overlay::Overlay; use crate::runner::SimulationRunner; use crate::warding::SimulationState; -pub fn simulate( - runner: &mut SimulationRunner, +pub fn simulate( + runner: &mut SimulationRunner, gap: usize, distribution: Option>, mut out_data: Option<&mut Vec>, ) where + M: Clone, N: Send + Sync, N::Settings: Clone, O::Settings: Clone, @@ -129,9 +130,9 @@ fn choose_random_layer_and_node_id( (i, *node_id) } -fn build_node_ids_deque( +fn build_node_ids_deque( gap: usize, - runner: &SimulationRunner, + runner: &SimulationRunner, ) -> FixedSliceDeque> where N: Node, diff --git a/simulations/src/runner/mod.rs b/simulations/src/runner/mod.rs index 7705afc0..a386d0a2 100644 --- a/simulations/src/runner/mod.rs +++ b/simulations/src/runner/mod.rs @@ -3,10 +3,13 @@ mod glauber_runner; mod layered_runner; mod sync_runner; -use std::marker::PhantomData; // std +use std::marker::PhantomData; + use std::sync::{Arc, RwLock}; +use std::time::Duration; // crates +use crate::network::Network; use rand::rngs::SmallRng; use rand::{RngCore, SeedableRng}; use rayon::prelude::*; @@ -19,61 +22,48 @@ use crate::warding::{SimulationState, SimulationWard}; /// Encapsulation solution for the simulations runner /// Holds the network state, the simulating nodes and the simulation settings. -pub struct SimulationRunner +pub struct SimulationRunner where N: Node, O: Overlay, { nodes: Arc>>, + network: Network, settings: SimulationSettings, rng: SmallRng, _overlay: PhantomData, } -impl SimulationRunner +impl SimulationRunner where + M: Clone, N: Send + Sync, N::Settings: Clone, O::Settings: Clone, { - pub fn new(settings: SimulationSettings) -> Self { + pub fn new( + network: Network, + nodes: Vec, + settings: SimulationSettings, + ) -> Self { let seed = settings .seed .unwrap_or_else(|| rand::thread_rng().next_u64()); println!("Seed: {seed}"); - let mut rng = SmallRng::seed_from_u64(seed); - let overlay = O::new(settings.overlay_settings.clone()); - let nodes = Self::nodes_from_initial_settings(&settings, overlay, &mut rng); - + let rng = SmallRng::seed_from_u64(seed); let nodes = Arc::new(RwLock::new(nodes)); Self { nodes, + network, settings, rng, _overlay: Default::default(), } } - /// Initialize nodes from settings and calculate initial network state. - fn nodes_from_initial_settings( - settings: &SimulationSettings, - _overlay: O, // TODO: attach overlay information to nodes - seed: &mut SmallRng, - ) -> Vec { - let SimulationSettings { - node_settings, - node_count, - .. - } = settings; - - (0..*node_count) - .map(|id| N::new(seed, id.into(), node_settings.clone())) - .collect() - } - pub fn simulate(&mut self, out_data: Option<&mut Vec>) { match self.settings.runner_settings.clone() { RunnerSettings::Sync => { @@ -100,7 +90,7 @@ where fn dump_state_to_out_data( &self, _simulation_state: &SimulationState, - _out_ata: &mut Option<&mut Vec>, + _out_data: &mut Option<&mut Vec>, ) { todo!("What data do we want to expose?") } @@ -114,6 +104,8 @@ where } fn step(&mut self) { + self.network + .dispatch_after(&mut self.rng, Duration::from_millis(100)); self.nodes .write() .expect("Single access to nodes vector") @@ -121,5 +113,6 @@ where .for_each(|node| { node.step(); }); + self.network.collect_messages(); } } diff --git a/simulations/src/runner/sync_runner.rs b/simulations/src/runner/sync_runner.rs index d82de1e8..9103f452 100644 --- a/simulations/src/runner/sync_runner.rs +++ b/simulations/src/runner/sync_runner.rs @@ -6,10 +6,11 @@ use crate::warding::SimulationState; use std::sync::Arc; /// Simulate with option of dumping the network state as a `::polars::Series` -pub fn simulate( - runner: &mut SimulationRunner, +pub fn simulate( + runner: &mut SimulationRunner, mut out_data: Option<&mut Vec>, ) where + M: Clone, N: Send + Sync, N::Settings: Clone, O::Settings: Clone, @@ -29,3 +30,88 @@ pub fn simulate( } } } + +#[cfg(test)] +mod tests { + use crate::{ + network::{ + behaviour::NetworkBehaviour, + regions::{Region, RegionsData}, + Network, + }, + node::{ + dummy::{DummyMessage, DummyNetworkInterface, DummyNode, DummySettings}, + NetworkState, Node, NodeId, SharedState, + }, + overlay::{ + tree::{TreeOverlay, TreeSettings}, + Overlay, + }, + runner::SimulationRunner, + settings::SimulationSettings, + }; + use crossbeam::channel; + use rand::rngs::mock::StepRng; + use std::{ + collections::HashMap, + sync::{Arc, RwLock}, + time::Duration, + }; + + fn init_network(node_ids: &[NodeId]) -> Network { + let regions = HashMap::from([(Region::Europe, node_ids.to_vec())]); + let behaviour = HashMap::from([( + (Region::Europe, Region::Europe), + NetworkBehaviour::new(Duration::from_millis(100), 0.0), + )]); + let regions_data = RegionsData::new(regions, behaviour); + Network::new(regions_data) + } + + fn init_dummy_nodes( + node_ids: &[NodeId], + network: &mut Network, + network_state: SharedState, + ) -> Vec { + node_ids + .iter() + .map(|node_id| { + let (node_message_sender, node_message_receiver) = channel::unbounded(); + let network_message_receiver = network.connect(*node_id, node_message_receiver); + let network_interface = DummyNetworkInterface::new( + *node_id, + node_message_sender, + network_message_receiver, + ); + DummyNode::new(*node_id, network_state.clone(), network_interface) + }) + .collect() + } + + #[test] + fn runner_one_step() { + let settings: SimulationSettings = SimulationSettings { + node_count: 10, + committee_size: 1, + ..Default::default() + }; + + let mut rng = StepRng::new(1, 0); + let node_ids: Vec = (0..settings.node_count).map(Into::into).collect(); + let overlay = TreeOverlay::new(settings.overlay_settings.clone()); + let mut network = init_network(&node_ids); + let network_state = Arc::new(RwLock::new(NetworkState { + layout: overlay.layout(&node_ids, &mut rng), + })); + let nodes = init_dummy_nodes(&node_ids, &mut network, network_state); + + let mut runner: SimulationRunner = + SimulationRunner::new(network, nodes, settings); + runner.step(); + let nodes = runner.nodes.read().unwrap(); + + for node in nodes.iter() { + assert_eq!(node.current_view(), 1); + } + } +} diff --git a/simulations/src/settings.rs b/simulations/src/settings.rs index 09f247bd..ea673b84 100644 --- a/simulations/src/settings.rs +++ b/simulations/src/settings.rs @@ -21,7 +21,7 @@ pub enum RunnerSettings { }, } -#[derive(Deserialize)] +#[derive(Default, Deserialize)] pub struct SimulationSettings { pub network_behaviors: HashMap<(Region, Region), StepTime>, pub regions: Vec,