diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index 9f4050c..0bb099a 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -16,6 +16,7 @@ use netrunner::node::{NodeId, NodeIdExt}; use netrunner::output_processors::Record; use netrunner::runner::{BoxedNode, SimulationRunnerHandle}; use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType}; +use node::mix::topology::build_topology; use nomos_mix::cover_traffic::CoverTrafficSettings; use nomos_mix::message_blend::{ CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, @@ -88,6 +89,12 @@ impl SimulationApp { let network = Arc::new(Mutex::new(Network::::new(regions_data, seed))); + let topology = build_topology( + &node_ids, + settings.conn_maintenance.peering_degree, + &mut rng, + ); + let nodes: Vec<_> = node_ids .iter() .copied() @@ -100,6 +107,7 @@ impl SimulationApp { no_netcap, MixnodeSettings { membership: node_ids.clone(), + topology: topology.clone(), data_message_lottery_interval: settings.data_message_lottery_interval, stake_proportion: settings.stake_proportion / node_ids.len() as f64, seed: rng.next_u64(), diff --git a/simlib/blendnet-sims/src/node/mix/mod.rs b/simlib/blendnet-sims/src/node/mix/mod.rs index cdfe2ae..6fa099b 100644 --- a/simlib/blendnet-sims/src/node/mix/mod.rs +++ b/simlib/blendnet-sims/src/node/mix/mod.rs @@ -4,6 +4,7 @@ mod message; pub mod scheduler; pub mod state; pub mod stream_wrapper; +pub mod topology; use crate::node::mix::consensus_streams::{Epoch, Slot}; use cached::{Cached, TimedCache}; @@ -39,6 +40,7 @@ use sha2::{Digest, Sha256}; use state::MixnodeState; use std::{pin::pin, task::Poll, time::Duration}; use stream_wrapper::CrossbeamReceiverStream; +use topology::Topology; #[derive(Debug, Clone)] pub struct MixMessage(Vec); @@ -52,6 +54,7 @@ impl PayloadSize for MixMessage { #[derive(Deserialize)] pub struct MixnodeSettings { pub membership: Vec, + pub topology: Topology, pub data_message_lottery_interval: Duration, pub stake_proportion: f64, pub seed: u64, @@ -144,10 +147,12 @@ impl MixNode { membership.clone(), ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), ); - conn_maintenance - .bootstrap() - .into_iter() - .for_each(|peer| conn_maintenance.add_connected_peer(peer)); + settings + .topology + .get(&id) + .unwrap() + .iter() + .for_each(|peer| conn_maintenance.add_connected_peer(*peer)); let (conn_maintenance_update_time_sender, conn_maintenance_update_time_receiver) = channel::unbounded(); let (persistent_sender, persistent_receiver) = channel::unbounded(); @@ -378,7 +383,7 @@ impl Node for MixNode { let effective_messages_series = Series::from_iter( monitors .values() - .map(|monitor| monitor.effective_messages as u64), + .map(|monitor| monitor.effective_messages.to_num::()), ); self.log_monitors(&effective_messages_series); } diff --git a/simlib/blendnet-sims/src/node/mix/topology.rs b/simlib/blendnet-sims/src/node/mix/topology.rs new file mode 100644 index 0000000..e288042 --- /dev/null +++ b/simlib/blendnet-sims/src/node/mix/topology.rs @@ -0,0 +1,63 @@ +use std::collections::{HashMap, HashSet}; + +use netrunner::node::NodeId; +use rand::{seq::SliceRandom, RngCore}; + +pub type Topology = HashMap>; + +pub fn build_topology(nodes: &[NodeId], peering_degree: usize, mut rng: R) -> Topology { + loop { + let mut topology = nodes + .iter() + .map(|&node| (node, HashSet::new())) + .collect::>(); + + for node in nodes.iter() { + let mut others = nodes + .iter() + .filter(|&other| { + // Check if the other node is not already connected to the current node + // and the other node has not reached the peering degree. + other != node + && !topology.get(node).unwrap().contains(other) + && topology.get(other).unwrap().len() < peering_degree + }) + .copied() + .collect::>(); + + // How many more connections the current node needs + let num_needs = peering_degree - topology.get(node).unwrap().len(); + // Sample peers as many as possible and connect them to the current node + let k = std::cmp::min(num_needs, others.len()); + others.as_mut_slice().shuffle(&mut rng); + others.into_iter().take(k).for_each(|peer| { + topology.get_mut(node).unwrap().insert(peer); + topology.get_mut(&peer).unwrap().insert(*node); + }); + } + + if are_all_nodes_connected(&topology) { + return topology; + } + } +} + +fn are_all_nodes_connected(topology: &Topology) -> bool { + let visited = dfs(topology, *topology.keys().next().unwrap()); + visited.len() == topology.len() +} + +fn dfs(topology: &Topology, start_node: NodeId) -> HashSet { + let mut visited: HashSet = HashSet::new(); + let mut stack: Vec = Vec::new(); + stack.push(start_node); + while let Some(node) = stack.pop() { + visited.insert(node); + for peer in topology.get(&node).unwrap().iter() { + if !visited.contains(peer) { + stack.push(*peer); + } + } + } + visited +}