diff --git a/network-runner/README.md b/network-runner/README.md new file mode 100644 index 0000000..41c0a90 --- /dev/null +++ b/network-runner/README.md @@ -0,0 +1,8 @@ +# Network Simulator + +## Running simulations + +To run the simulation use this command line: +```bash +cargo run -- --input-settings config/mixnode.json +``` diff --git a/network-runner/config/carnot.json b/network-runner/config/mixnode.json similarity index 55% rename from network-runner/config/carnot.json rename to network-runner/config/mixnode.json index 83bd6fd..b2b494a 100644 --- a/network-runner/config/carnot.json +++ b/network-runner/config/mixnode.json @@ -17,9 +17,6 @@ "asia": 0.3 } }, - "overlay_settings": { - "number_of_committees": 3 - }, "node_settings": { "timeout": "1000ms" }, @@ -28,25 +25,7 @@ "stream_settings": { "path": "test.csv" }, - "node_count": 3000, - "views_count": 3, - "leaders_count": 1, + "node_count": 3, "seed": 0, - "wards": [ - {"max_view": 1} - ], - "record_settings": { - "node_id": true, - "current_view": true, - "highest_voted_view": true, - "local_high_qc": true, - "safe_blocks": true, - "last_view_timeout_qc": true, - "latest_committed_block": true, - "latest_committed_view": true, - "root_committee": true, - "parent_committee": true, - "child_committees": true, - "committed_blocks": true - } + "record_settings": {} } diff --git a/network-runner/src/bin/app/main.rs b/network-runner/src/bin/app/main.rs index 5c07488..bc0008a 100644 --- a/network-runner/src/bin/app/main.rs +++ b/network-runner/src/bin/app/main.rs @@ -6,10 +6,14 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; // crates use anyhow::Ok; use clap::Parser; +use crossbeam::channel; use nomos_simulations_network_runner::network::behaviour::create_behaviours; use nomos_simulations_network_runner::network::regions::{create_regions, RegionsData}; -use nomos_simulations_network_runner::network::Network; -use nomos_simulations_network_runner::node::NodeId; +use nomos_simulations_network_runner::network::{InMemoryNetworkInterface, Network}; +use nomos_simulations_network_runner::node::mix::{ + MixMessage, MixNode, MixNodeState, MixnodeSettings, +}; +use nomos_simulations_network_runner::node::{NodeId, NodeIdExt}; use nomos_simulations_network_runner::output_processors::{OutData, Record}; use nomos_simulations_network_runner::runner::{BoxedNode, SimulationRunnerHandle}; #[cfg(feature = "polars")] @@ -18,10 +22,10 @@ use nomos_simulations_network_runner::streaming::{ io::IOSubscriber, naive::NaiveSubscriber, StreamType, }; use parking_lot::Mutex; +use rand::prelude::IteratorRandom; use rand::rngs::SmallRng; use rand::seq::SliceRandom; use rand::SeedableRng; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use serde::de::DeserializeOwned; use serde::Serialize; // internal @@ -42,8 +46,6 @@ pub struct SimulationApp { #[clap(long, default_value = "stdout")] log_to: log::LogOutput, #[clap(long)] - dump_overlay_info: bool, - #[clap(long)] no_netcap: bool, } @@ -54,7 +56,6 @@ impl SimulationApp { stream_type, log_format: _, log_to: _, - dump_overlay_info, no_netcap, } = self; let simulation_settings: SimulationSettings = load_json_from_file(&input_settings)?; @@ -67,7 +68,7 @@ impl SimulationApp { }); let mut rng = SmallRng::seed_from_u64(seed); let mut node_ids: Vec = (0..simulation_settings.node_count) - .map(|_| todo!()) + .map(NodeId::from_index) .collect(); node_ids.shuffle(&mut rng); @@ -76,40 +77,81 @@ impl SimulationApp { let regions_data = RegionsData::new(regions, behaviours); let ids = node_ids.clone(); - let network = Arc::new(Mutex::new(Network::<()>::new(regions_data, seed))); + let network = Arc::new(Mutex::new(Network::::new(regions_data, seed))); - // if dump_overlay_info { - // dump_json_to_file( - // Path::new("overlay_info.json"), - // &overlay_node::overlay_info( - // node_ids.clone(), - // node_ids.first().copied().unwrap(), - // &simulation_settings.overlay_settings, - // ), - // )?; - // } - - // let nodes: Vec> = node_ids - // .par_iter() - // .copied() - // .map(|node_id| todo!()) - // .collect(); - // let network = Arc::try_unwrap(network) - // .expect("network is not used anywhere else") - // .into_inner(); - // run::<_, _, _>(network, nodes, simulation_settings, stream_type)?; + let nodes: Vec<_> = node_ids + .iter() + .copied() + .map(|node_id| { + let mut network = network.lock(); + create_boxed_mixnode( + node_id, + &mut network, + simulation_settings.clone(), + no_netcap, + MixnodeSettings { + connected_peers: ids + .iter() + .filter(|&id| id != &node_id) + .copied() + .choose_multiple(&mut rng, 3), + }, + ) + }) + .collect(); + let network = Arc::try_unwrap(network) + .expect("network is not used anywhere else") + .into_inner(); + run::<_, _, _>(network, nodes, simulation_settings, stream_type)?; Ok(()) } } -fn run( +fn create_boxed_mixnode( + node_id: NodeId, + network: &mut Network, + simulation_settings: SimulationSettings, + no_netcap: bool, + mixnode_settings: MixnodeSettings, +) -> BoxedNode { + let (node_message_broadcast_sender, node_message_broadcast_receiver) = channel::unbounded(); + let (node_message_sender, node_message_receiver) = channel::unbounded(); + // Dividing milliseconds in second by milliseconds in the step. + let step_time_as_second_fraction = + simulation_settings.step_time.subsec_millis() as f32 / 1_000_000_f32; + let capacity_bps = if no_netcap { + None + } else { + simulation_settings + .node_settings + .network_capacity_kbps + .map(|c| (c as f32 * 1024.0 * step_time_as_second_fraction) as u32) + }; + let network_message_receiver = { + network.connect( + node_id, + capacity_bps, + node_message_receiver, + node_message_broadcast_receiver, + ) + }; + let network_interface = InMemoryNetworkInterface::new( + node_id, + node_message_broadcast_sender, + node_message_sender, + network_message_receiver, + ); + Box::new(MixNode::new(node_id, mixnode_settings, network_interface)) +} + +fn run( network: Network, nodes: Vec>, settings: SimulationSettings, stream_type: Option, ) -> anyhow::Result<()> where - M: Clone + Send + Sync + 'static, + M: std::fmt::Debug + Clone + Send + Sync + 'static, S: 'static, T: Serialize + Clone + 'static, { @@ -168,11 +210,6 @@ fn load_json_from_file(path: &Path) -> anyhow::Result { Ok(serde_json::from_reader(f)?) } -fn dump_json_to_file(path: &Path, data: &T) -> anyhow::Result<()> { - let f = File::create(path).map_err(Box::new)?; - Ok(serde_json::to_writer(f, data)?) -} - fn main() -> anyhow::Result<()> { let app: SimulationApp = SimulationApp::parse(); log::config_tracing(app.log_format, &app.log_to); diff --git a/network-runner/src/node/mix/mod.rs b/network-runner/src/node/mix/mod.rs new file mode 100644 index 0000000..21330cf --- /dev/null +++ b/network-runner/src/node/mix/mod.rs @@ -0,0 +1,77 @@ +use super::{Node, NodeId}; +use crate::network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)] +pub struct MixNodeState { + pub mock_counter: usize, +} + +#[derive(Debug, Clone)] +pub enum MixMessage { + Dummy(String), +} + +impl PayloadSize for MixMessage { + fn size_bytes(&self) -> u32 { + 2208 + } +} + +pub struct MixnodeSettings { + pub connected_peers: Vec, +} + +/// This node implementation only used for testing different streaming implementation purposes. +pub struct MixNode { + id: NodeId, + state: MixNodeState, + settings: MixnodeSettings, + network_interface: InMemoryNetworkInterface, +} + +impl MixNode { + pub fn new( + id: NodeId, + settings: MixnodeSettings, + network_interface: InMemoryNetworkInterface, + ) -> Self { + Self { + id, + network_interface, + settings, + state: MixNodeState::default(), + } + } +} + +impl Node for MixNode { + type Settings = MixnodeSettings; + + type State = MixNodeState; + + fn id(&self) -> NodeId { + self.id + } + + fn state(&self) -> &Self::State { + &self.state + } + + fn step(&mut self, _: Duration) { + let messages = self.network_interface.receive_messages(); + for message in messages { + println!(">>>>> Node {}, message: {message:?}", self.id); + } + + self.state.mock_counter += 1; + + for node_id in self.settings.connected_peers.iter() { + self.network_interface.send_message( + *node_id, + MixMessage::Dummy(format!("Hello from node: {}", self.id)), + ) + } + } +} diff --git a/network-runner/src/node/mod.rs b/network-runner/src/node/mod.rs index eaa7cb2..aab90c9 100644 --- a/network-runner/src/node/mod.rs +++ b/network-runner/src/node/mod.rs @@ -1,9 +1,9 @@ #[cfg(test)] pub mod dummy_streaming; +pub mod mix; // std use std::{ - collections::HashMap, ops::{Deref, DerefMut}, sync::Arc, time::Duration, diff --git a/network-runner/src/runner/mod.rs b/network-runner/src/runner/mod.rs index cb002d5..f81121e 100644 --- a/network-runner/src/runner/mod.rs +++ b/network-runner/src/runner/mod.rs @@ -134,14 +134,11 @@ where let nodes = Arc::new(RwLock::new(nodes)); let SimulationSettings { wards, - overlay_settings: _, node_settings: _, runner_settings, stream_settings: _, node_count: _, seed: _, - views_count: _, - leaders_count: _, network_settings: _, step_time, record_settings: _, @@ -166,6 +163,7 @@ where match self.runner_settings.clone() { RunnerSettings::Sync => sync_runner::simulate(self, step_time), + RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks, step_time), } } } diff --git a/network-runner/src/settings.rs b/network-runner/src/settings.rs index cdeb889..270a8fc 100644 --- a/network-runner/src/settings.rs +++ b/network-runner/src/settings.rs @@ -9,6 +9,9 @@ use serde::{Deserialize, Serialize}; pub enum RunnerSettings { #[default] Sync, + Async { + chunks: usize, + }, } #[derive(Clone, Debug, Default, Serialize, Deserialize)] @@ -44,7 +47,6 @@ pub struct SimulationSettings { #[serde(default)] pub record_settings: BTreeMap, pub network_settings: NetworkSettings, - pub overlay_settings: OverlaySettings, pub node_settings: NodeSettings, #[serde(default)] pub runner_settings: RunnerSettings, @@ -52,7 +54,5 @@ pub struct SimulationSettings { #[serde(with = "humantime_serde")] pub step_time: std::time::Duration, pub node_count: usize, - pub views_count: usize, - pub leaders_count: usize, pub seed: Option, } diff --git a/network-runner/src/warding/ttf.rs b/network-runner/src/warding/ttf.rs index 9c242ee..ac2f7d9 100644 --- a/network-runner/src/warding/ttf.rs +++ b/network-runner/src/warding/ttf.rs @@ -11,7 +11,7 @@ pub struct MaxViewWard { impl SimulationWard for MaxViewWard { type SimulationState = SimulationState; - fn analyze(&mut self, state: &Self::SimulationState) -> bool { + fn analyze(&mut self, _state: &Self::SimulationState) -> bool { // state.nodes.read().iter(); //.all(|n| n.current_view() >= self.max_count) todo!()