diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index 378716b..a6d131d 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -16,12 +16,12 @@ use netrunner::node::{NodeId, NodeIdExt}; use netrunner::output_processors::Record; use netrunner::runner::{BoxedNode, SimulationRunnerHandle}; use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType}; +use node::blend::topology::build_topology; use nomos_blend::cover_traffic::CoverTrafficSettings; use nomos_blend::message_blend::{ CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, }; use parking_lot::Mutex; -use rand::prelude::IteratorRandom; use rand::seq::SliceRandom; use rand::{RngCore, SeedableRng}; use rand_chacha::ChaCha12Rng; @@ -87,9 +87,10 @@ impl SimulationApp { let behaviours = create_behaviours(&settings.simulation_settings.network_settings); let regions_data = RegionsData::new(regions, behaviours); - let ids = node_ids.clone(); let network = Arc::new(Mutex::new(Network::::new(regions_data, seed))); + let topology = build_topology(&node_ids, settings.connected_peers_count, &mut rng); + let nodes: Vec<_> = node_ids .iter() .copied() @@ -101,11 +102,7 @@ impl SimulationApp { settings.simulation_settings.clone(), no_netcap, BlendnodeSettings { - connected_peers: ids - .iter() - .filter(|&id| id != &node_id) - .copied() - .choose_multiple(&mut rng, settings.connected_peers_count), + connected_peers: topology.get(&node_id).unwrap().iter().copied().collect(), data_message_lottery_interval: settings.data_message_lottery_interval, stake_proportion: settings.stake_proportion / node_ids.len() as f64, seed: rng.next_u64(), @@ -174,7 +171,11 @@ fn create_boxed_blendnode( node_message_sender, network_message_receiver, ); - Box::new(BlendNode::new(node_id, blendnode_settings, network_interface)) + Box::new(BlendNode::new( + node_id, + blendnode_settings, + network_interface, + )) } fn run( diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index 27924cb..6dd5c9e 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -4,6 +4,7 @@ mod message; pub mod scheduler; pub mod state; pub mod stream_wrapper; +pub mod topology; use crate::node::blend::consensus_streams::{Epoch, Slot}; use cached::{Cached, TimedCache}; diff --git a/simlib/blendnet-sims/src/node/blend/topology.rs b/simlib/blendnet-sims/src/node/blend/topology.rs new file mode 100644 index 0000000..2c6fc78 --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/topology.rs @@ -0,0 +1,110 @@ +use std::collections::{HashMap, HashSet}; + +use netrunner::node::NodeId; +use rand::{seq::SliceRandom, RngCore}; + +pub type Topology = HashMap>; + +/// Builds a topology with the given nodes and peering degree +/// by ensuring that all nodes are connected (no partition) and have the same peering degree. +pub fn build_topology(nodes: &[NodeId], peering_degree: usize, mut rng: R) -> Topology { + tracing::info!("Building topology: peering_degree:{}", peering_degree); + loop { + let mut topology = nodes + .iter() + .map(|&node| (node, HashSet::new())) + .collect::>(); + + for node in nodes.iter() { + // Collect peer candidates + let mut others = nodes + .iter() + .filter(|&other| { + // Check if the other node is not already connected to the current node + // and the other node has not reached the peering degree. + other != node + && !topology.get(node).unwrap().contains(other) + && topology.get(other).unwrap().len() < peering_degree + }) + .copied() + .collect::>(); + + // How many more connections the current node needs + let num_needs = peering_degree - topology.get(node).unwrap().len(); + // Sample peers as many as possible and connect them to the current node + let k = std::cmp::min(num_needs, others.len()); + others.as_mut_slice().shuffle(&mut rng); + others.into_iter().take(k).for_each(|peer| { + topology.get_mut(node).unwrap().insert(peer); + topology.get_mut(&peer).unwrap().insert(*node); + }); + } + + // Check constraints + let all_connected = check_all_connected(&topology); + let all_have_peering_degree = check_peering_degree(&topology, peering_degree); + if all_connected && all_have_peering_degree { + tracing::info!("Topology built successfully"); + return topology; + } else { + tracing::info!( + "Retrying to build topology: all_connected:{}, all_have_peering_degree:{}", + all_connected, + all_have_peering_degree + ); + } + } +} + +/// Checks if all nodes are connected (no partition) in the topology. +fn check_all_connected(topology: &Topology) -> bool { + let visited = dfs(topology, *topology.keys().next().unwrap()); + visited.len() == topology.len() +} + +/// Depth-first search to visit nodes in the topology. +fn dfs(topology: &Topology, start_node: NodeId) -> HashSet { + let mut visited: HashSet = HashSet::new(); + let mut stack: Vec = Vec::new(); + stack.push(start_node); + while let Some(node) = stack.pop() { + visited.insert(node); + for peer in topology.get(&node).unwrap().iter() { + if !visited.contains(peer) { + stack.push(*peer); + } + } + } + visited +} + +/// Checks if all nodes have the same peering degree in the topology. +fn check_peering_degree(topology: &Topology, peering_degree: usize) -> bool { + topology + .iter() + .all(|(_, peers)| peers.len() == peering_degree) +} + +#[cfg(test)] +mod tests { + use netrunner::node::NodeIdExt; + + use super::*; + + #[test] + fn test_build_topology() { + tracing_subscriber::fmt::init(); + + let nodes = (0..100).map(NodeId::from_index).collect::>(); + let peering_degree = 3; + let mut rng = rand::rngs::OsRng; + let topology = build_topology(&nodes, peering_degree, &mut rng); + assert_eq!(topology.len(), nodes.len()); + for (node, peers) in topology.iter() { + assert!(peers.len() == peering_degree); + for peer in peers.iter() { + assert!(topology.get(peer).unwrap().contains(node)); + } + } + } +}