diff --git a/simlib/mixnet-sims/config/mixnode.json b/simlib/mixnet-sims/config/mixnode.json index 407d479..1be0dd8 100644 --- a/simlib/mixnet-sims/config/mixnode.json +++ b/simlib/mixnet-sims/config/mixnode.json @@ -25,12 +25,12 @@ "stream_settings": { "path": "test.json" }, - "node_count": 100, + "node_count": 10, "seed": 0, "record_settings": {}, "wards": [ { - "sum": 100 + "sum":2 } ], "connected_peers_count": 4, diff --git a/simlib/mixnet-sims/src/main.rs b/simlib/mixnet-sims/src/main.rs index 7d91157..0362e06 100644 --- a/simlib/mixnet-sims/src/main.rs +++ b/simlib/mixnet-sims/src/main.rs @@ -11,7 +11,7 @@ use clap::Parser; use crossbeam::channel; use netrunner::network::behaviour::create_behaviours; use netrunner::network::regions::{create_regions, RegionsData}; -use netrunner::network::{InMemoryNetworkInterface, Network}; +use netrunner::network::{InMemoryNetworkInterface, Network, PayloadSize}; use netrunner::node::{NodeId, NodeIdExt}; use netrunner::output_processors::Record; use netrunner::runner::{BoxedNode, SimulationRunnerHandle}; @@ -184,7 +184,7 @@ fn run( stream_type: Option, ) -> anyhow::Result<()> where - M: std::fmt::Debug + Clone + Send + Sync + 'static, + M: std::fmt::Debug + PayloadSize + Clone + Send + Sync + 'static, S: 'static, T: Serialize + Clone + 'static, { @@ -244,10 +244,11 @@ fn load_json_from_file(path: &Path) -> anyhow::Result { fn main() -> anyhow::Result<()> { let app: SimulationApp = SimulationApp::parse(); - let _maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics); + let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics); if let Err(e) = app.run() { tracing::error!("error: {}", e); + drop(maybe_guard); std::process::exit(1); } Ok(()) diff --git a/simlib/netrunner/src/network/mod.rs b/simlib/netrunner/src/network/mod.rs index 70c413c..925bd55 100644 --- a/simlib/netrunner/src/network/mod.rs +++ b/simlib/netrunner/src/network/mod.rs @@ -133,6 +133,8 @@ impl NodeNetworkCapacity { false } } else { + let mut current_load = self.current_load.lock(); + *current_load += load; true } } @@ -141,19 +143,20 @@ impl NodeNetworkCapacity { self.load_to_flush.fetch_add(load, Ordering::Relaxed); } - fn flush_load(&self) { - if self.capacity_bps.is_none() { - return; - } - + fn flush_load(&self) -> u32 { let mut s = self.current_load.lock(); + let previous_load = *s; *s -= self.load_to_flush.load(Ordering::Relaxed); self.load_to_flush.store(0, Ordering::Relaxed); + previous_load } } #[derive(Debug)] -pub struct Network { +pub struct Network +where + M: std::fmt::Debug + PayloadSize, +{ pub regions: regions::RegionsData, network_time: NetworkTime, messages: Vec<(NetworkTime, NetworkMessage)>, @@ -161,12 +164,19 @@ pub struct Network { from_node_receivers: HashMap>>, from_node_broadcast_receivers: HashMap>>, to_node_senders: HashMap>>, + state: NetworkState, seed: u64, } +#[derive(Serialize, Deserialize, Default, Debug, Clone)] +pub struct NetworkState { + pub total_outbound_bandwidth: u64, + pub total_inbound_bandwidth: u64, +} + impl Network where - M: std::fmt::Debug + Send + Sync + Clone, + M: std::fmt::Debug + PayloadSize + Send + Sync + Clone, { pub fn new(regions: regions::RegionsData, seed: u64) -> Self { Self { @@ -177,10 +187,15 @@ where from_node_receivers: HashMap::new(), from_node_broadcast_receivers: HashMap::new(), to_node_senders: HashMap::new(), + state: NetworkState::default(), seed, } } + pub fn bandwidth_results(&self) -> NetworkState { + self.state.clone() + } + fn send_message_cost( &self, rng: &mut R, @@ -219,7 +234,9 @@ where /// Receive and store all messages from nodes. pub fn collect_messages(&mut self) { - let mut adhoc_messages = self + let mut total_step_outbound_bandwidth = 0u64; + + let mut adhoc_messages: Vec<(Instant, NetworkMessage)> = self .from_node_receivers .par_iter() .flat_map(|(_, from_node)| { @@ -229,6 +246,10 @@ where .collect::>() }) .collect(); + total_step_outbound_bandwidth += adhoc_messages + .iter() + .map(|(_, m)| m.payload().size_bytes() as u64) + .sum::(); self.messages.append(&mut adhoc_messages); let mut broadcast_messages = self @@ -245,7 +266,13 @@ where }) .map(|m| (self.network_time, m)) .collect::>(); + total_step_outbound_bandwidth += broadcast_messages + .iter() + .map(|(_, m)| m.payload().size_bytes() as u64) + .sum::(); self.messages.append(&mut broadcast_messages); + + self.state.total_outbound_bandwidth += total_step_outbound_bandwidth; } /// Reiterate all messages and send to appropriate nodes if simulated @@ -262,12 +289,14 @@ where }) .cloned() .collect(); + self.messages = delayed; + let mut total_step_inbound_bandwidth = 0u64; for (_, c) in self.node_network_capacity.iter() { - c.flush_load(); + total_step_inbound_bandwidth += c.flush_load() as u64; } - self.messages = delayed; + self.state.total_inbound_bandwidth += total_step_inbound_bandwidth; } /// Returns true if message needs to be delayed and be dispatched in future. @@ -325,6 +354,7 @@ where } remaining } else { + node_capacity.decrease_load(message.remaining_size()); 0 } } @@ -428,7 +458,7 @@ mod tests { use super::{ behaviour::NetworkBehaviour, regions::{Region, RegionsData}, - Network, NetworkInterface, NetworkMessage, + Network, NetworkInterface, NetworkMessage, PayloadSize, }; use crate::{ network::NetworkBehaviourKey, @@ -463,6 +493,12 @@ mod tests { } } + impl PayloadSize for () { + fn size_bytes(&self) -> u32 { + todo!() + } + } + impl NetworkInterface for MockNetworkInterface { type Payload = (); diff --git a/simlib/netrunner/src/output_processors/mod.rs b/simlib/netrunner/src/output_processors/mod.rs index aa3cf1d..fe61143 100644 --- a/simlib/netrunner/src/output_processors/mod.rs +++ b/simlib/netrunner/src/output_processors/mod.rs @@ -3,6 +3,7 @@ use std::time::Duration; use chrono::{DateTime, Utc}; use serde::Serialize; +use crate::network::NetworkState; use crate::settings::SimulationSettings; use crate::warding::SimulationState; @@ -36,23 +37,29 @@ pub trait Record: From + From + Send + Sync + 'stat pub type SerializedNodeState = serde_json::Value; #[derive(Serialize)] -pub struct Runtime { +pub struct Simulation { start: DateTime, end: DateTime, elapsed: Duration, } +#[derive(Serialize)] +pub enum Runtime { + Simulation(Simulation), + Network(NetworkState), +} + impl Runtime { pub(crate) fn load() -> anyhow::Result { let elapsed = crate::START_TIME.elapsed(); let end = Utc::now(); - Ok(Self { + Ok(Self::Simulation(Simulation { start: end .checked_sub_signed(chrono::Duration::from_std(elapsed)?) .unwrap(), end, elapsed, - }) + })) } } diff --git a/simlib/netrunner/src/runner/async_runner.rs b/simlib/netrunner/src/runner/async_runner.rs index 4cc5ddc..f9d9d09 100644 --- a/simlib/netrunner/src/runner/async_runner.rs +++ b/simlib/netrunner/src/runner/async_runner.rs @@ -1,3 +1,4 @@ +use crate::network::PayloadSize; use crate::node::NodeId; use crate::output_processors::Record; use crate::runner::SimulationRunner; @@ -19,7 +20,7 @@ pub fn simulate( step_time: Duration, ) -> anyhow::Result> where - M: std::fmt::Debug + Clone + Send + Sync + 'static, + M: std::fmt::Debug + PayloadSize + Clone + Send + Sync + 'static, R: Record + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send diff --git a/simlib/netrunner/src/runner/mod.rs b/simlib/netrunner/src/runner/mod.rs index f81121e..f3c6470 100644 --- a/simlib/netrunner/src/runner/mod.rs +++ b/simlib/netrunner/src/runner/mod.rs @@ -19,7 +19,7 @@ use rayon::prelude::*; use serde::Serialize; // internal -use crate::network::Network; +use crate::network::{Network, PayloadSize}; use crate::node::Node; use crate::settings::{RunnerSettings, SimulationSettings}; use crate::warding::{SimulationState, SimulationWard, Ward}; @@ -66,7 +66,10 @@ impl SimulationRunnerHandle { } } -pub(crate) struct SimulationRunnerInner { +pub(crate) struct SimulationRunnerInner +where + M: std::fmt::Debug + PayloadSize, +{ network: Network, wards: Vec, rng: SmallRng, @@ -74,7 +77,7 @@ pub(crate) struct SimulationRunnerInner { impl SimulationRunnerInner where - M: std::fmt::Debug + Send + Sync + Clone, + M: std::fmt::Debug + PayloadSize + Send + Sync + Clone, { fn check_wards(&mut self, state: &SimulationState) -> bool { self.wards @@ -94,7 +97,10 @@ where /// Encapsulation solution for the simulations runner /// Holds the network state, the simulating nodes and the simulation settings. -pub struct SimulationRunner { +pub struct SimulationRunner +where + M: std::fmt::Debug + PayloadSize, +{ inner: SimulationRunnerInner, nodes: Arc>>>, runner_settings: RunnerSettings, @@ -104,7 +110,7 @@ pub struct SimulationRunner { impl SimulationRunner where - M: std::fmt::Debug + Clone + Send + Sync + 'static, + M: std::fmt::Debug + PayloadSize + Clone + Send + Sync + 'static, R: Record + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send @@ -170,7 +176,7 @@ where impl SimulationRunner where - M: std::fmt::Debug + Clone + Send + Sync + 'static, + M: std::fmt::Debug + PayloadSize + Clone + Send + Sync + 'static, R: Record + serde::Serialize + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> diff --git a/simlib/netrunner/src/runner/sync_runner.rs b/simlib/netrunner/src/runner/sync_runner.rs index 030f44e..92b6580 100644 --- a/simlib/netrunner/src/runner/sync_runner.rs +++ b/simlib/netrunner/src/runner/sync_runner.rs @@ -1,4 +1,5 @@ use super::{SimulationRunner, SimulationRunnerHandle}; +use crate::network::PayloadSize; use crate::output_processors::Record; use crate::warding::SimulationState; use crossbeam::channel::{bounded, select}; @@ -11,7 +12,7 @@ pub fn simulate( step_time: Duration, ) -> anyhow::Result> where - M: std::fmt::Debug + Send + Sync + Clone + 'static, + M: std::fmt::Debug + PayloadSize + Send + Sync + Clone + 'static, R: Record + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send @@ -35,7 +36,7 @@ where loop { select! { recv(stop_rx) -> _ => { - return Ok(()); + break; } default => { // we must use a code block to make sure once the step call is finished then the write lock will be released, because in Record::try_from(&state), @@ -49,11 +50,16 @@ where p.send(R::try_from(&state)?)?; // check if any condition makes the simulation stop if inner_runner.check_wards(&state) { - return Ok(()); + break; } } } } + tracing::info!( + "Total bandwidth results: {:?}", + inner_runner.network.bandwidth_results() + ); + Ok(()) }); Ok(SimulationRunnerHandle { producer: p1, diff --git a/simlib/netrunner/src/streaming/runtime_subscriber.rs b/simlib/netrunner/src/streaming/runtime_subscriber.rs index f4c36be..3f45c56 100644 --- a/simlib/netrunner/src/streaming/runtime_subscriber.rs +++ b/simlib/netrunner/src/streaming/runtime_subscriber.rs @@ -95,7 +95,7 @@ where } fn subscribe_data_type() -> RecordType { - RecordType::Data + RecordType::Meta } }