Message sending between simulation nodes (#110)

* Dummy node for simulations

* Shared network state for nodes

* Runner one step test

* Beginning of network interface

* Connect dummy node to network

* Network step tests

* Pop messages that are being sent

* Regions send receive tests

* Setup network in sync runner tests

* Dispatch and collect node messages during sim step

* Improve network interface receiver
This commit is contained in:
gusto 2023-03-31 12:48:06 +03:00 committed by GitHub
parent c882a58286
commit 901ebf4152
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 525 additions and 64 deletions

View File

@ -8,6 +8,7 @@ edition = "2021"
[dependencies] [dependencies]
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
crc32fast = "1.3" crc32fast = "1.3"
crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] }
fixed-slice-deque = "0.1.0-beta2" fixed-slice-deque = "0.1.0-beta2"
nomos-core = { path = "../nomos-core" } nomos-core = { path = "../nomos-core" }
polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"] } polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"] }

View File

@ -1,3 +1,4 @@
use std::collections::HashMap;
// std // std
use std::error::Error; use std::error::Error;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
@ -11,6 +12,8 @@ use polars::io::SerWriter;
use polars::prelude::{DataFrame, JsonReader, SerReader}; use polars::prelude::{DataFrame, JsonReader, SerReader};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use simulations::network::regions::RegionsData;
use simulations::network::Network;
use simulations::overlay::tree::TreeOverlay; use simulations::overlay::tree::TreeOverlay;
// internal // internal
use simulations::{ use simulations::{
@ -94,9 +97,12 @@ impl SimulationApp {
output_format, output_format,
} = self; } = self;
let simulation_settings: SimulationSettings<_, _> = load_json_from_file(&input_settings)?; let simulation_settings: SimulationSettings<_, _> = load_json_from_file(&input_settings)?;
let nodes = vec![]; // TODO: Initialize nodes of different types.
let regions_data = RegionsData::new(HashMap::new(), HashMap::new());
let network = Network::new(regions_data);
let mut simulation_runner: SimulationRunner<CarnotNode, TreeOverlay> = let mut simulation_runner: SimulationRunner<(), CarnotNode, TreeOverlay> =
SimulationRunner::new(simulation_settings); SimulationRunner::new(network, nodes, simulation_settings);
// build up series vector // build up series vector
let mut out_data: Vec<OutData> = Vec::new(); let mut out_data: Vec<OutData> = Vec::new();
simulation_runner.simulate(Some(&mut out_data)); simulation_runner.simulate(Some(&mut out_data));

View File

@ -5,7 +5,7 @@ use rand::Rng;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
// internal // internal
#[derive(Default, Debug, Serialize, Deserialize)] #[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct NetworkBehaviour { pub struct NetworkBehaviour {
pub delay: Duration, pub delay: Duration,
pub drop: f64, pub drop: f64,

View File

@ -1,6 +1,11 @@
// std // std
use std::time::Duration; use std::{
collections::HashMap,
ops::Add,
time::{Duration, Instant},
};
// crates // crates
use crossbeam::channel::{self, Receiver, Sender};
use rand::Rng; use rand::Rng;
// internal // internal
use crate::node::NodeId; use crate::node::NodeId;
@ -8,17 +13,31 @@ use crate::node::NodeId;
pub mod behaviour; pub mod behaviour;
pub mod regions; pub mod regions;
#[derive(Debug, serde::Serialize, serde::Deserialize)] pub type NetworkTime = Instant;
pub struct Network {
pub struct Network<M> {
pub regions: regions::RegionsData, pub regions: regions::RegionsData,
network_time: NetworkTime,
messages: Vec<(NetworkTime, NetworkMessage<M>)>,
from_node_receivers: HashMap<NodeId, Receiver<NetworkMessage<M>>>,
to_node_senders: HashMap<NodeId, Sender<NetworkMessage<M>>>,
} }
impl Network { impl<M> Network<M>
where
M: Clone,
{
pub fn new(regions: regions::RegionsData) -> Self { pub fn new(regions: regions::RegionsData) -> Self {
Self { regions } Self {
regions,
network_time: Instant::now(),
messages: Vec::new(),
from_node_receivers: HashMap::new(),
to_node_senders: HashMap::new(),
}
} }
pub fn send_message_cost<R: Rng>( fn send_message_cost<R: Rng>(
&self, &self,
rng: &mut R, rng: &mut R,
node_a: NodeId, node_a: NodeId,
@ -29,4 +48,239 @@ impl Network {
// TODO: use a delay range // TODO: use a delay range
.then(|| network_behaviour.delay()) .then(|| network_behaviour.delay())
} }
pub fn connect(
&mut self,
node_id: NodeId,
node_message_receiver: Receiver<NetworkMessage<M>>,
) -> Receiver<NetworkMessage<M>> {
let (to_node_sender, from_network_receiver) = channel::unbounded();
self.from_node_receivers
.insert(node_id, node_message_receiver);
self.to_node_senders.insert(node_id, to_node_sender);
from_network_receiver
}
/// Collects and dispatches messages to connected interfaces.
pub fn step<R: Rng>(&mut self, rng: &mut R, time_passed: Duration) {
self.collect_messages();
self.dispatch_after(rng, time_passed);
}
/// Receive and store all messages from nodes.
pub fn collect_messages(&mut self) {
self.from_node_receivers.iter().for_each(|(_, from_node)| {
while let Ok(message) = from_node.try_recv() {
self.messages.push((self.network_time, message));
}
});
}
/// Reiterate all messages and send to appropriate nodes if simulated
/// delay has passed.
pub fn dispatch_after<R: Rng>(&mut self, rng: &mut R, time_passed: Duration) {
self.network_time += time_passed;
let mut delayed = vec![];
while let Some((network_time, message)) = self.messages.pop() {
// TODO: Handle message drops (remove unwrap).
let delay = self
.send_message_cost(rng, message.from, message.to)
.unwrap();
if network_time.add(delay) <= self.network_time {
let to_node = self.to_node_senders.get(&message.to).unwrap();
to_node.send(message).expect("Node should have connection");
} else {
delayed.push((network_time, message));
}
}
self.messages = delayed;
}
}
#[derive(Clone, Debug)]
pub struct NetworkMessage<M> {
pub from: NodeId,
pub to: NodeId,
pub payload: M,
}
impl<M> NetworkMessage<M> {
pub fn new(from: NodeId, to: NodeId, payload: M) -> Self {
Self { from, to, payload }
}
}
pub trait NetworkInterface {
type Payload;
fn send_message(&self, address: NodeId, message: Self::Payload);
fn receive_messages(&self) -> Vec<NetworkMessage<Self::Payload>>;
}
#[cfg(test)]
mod tests {
use super::{
behaviour::NetworkBehaviour,
regions::{Region, RegionsData},
Network, NetworkInterface, NetworkMessage,
};
use crate::node::NodeId;
use crossbeam::channel::{self, Receiver, Sender};
use rand::rngs::mock::StepRng;
use std::{collections::HashMap, time::Duration};
struct MockNetworkInterface {
id: NodeId,
sender: Sender<NetworkMessage<()>>,
receiver: Receiver<NetworkMessage<()>>,
}
impl MockNetworkInterface {
pub fn new(
id: NodeId,
sender: Sender<NetworkMessage<()>>,
receiver: Receiver<NetworkMessage<()>>,
) -> Self {
Self {
id,
sender,
receiver,
}
}
}
impl NetworkInterface for MockNetworkInterface {
type Payload = ();
fn send_message(&self, address: NodeId, message: Self::Payload) {
let message = NetworkMessage::new(self.id, address, message);
self.sender.send(message).unwrap();
}
fn receive_messages(&self) -> Vec<crate::network::NetworkMessage<Self::Payload>> {
self.receiver.try_iter().collect()
}
}
#[test]
fn send_receive_messages() {
let mut rng = StepRng::new(1, 0);
let node_a = 0.into();
let node_b = 1.into();
let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]);
let behaviour = HashMap::from([(
(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
)]);
let regions_data = RegionsData::new(regions, behaviour);
let mut network = Network::new(regions_data);
let (from_a_sender, from_a_receiver) = channel::unbounded();
let to_a_receiver = network.connect(node_a, from_a_receiver);
let a = MockNetworkInterface::new(node_a, from_a_sender, to_a_receiver);
let (from_b_sender, from_b_receiver) = channel::unbounded();
let to_b_receiver = network.connect(node_b, from_b_receiver);
let b = MockNetworkInterface::new(node_b, from_b_sender, to_b_receiver);
a.send_message(node_b, ());
network.collect_messages();
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0);
network.step(&mut rng, Duration::from_millis(0));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0);
network.step(&mut rng, Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 1);
network.step(&mut rng, Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0);
b.send_message(node_a, ());
b.send_message(node_a, ());
b.send_message(node_a, ());
network.collect_messages();
network.dispatch_after(&mut rng, Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 3);
assert_eq!(b.receive_messages().len(), 0);
}
#[test]
fn regions_send_receive_messages() {
let mut rng = StepRng::new(1, 0);
let node_a = 0.into();
let node_b = 1.into();
let node_c = 2.into();
let regions = HashMap::from([
(Region::Asia, vec![node_a, node_b]),
(Region::Europe, vec![node_c]),
]);
let behaviour = HashMap::from([
(
(Region::Asia, Region::Asia),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
),
(
(Region::Asia, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(500), 0.0),
),
(
(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
),
]);
let regions_data = RegionsData::new(regions, behaviour);
let mut network = Network::new(regions_data);
let (from_a_sender, from_a_receiver) = channel::unbounded();
let to_a_receiver = network.connect(node_a, from_a_receiver);
let a = MockNetworkInterface::new(node_a, from_a_sender, to_a_receiver);
let (from_b_sender, from_b_receiver) = channel::unbounded();
let to_b_receiver = network.connect(node_b, from_b_receiver);
let b = MockNetworkInterface::new(node_b, from_b_sender, to_b_receiver);
let (from_c_sender, from_c_receiver) = channel::unbounded();
let to_c_receiver = network.connect(node_c, from_c_receiver);
let c = MockNetworkInterface::new(node_c, from_c_sender, to_c_receiver);
a.send_message(node_b, ());
a.send_message(node_c, ());
network.collect_messages();
b.send_message(node_a, ());
b.send_message(node_c, ());
network.collect_messages();
c.send_message(node_a, ());
c.send_message(node_b, ());
network.collect_messages();
network.dispatch_after(&mut rng, Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 1);
assert_eq!(b.receive_messages().len(), 1);
assert_eq!(c.receive_messages().len(), 0);
a.send_message(node_b, ());
b.send_message(node_c, ());
network.collect_messages();
network.dispatch_after(&mut rng, Duration::from_millis(400));
assert_eq!(a.receive_messages().len(), 1); // c to a
assert_eq!(b.receive_messages().len(), 2); // c to b && a to b
assert_eq!(c.receive_messages().len(), 2); // a to c && b to c
network.dispatch_after(&mut rng, Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0);
assert_eq!(c.receive_messages().len(), 1); // b to c
}
} }

View File

@ -15,7 +15,7 @@ pub enum Region {
Australia, Australia,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionsData { pub struct RegionsData {
pub regions: HashMap<Region, Vec<NodeId>>, pub regions: HashMap<Region, Vec<NodeId>>,
#[serde(skip)] #[serde(skip)]

View File

@ -1,9 +1,8 @@
// std // std
// crates // crates
use rand::Rng;
use serde::Deserialize; use serde::Deserialize;
// internal // internal
use crate::node::{Node, NodeId}; use super::{Node, NodeId};
#[derive(Default)] #[derive(Default)]
pub struct CarnotState {} pub struct CarnotState {}
@ -22,14 +21,6 @@ impl Node for CarnotNode {
type Settings = CarnotSettings; type Settings = CarnotSettings;
type State = CarnotState; type State = CarnotState;
fn new<R: Rng>(_rng: &mut R, id: NodeId, settings: Self::Settings) -> Self {
Self {
id,
state: Default::default(),
settings,
}
}
fn id(&self) -> NodeId { fn id(&self) -> NodeId {
self.id self.id
} }

View File

@ -0,0 +1,111 @@
// std
// crates
use crossbeam::channel::{Receiver, Sender};
use serde::Deserialize;
// internal
use crate::{
network::{NetworkInterface, NetworkMessage},
node::{Node, NodeId},
};
use super::{NetworkState, SharedState};
#[derive(Debug, Default)]
pub struct DummyState {
pub current_view: usize,
}
#[derive(Clone, Default, Deserialize)]
pub struct DummySettings {}
#[derive(Clone)]
pub enum DummyMessage {
EventOne(usize),
EventTwo(usize),
}
pub struct DummyNode {
node_id: NodeId,
state: DummyState,
_settings: DummySettings,
_network_state: SharedState<NetworkState>,
network_interface: DummyNetworkInterface,
}
impl DummyNode {
pub fn new(
node_id: NodeId,
_network_state: SharedState<NetworkState>,
network_interface: DummyNetworkInterface,
) -> Self {
Self {
node_id,
state: DummyState { current_view: 0 },
_settings: DummySettings {},
_network_state,
network_interface,
}
}
}
impl Node for DummyNode {
type Settings = DummySettings;
type State = DummyState;
fn id(&self) -> NodeId {
self.node_id
}
fn current_view(&self) -> usize {
self.state.current_view
}
fn state(&self) -> &DummyState {
&self.state
}
fn step(&mut self) {
let incoming_messages = self.network_interface.receive_messages();
self.state.current_view += 1;
for message in incoming_messages {
match message.payload {
DummyMessage::EventOne(_) => todo!(),
DummyMessage::EventTwo(_) => todo!(),
}
}
}
}
pub struct DummyNetworkInterface {
id: NodeId,
sender: Sender<NetworkMessage<DummyMessage>>,
receiver: Receiver<NetworkMessage<DummyMessage>>,
}
impl DummyNetworkInterface {
pub fn new(
id: NodeId,
sender: Sender<NetworkMessage<DummyMessage>>,
receiver: Receiver<NetworkMessage<DummyMessage>>,
) -> Self {
Self {
id,
sender,
receiver,
}
}
}
impl NetworkInterface for DummyNetworkInterface {
type Payload = DummyMessage;
fn send_message(&self, address: NodeId, message: Self::Payload) {
let message = NetworkMessage::new(self.id, address, message);
self.sender.send(message).unwrap();
}
fn receive_messages(&self) -> Vec<crate::network::NetworkMessage<Self::Payload>> {
self.receiver.try_iter().collect()
}
}

View File

@ -1,14 +1,16 @@
pub mod carnot; pub mod carnot;
pub mod dummy;
// std // std
use std::{ use std::{
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
sync::{Arc, RwLock},
time::Duration, time::Duration,
}; };
// crates // crates
use rand::Rng;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
// internal // internal
use crate::overlay::Layout;
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)] #[serde(transparent)]
@ -114,10 +116,17 @@ impl core::iter::Sum<StepTime> for Duration {
} }
} }
/// A state that represents how nodes are interconnected in the network.
pub struct NetworkState {
pub layout: Layout,
}
pub type SharedState<S> = Arc<RwLock<S>>;
pub trait Node { pub trait Node {
type Settings; type Settings;
type State; type State;
fn new<R: Rng>(rng: &mut R, id: NodeId, settings: Self::Settings) -> Self;
fn id(&self) -> NodeId; fn id(&self) -> NodeId;
// TODO: View must be view whenever we integrate consensus engine // TODO: View must be view whenever we integrate consensus engine
fn current_view(&self) -> usize; fn current_view(&self) -> usize;
@ -130,10 +139,6 @@ impl Node for usize {
type Settings = (); type Settings = ();
type State = Self; type State = Self;
fn new<R: rand::Rng>(_rng: &mut R, id: NodeId, _settings: Self::Settings) -> Self {
id.inner()
}
fn id(&self) -> NodeId { fn id(&self) -> NodeId {
(*self).into() (*self).into()
} }

View File

@ -7,8 +7,9 @@ use serde::Deserialize;
use super::{Committee, Layout, Overlay}; use super::{Committee, Layout, Overlay};
use crate::node::{CommitteeId, NodeId}; use crate::node::{CommitteeId, NodeId};
#[derive(Clone, Deserialize)] #[derive(Clone, Default, Deserialize)]
pub enum TreeType { pub enum TreeType {
#[default]
FullBinaryTree, FullBinaryTree,
} }
@ -19,6 +20,16 @@ pub struct TreeSettings {
pub depth: usize, pub depth: usize,
} }
impl Default for TreeSettings {
fn default() -> Self {
Self {
tree_type: TreeType::default(),
committee_size: 1,
depth: 1,
}
}
}
pub struct TreeOverlay { pub struct TreeOverlay {
settings: TreeSettings, settings: TreeSettings,
} }

View File

@ -8,11 +8,12 @@ use rayon::prelude::*;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
pub fn simulate<N: Node, O: Overlay>( pub fn simulate<M, N: Node, O: Overlay>(
runner: &mut SimulationRunner<N, O>, runner: &mut SimulationRunner<M, N, O>,
chunk_size: usize, chunk_size: usize,
mut out_data: Option<&mut Vec<OutData>>, mut out_data: Option<&mut Vec<OutData>>,
) where ) where
M: Clone,
N::Settings: Clone, N::Settings: Clone,
N: Send + Sync, N: Send + Sync,
O::Settings: Clone, O::Settings: Clone,

View File

@ -8,12 +8,13 @@ use std::collections::BTreeSet;
use std::sync::Arc; use std::sync::Arc;
/// [Glauber dynamics simulation](https://en.wikipedia.org/wiki/Glauber_dynamics) /// [Glauber dynamics simulation](https://en.wikipedia.org/wiki/Glauber_dynamics)
pub fn simulate<N: Node, O: Overlay>( pub fn simulate<M, N: Node, O: Overlay>(
runner: &mut SimulationRunner<N, O>, runner: &mut SimulationRunner<M, N, O>,
update_rate: usize, update_rate: usize,
maximum_iterations: usize, maximum_iterations: usize,
mut out_data: Option<&mut Vec<OutData>>, mut out_data: Option<&mut Vec<OutData>>,
) where ) where
M: Clone,
N: Send + Sync, N: Send + Sync,
N::Settings: Clone, N::Settings: Clone,
O::Settings: Clone, O::Settings: Clone,

View File

@ -42,12 +42,13 @@ use crate::overlay::Overlay;
use crate::runner::SimulationRunner; use crate::runner::SimulationRunner;
use crate::warding::SimulationState; use crate::warding::SimulationState;
pub fn simulate<N: Node, O: Overlay>( pub fn simulate<M, N: Node, O: Overlay>(
runner: &mut SimulationRunner<N, O>, runner: &mut SimulationRunner<M, N, O>,
gap: usize, gap: usize,
distribution: Option<Vec<f32>>, distribution: Option<Vec<f32>>,
mut out_data: Option<&mut Vec<OutData>>, mut out_data: Option<&mut Vec<OutData>>,
) where ) where
M: Clone,
N: Send + Sync, N: Send + Sync,
N::Settings: Clone, N::Settings: Clone,
O::Settings: Clone, O::Settings: Clone,
@ -129,9 +130,9 @@ fn choose_random_layer_and_node_id(
(i, *node_id) (i, *node_id)
} }
fn build_node_ids_deque<N, O>( fn build_node_ids_deque<M, N, O>(
gap: usize, gap: usize,
runner: &SimulationRunner<N, O>, runner: &SimulationRunner<M, N, O>,
) -> FixedSliceDeque<BTreeSet<NodeId>> ) -> FixedSliceDeque<BTreeSet<NodeId>>
where where
N: Node, N: Node,

View File

@ -3,10 +3,13 @@ mod glauber_runner;
mod layered_runner; mod layered_runner;
mod sync_runner; mod sync_runner;
use std::marker::PhantomData;
// std // std
use std::marker::PhantomData;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Duration;
// crates // crates
use crate::network::Network;
use rand::rngs::SmallRng; use rand::rngs::SmallRng;
use rand::{RngCore, SeedableRng}; use rand::{RngCore, SeedableRng};
use rayon::prelude::*; use rayon::prelude::*;
@ -19,61 +22,48 @@ use crate::warding::{SimulationState, SimulationWard};
/// Encapsulation solution for the simulations runner /// Encapsulation solution for the simulations runner
/// Holds the network state, the simulating nodes and the simulation settings. /// Holds the network state, the simulating nodes and the simulation settings.
pub struct SimulationRunner<N, O> pub struct SimulationRunner<M, N, O>
where where
N: Node, N: Node,
O: Overlay, O: Overlay,
{ {
nodes: Arc<RwLock<Vec<N>>>, nodes: Arc<RwLock<Vec<N>>>,
network: Network<M>,
settings: SimulationSettings<N::Settings, O::Settings>, settings: SimulationSettings<N::Settings, O::Settings>,
rng: SmallRng, rng: SmallRng,
_overlay: PhantomData<O>, _overlay: PhantomData<O>,
} }
impl<N: Node, O: Overlay> SimulationRunner<N, O> impl<M, N: Node, O: Overlay> SimulationRunner<M, N, O>
where where
M: Clone,
N: Send + Sync, N: Send + Sync,
N::Settings: Clone, N::Settings: Clone,
O::Settings: Clone, O::Settings: Clone,
{ {
pub fn new(settings: SimulationSettings<N::Settings, O::Settings>) -> Self { pub fn new(
network: Network<M>,
nodes: Vec<N>,
settings: SimulationSettings<N::Settings, O::Settings>,
) -> Self {
let seed = settings let seed = settings
.seed .seed
.unwrap_or_else(|| rand::thread_rng().next_u64()); .unwrap_or_else(|| rand::thread_rng().next_u64());
println!("Seed: {seed}"); println!("Seed: {seed}");
let mut rng = SmallRng::seed_from_u64(seed); let rng = SmallRng::seed_from_u64(seed);
let overlay = O::new(settings.overlay_settings.clone());
let nodes = Self::nodes_from_initial_settings(&settings, overlay, &mut rng);
let nodes = Arc::new(RwLock::new(nodes)); let nodes = Arc::new(RwLock::new(nodes));
Self { Self {
nodes, nodes,
network,
settings, settings,
rng, rng,
_overlay: Default::default(), _overlay: Default::default(),
} }
} }
/// Initialize nodes from settings and calculate initial network state.
fn nodes_from_initial_settings(
settings: &SimulationSettings<N::Settings, O::Settings>,
_overlay: O, // TODO: attach overlay information to nodes
seed: &mut SmallRng,
) -> Vec<N> {
let SimulationSettings {
node_settings,
node_count,
..
} = settings;
(0..*node_count)
.map(|id| N::new(seed, id.into(), node_settings.clone()))
.collect()
}
pub fn simulate(&mut self, out_data: Option<&mut Vec<OutData>>) { pub fn simulate(&mut self, out_data: Option<&mut Vec<OutData>>) {
match self.settings.runner_settings.clone() { match self.settings.runner_settings.clone() {
RunnerSettings::Sync => { RunnerSettings::Sync => {
@ -100,7 +90,7 @@ where
fn dump_state_to_out_data( fn dump_state_to_out_data(
&self, &self,
_simulation_state: &SimulationState<N>, _simulation_state: &SimulationState<N>,
_out_ata: &mut Option<&mut Vec<OutData>>, _out_data: &mut Option<&mut Vec<OutData>>,
) { ) {
todo!("What data do we want to expose?") todo!("What data do we want to expose?")
} }
@ -114,6 +104,8 @@ where
} }
fn step(&mut self) { fn step(&mut self) {
self.network
.dispatch_after(&mut self.rng, Duration::from_millis(100));
self.nodes self.nodes
.write() .write()
.expect("Single access to nodes vector") .expect("Single access to nodes vector")
@ -121,5 +113,6 @@ where
.for_each(|node| { .for_each(|node| {
node.step(); node.step();
}); });
self.network.collect_messages();
} }
} }

View File

@ -6,10 +6,11 @@ use crate::warding::SimulationState;
use std::sync::Arc; use std::sync::Arc;
/// Simulate with option of dumping the network state as a `::polars::Series` /// Simulate with option of dumping the network state as a `::polars::Series`
pub fn simulate<N: Node, O: Overlay>( pub fn simulate<M, N: Node, O: Overlay>(
runner: &mut SimulationRunner<N, O>, runner: &mut SimulationRunner<M, N, O>,
mut out_data: Option<&mut Vec<OutData>>, mut out_data: Option<&mut Vec<OutData>>,
) where ) where
M: Clone,
N: Send + Sync, N: Send + Sync,
N::Settings: Clone, N::Settings: Clone,
O::Settings: Clone, O::Settings: Clone,
@ -29,3 +30,88 @@ pub fn simulate<N: Node, O: Overlay>(
} }
} }
} }
#[cfg(test)]
mod tests {
use crate::{
network::{
behaviour::NetworkBehaviour,
regions::{Region, RegionsData},
Network,
},
node::{
dummy::{DummyMessage, DummyNetworkInterface, DummyNode, DummySettings},
NetworkState, Node, NodeId, SharedState,
},
overlay::{
tree::{TreeOverlay, TreeSettings},
Overlay,
},
runner::SimulationRunner,
settings::SimulationSettings,
};
use crossbeam::channel;
use rand::rngs::mock::StepRng;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
time::Duration,
};
fn init_network(node_ids: &[NodeId]) -> Network<DummyMessage> {
let regions = HashMap::from([(Region::Europe, node_ids.to_vec())]);
let behaviour = HashMap::from([(
(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
)]);
let regions_data = RegionsData::new(regions, behaviour);
Network::new(regions_data)
}
fn init_dummy_nodes(
node_ids: &[NodeId],
network: &mut Network<DummyMessage>,
network_state: SharedState<NetworkState>,
) -> Vec<DummyNode> {
node_ids
.iter()
.map(|node_id| {
let (node_message_sender, node_message_receiver) = channel::unbounded();
let network_message_receiver = network.connect(*node_id, node_message_receiver);
let network_interface = DummyNetworkInterface::new(
*node_id,
node_message_sender,
network_message_receiver,
);
DummyNode::new(*node_id, network_state.clone(), network_interface)
})
.collect()
}
#[test]
fn runner_one_step() {
let settings: SimulationSettings<DummySettings, TreeSettings> = SimulationSettings {
node_count: 10,
committee_size: 1,
..Default::default()
};
let mut rng = StepRng::new(1, 0);
let node_ids: Vec<NodeId> = (0..settings.node_count).map(Into::into).collect();
let overlay = TreeOverlay::new(settings.overlay_settings.clone());
let mut network = init_network(&node_ids);
let network_state = Arc::new(RwLock::new(NetworkState {
layout: overlay.layout(&node_ids, &mut rng),
}));
let nodes = init_dummy_nodes(&node_ids, &mut network, network_state);
let mut runner: SimulationRunner<DummyMessage, DummyNode, TreeOverlay> =
SimulationRunner::new(network, nodes, settings);
runner.step();
let nodes = runner.nodes.read().unwrap();
for node in nodes.iter() {
assert_eq!(node.current_view(), 1);
}
}
}

View File

@ -21,7 +21,7 @@ pub enum RunnerSettings {
}, },
} }
#[derive(Deserialize)] #[derive(Default, Deserialize)]
pub struct SimulationSettings<N, O> { pub struct SimulationSettings<N, O> {
pub network_behaviors: HashMap<(Region, Region), StepTime>, pub network_behaviors: HashMap<(Region, Region), StepTime>,
pub regions: Vec<Region>, pub regions: Vec<Region>,