From f63db0e46fbe978b558bf4a7f0fd09b2609d4a55 Mon Sep 17 00:00:00 2001 From: Gusto Date: Tue, 5 Nov 2024 13:49:46 +0200 Subject: [PATCH] Network interface in the mixnode --- network-runner/src/bin/app/main.rs | 63 ++++++++++++++++++------------ network-runner/src/node/mix/mod.rs | 28 +++++++++++-- network-runner/src/node/mod.rs | 1 - network-runner/src/runner/mod.rs | 1 + network-runner/src/settings.rs | 3 ++ network-runner/src/warding/ttf.rs | 2 +- 6 files changed, 68 insertions(+), 30 deletions(-) diff --git a/network-runner/src/bin/app/main.rs b/network-runner/src/bin/app/main.rs index cc4eada..5032b74 100644 --- a/network-runner/src/bin/app/main.rs +++ b/network-runner/src/bin/app/main.rs @@ -6,13 +6,14 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; // crates use anyhow::Ok; use clap::Parser; +use crossbeam::channel; use nomos_simulations_network_runner::network::behaviour::create_behaviours; use nomos_simulations_network_runner::network::regions::{create_regions, RegionsData}; -use nomos_simulations_network_runner::network::Network; +use nomos_simulations_network_runner::network::{InMemoryNetworkInterface, Network}; use nomos_simulations_network_runner::node::mix::{ MixMessage, MixNode, MixNodeState, MixnodeSettings, }; -use nomos_simulations_network_runner::node::{Node, NodeId, NodeIdExt}; +use nomos_simulations_network_runner::node::{NodeId, NodeIdExt}; use nomos_simulations_network_runner::output_processors::{OutData, Record}; use nomos_simulations_network_runner::runner::{BoxedNode, SimulationRunnerHandle}; #[cfg(feature = "polars")] @@ -25,7 +26,6 @@ use rand::prelude::IteratorRandom; use rand::rngs::SmallRng; use rand::seq::SliceRandom; use rand::SeedableRng; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use serde::de::DeserializeOwned; use serde::Serialize; // internal @@ -46,8 +46,6 @@ pub struct SimulationApp { #[clap(long, default_value = "stdout")] log_to: log::LogOutput, #[clap(long)] - dump_overlay_info: bool, - #[clap(long)] no_netcap: bool, } @@ -58,7 +56,6 @@ impl SimulationApp { stream_type, log_format: _, log_to: _, - dump_overlay_info, no_netcap, } = self; let simulation_settings: SimulationSettings = load_json_from_file(&input_settings)?; @@ -82,23 +79,16 @@ impl SimulationApp { let ids = node_ids.clone(); let network = Arc::new(Mutex::new(Network::::new(regions_data, seed))); - // if dump_overlay_info { - // dump_json_to_file( - // Path::new("overlay_info.json"), - // &overlay_node::overlay_info( - // node_ids.clone(), - // node_ids.first().copied().unwrap(), - // &simulation_settings.overlay_settings, - // ), - // )?; - // } - let nodes: Vec<_> = node_ids .iter() .copied() .map(|node_id| { + let mut network = network.lock(); create_boxed_mixnode( node_id, + &mut network, + simulation_settings.clone(), + no_netcap, MixnodeSettings { connected_peers: ids .iter() @@ -119,9 +109,39 @@ impl SimulationApp { fn create_boxed_mixnode( node_id: NodeId, - settings: MixnodeSettings, + network: &mut Network, + simulation_settings: SimulationSettings, + no_netcap: bool, + mixnode_settings: MixnodeSettings, ) -> BoxedNode { - Box::new(MixNode::new(node_id, settings)) + let (node_message_broadcast_sender, node_message_broadcast_receiver) = channel::unbounded(); + let (node_message_sender, node_message_receiver) = channel::unbounded(); + // Dividing milliseconds in second by milliseconds in the step. + let step_time_as_second_fraction = + simulation_settings.step_time.subsec_millis() as f32 / 1_000_000_f32; + let capacity_bps = if no_netcap { + None + } else { + simulation_settings + .node_settings + .network_capacity_kbps + .map(|c| (c as f32 * 1024.0 * step_time_as_second_fraction) as u32) + }; + let network_message_receiver = { + network.connect( + node_id, + capacity_bps, + node_message_receiver, + node_message_broadcast_receiver, + ) + }; + let network_interface = InMemoryNetworkInterface::new( + node_id, + node_message_broadcast_sender, + node_message_sender, + network_message_receiver, + ); + Box::new(MixNode::new(node_id, mixnode_settings, network_interface)) } fn run( @@ -190,11 +210,6 @@ fn load_json_from_file(path: &Path) -> anyhow::Result { Ok(serde_json::from_reader(f)?) } -fn dump_json_to_file(path: &Path, data: &T) -> anyhow::Result<()> { - let f = File::create(path).map_err(Box::new)?; - Ok(serde_json::to_writer(f, data)?) -} - fn main() -> anyhow::Result<()> { let app: SimulationApp = SimulationApp::parse(); log::config_tracing(app.log_format, &app.log_to); diff --git a/network-runner/src/node/mix/mod.rs b/network-runner/src/node/mix/mod.rs index b11ece2..ddd799b 100644 --- a/network-runner/src/node/mix/mod.rs +++ b/network-runner/src/node/mix/mod.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::time::Duration; +use crate::network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}; + use super::{Node, NodeId}; #[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)] @@ -13,6 +15,12 @@ pub enum MixMessage { Dummy(String), } +impl PayloadSize for MixMessage { + fn size_bytes(&self) -> u32 { + todo!() + } +} + pub struct MixnodeSettings { pub connected_peers: Vec, } @@ -21,15 +29,21 @@ pub struct MixnodeSettings { pub struct MixNode { id: NodeId, state: MixNodeState, - settings: MixnodeSettings, + _settings: MixnodeSettings, + network_interface: InMemoryNetworkInterface, } impl MixNode { - pub fn new(id: NodeId, settings: MixnodeSettings) -> Self { + pub fn new( + id: NodeId, + settings: MixnodeSettings, + network_interface: InMemoryNetworkInterface, + ) -> Self { Self { id, + network_interface, + _settings: settings, state: MixNodeState::default(), - settings, } } } @@ -48,6 +62,12 @@ impl Node for MixNode { } fn step(&mut self, _: Duration) { - todo!() + let _messages = self.network_interface.receive_messages(); + + // Do stuff on the messages; + // Network interface can be passed into the functions for outputting the messages: + // ```rust + // self.network_interface.send_message(receiving_node_id, payload); + // ``` } } diff --git a/network-runner/src/node/mod.rs b/network-runner/src/node/mod.rs index 5c85f15..aab90c9 100644 --- a/network-runner/src/node/mod.rs +++ b/network-runner/src/node/mod.rs @@ -4,7 +4,6 @@ pub mod mix; // std use std::{ - collections::HashMap, ops::{Deref, DerefMut}, sync::Arc, time::Duration, diff --git a/network-runner/src/runner/mod.rs b/network-runner/src/runner/mod.rs index cb002d5..ce90308 100644 --- a/network-runner/src/runner/mod.rs +++ b/network-runner/src/runner/mod.rs @@ -166,6 +166,7 @@ where match self.runner_settings.clone() { RunnerSettings::Sync => sync_runner::simulate(self, step_time), + RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks, step_time), } } } diff --git a/network-runner/src/settings.rs b/network-runner/src/settings.rs index cdeb889..0d3fc30 100644 --- a/network-runner/src/settings.rs +++ b/network-runner/src/settings.rs @@ -9,6 +9,9 @@ use serde::{Deserialize, Serialize}; pub enum RunnerSettings { #[default] Sync, + Async { + chunks: usize, + }, } #[derive(Clone, Debug, Default, Serialize, Deserialize)] diff --git a/network-runner/src/warding/ttf.rs b/network-runner/src/warding/ttf.rs index 9c242ee..ac2f7d9 100644 --- a/network-runner/src/warding/ttf.rs +++ b/network-runner/src/warding/ttf.rs @@ -11,7 +11,7 @@ pub struct MaxViewWard { impl SimulationWard for MaxViewWard { type SimulationState = SimulationState; - fn analyze(&mut self, state: &Self::SimulationState) -> bool { + fn analyze(&mut self, _state: &Self::SimulationState) -> bool { // state.nodes.read().iter(); //.all(|n| n.current_view() >= self.max_count) todo!()