From be304046dd21e7207aa0cd1083bd31da4617aab4 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Sat, 14 Dec 2024 00:42:23 +0900 Subject: [PATCH] fix(blend): ensure all nodes are connected and have the same peering degree (#64) * fix(blendnet): ensure all nodes are connected and have the same peering degree * cover the case where it is impossible to ensure all nodes have the same num of conns --- simlib/blendnet-sims/src/main.rs | 17 +-- simlib/blendnet-sims/src/node/blend/mod.rs | 1 + .../blendnet-sims/src/node/blend/topology.rs | 127 ++++++++++++++++++ 3 files changed, 137 insertions(+), 8 deletions(-) create mode 100644 simlib/blendnet-sims/src/node/blend/topology.rs diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index 378716b..a6d131d 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -16,12 +16,12 @@ use netrunner::node::{NodeId, NodeIdExt}; use netrunner::output_processors::Record; use netrunner::runner::{BoxedNode, SimulationRunnerHandle}; use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType}; +use node::blend::topology::build_topology; use nomos_blend::cover_traffic::CoverTrafficSettings; use nomos_blend::message_blend::{ CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, }; use parking_lot::Mutex; -use rand::prelude::IteratorRandom; use rand::seq::SliceRandom; use rand::{RngCore, SeedableRng}; use rand_chacha::ChaCha12Rng; @@ -87,9 +87,10 @@ impl SimulationApp { let behaviours = create_behaviours(&settings.simulation_settings.network_settings); let regions_data = RegionsData::new(regions, behaviours); - let ids = node_ids.clone(); let network = Arc::new(Mutex::new(Network::::new(regions_data, seed))); + let topology = build_topology(&node_ids, settings.connected_peers_count, &mut rng); + let nodes: Vec<_> = node_ids .iter() .copied() @@ -101,11 +102,7 @@ impl SimulationApp { settings.simulation_settings.clone(), no_netcap, BlendnodeSettings { - connected_peers: ids - .iter() - .filter(|&id| id != &node_id) - .copied() - .choose_multiple(&mut rng, settings.connected_peers_count), + connected_peers: topology.get(&node_id).unwrap().iter().copied().collect(), data_message_lottery_interval: settings.data_message_lottery_interval, stake_proportion: settings.stake_proportion / node_ids.len() as f64, seed: rng.next_u64(), @@ -174,7 +171,11 @@ fn create_boxed_blendnode( node_message_sender, network_message_receiver, ); - Box::new(BlendNode::new(node_id, blendnode_settings, network_interface)) + Box::new(BlendNode::new( + node_id, + blendnode_settings, + network_interface, + )) } fn run( diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index 27924cb..6dd5c9e 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -4,6 +4,7 @@ mod message; pub mod scheduler; pub mod state; pub mod stream_wrapper; +pub mod topology; use crate::node::blend::consensus_streams::{Epoch, Slot}; use cached::{Cached, TimedCache}; diff --git a/simlib/blendnet-sims/src/node/blend/topology.rs b/simlib/blendnet-sims/src/node/blend/topology.rs new file mode 100644 index 0000000..f01e890 --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/topology.rs @@ -0,0 +1,127 @@ +use std::collections::{HashMap, HashSet}; + +use netrunner::node::NodeId; +use rand::{seq::SliceRandom, RngCore}; + +pub type Topology = HashMap>; + +/// Builds a topology with the given nodes and peering degree +/// by ensuring that all nodes are connected (no partition) +/// and all nodes have the same number of connections (only if possible). +pub fn build_topology(nodes: &[NodeId], peering_degree: usize, mut rng: R) -> Topology { + tracing::info!("Building topology: peering_degree:{}", peering_degree); + loop { + let mut topology = nodes + .iter() + .map(|&node| (node, HashSet::new())) + .collect::>(); + + for node in nodes.iter() { + // Collect peer candidates + let mut others = nodes + .iter() + .filter(|&other| { + // Check if the other node is not already connected to the current node + // and the other node has not reached the peering degree. + other != node + && !topology.get(node).unwrap().contains(other) + && topology.get(other).unwrap().len() < peering_degree + }) + .copied() + .collect::>(); + + // How many more connections the current node needs + let num_needs = peering_degree - topology.get(node).unwrap().len(); + // Sample peers as many as possible and connect them to the current node + let k = std::cmp::min(num_needs, others.len()); + others.as_mut_slice().shuffle(&mut rng); + others.into_iter().take(k).for_each(|peer| { + topology.get_mut(node).unwrap().insert(peer); + topology.get_mut(&peer).unwrap().insert(*node); + }); + } + + // Check constraints: + // - All nodes are connected (no partition) + // - All nodes have the same number of connections (if possible) + let can_have_equal_conns = (nodes.len() * peering_degree) % 2 == 0; + if check_all_connected(&topology) + && (!can_have_equal_conns || check_equal_conns(&topology, peering_degree)) + { + return topology; + } + tracing::info!("Topology doesn't meet constraints. Retrying..."); + } +} + +/// Checks if all nodes are connected (no partition) in the topology. +fn check_all_connected(topology: &Topology) -> bool { + let visited = dfs(topology, *topology.keys().next().unwrap()); + visited.len() == topology.len() +} + +/// Depth-first search to visit nodes in the topology. +fn dfs(topology: &Topology, start_node: NodeId) -> HashSet { + let mut visited: HashSet = HashSet::new(); + let mut stack: Vec = Vec::new(); + stack.push(start_node); + while let Some(node) = stack.pop() { + visited.insert(node); + for peer in topology.get(&node).unwrap().iter() { + if !visited.contains(peer) { + stack.push(*peer); + } + } + } + visited +} + +/// Checks if all nodes have the same number of connections. +fn check_equal_conns(topology: &Topology, peering_degree: usize) -> bool { + topology + .iter() + .all(|(_, peers)| peers.len() == peering_degree) +} + +#[cfg(test)] +mod tests { + use netrunner::node::NodeIdExt; + + use super::*; + + #[test] + fn test_build_topology_with_equal_conns() { + // If num_nodes * peering_degree is even, + // it is possible that all nodes can have the same number of connections + let nodes = (0..7).map(NodeId::from_index).collect::>(); + let peering_degree = 4; + + let mut rng = rand::rngs::OsRng; + let topology = build_topology(&nodes, peering_degree, &mut rng); + assert_eq!(topology.len(), nodes.len()); + for (node, peers) in topology.iter() { + assert!(peers.len() == peering_degree); + for peer in peers.iter() { + assert!(topology.get(peer).unwrap().contains(node)); + } + } + } + + #[test] + fn test_build_topology_with_inequal_conns() { + // If num_nodes * peering_degree is odd, + // it is impossible that all nodes can have the same number of connections + let nodes = (0..7).map(NodeId::from_index).collect::>(); + let peering_degree = 3; + + let mut rng = rand::rngs::OsRng; + let topology = build_topology(&nodes, peering_degree, &mut rng); + assert_eq!(topology.len(), nodes.len()); + for (node, peers) in topology.iter() { + assert!(peers.len() <= peering_degree); + for peer in peers.iter() { + assert!(topology.get(peer).unwrap().contains(node)); + } + } + } +}