fix(blend): ensure all nodes are connected and have the same peering degree (#64)
* fix(blendnet): ensure all nodes are connected and have the same peering degree * cover the case where it is impossible to ensure all nodes have the same num of conns
This commit is contained in:
parent
45a2152bba
commit
be304046dd
|
@ -16,12 +16,12 @@ use netrunner::node::{NodeId, NodeIdExt};
|
||||||
use netrunner::output_processors::Record;
|
use netrunner::output_processors::Record;
|
||||||
use netrunner::runner::{BoxedNode, SimulationRunnerHandle};
|
use netrunner::runner::{BoxedNode, SimulationRunnerHandle};
|
||||||
use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType};
|
use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType};
|
||||||
|
use node::blend::topology::build_topology;
|
||||||
use nomos_blend::cover_traffic::CoverTrafficSettings;
|
use nomos_blend::cover_traffic::CoverTrafficSettings;
|
||||||
use nomos_blend::message_blend::{
|
use nomos_blend::message_blend::{
|
||||||
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
|
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
|
||||||
};
|
};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use rand::prelude::IteratorRandom;
|
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
use rand::{RngCore, SeedableRng};
|
use rand::{RngCore, SeedableRng};
|
||||||
use rand_chacha::ChaCha12Rng;
|
use rand_chacha::ChaCha12Rng;
|
||||||
|
@ -87,9 +87,10 @@ impl SimulationApp {
|
||||||
let behaviours = create_behaviours(&settings.simulation_settings.network_settings);
|
let behaviours = create_behaviours(&settings.simulation_settings.network_settings);
|
||||||
let regions_data = RegionsData::new(regions, behaviours);
|
let regions_data = RegionsData::new(regions, behaviours);
|
||||||
|
|
||||||
let ids = node_ids.clone();
|
|
||||||
let network = Arc::new(Mutex::new(Network::<BlendMessage>::new(regions_data, seed)));
|
let network = Arc::new(Mutex::new(Network::<BlendMessage>::new(regions_data, seed)));
|
||||||
|
|
||||||
|
let topology = build_topology(&node_ids, settings.connected_peers_count, &mut rng);
|
||||||
|
|
||||||
let nodes: Vec<_> = node_ids
|
let nodes: Vec<_> = node_ids
|
||||||
.iter()
|
.iter()
|
||||||
.copied()
|
.copied()
|
||||||
|
@ -101,11 +102,7 @@ impl SimulationApp {
|
||||||
settings.simulation_settings.clone(),
|
settings.simulation_settings.clone(),
|
||||||
no_netcap,
|
no_netcap,
|
||||||
BlendnodeSettings {
|
BlendnodeSettings {
|
||||||
connected_peers: ids
|
connected_peers: topology.get(&node_id).unwrap().iter().copied().collect(),
|
||||||
.iter()
|
|
||||||
.filter(|&id| id != &node_id)
|
|
||||||
.copied()
|
|
||||||
.choose_multiple(&mut rng, settings.connected_peers_count),
|
|
||||||
data_message_lottery_interval: settings.data_message_lottery_interval,
|
data_message_lottery_interval: settings.data_message_lottery_interval,
|
||||||
stake_proportion: settings.stake_proportion / node_ids.len() as f64,
|
stake_proportion: settings.stake_proportion / node_ids.len() as f64,
|
||||||
seed: rng.next_u64(),
|
seed: rng.next_u64(),
|
||||||
|
@ -174,7 +171,11 @@ fn create_boxed_blendnode(
|
||||||
node_message_sender,
|
node_message_sender,
|
||||||
network_message_receiver,
|
network_message_receiver,
|
||||||
);
|
);
|
||||||
Box::new(BlendNode::new(node_id, blendnode_settings, network_interface))
|
Box::new(BlendNode::new(
|
||||||
|
node_id,
|
||||||
|
blendnode_settings,
|
||||||
|
network_interface,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run<M, S, T>(
|
fn run<M, S, T>(
|
||||||
|
|
|
@ -4,6 +4,7 @@ mod message;
|
||||||
pub mod scheduler;
|
pub mod scheduler;
|
||||||
pub mod state;
|
pub mod state;
|
||||||
pub mod stream_wrapper;
|
pub mod stream_wrapper;
|
||||||
|
pub mod topology;
|
||||||
|
|
||||||
use crate::node::blend::consensus_streams::{Epoch, Slot};
|
use crate::node::blend::consensus_streams::{Epoch, Slot};
|
||||||
use cached::{Cached, TimedCache};
|
use cached::{Cached, TimedCache};
|
||||||
|
|
|
@ -0,0 +1,127 @@
|
||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
||||||
|
use netrunner::node::NodeId;
|
||||||
|
use rand::{seq::SliceRandom, RngCore};
|
||||||
|
|
||||||
|
pub type Topology = HashMap<NodeId, HashSet<NodeId>>;
|
||||||
|
|
||||||
|
/// Builds a topology with the given nodes and peering degree
|
||||||
|
/// by ensuring that all nodes are connected (no partition)
|
||||||
|
/// and all nodes have the same number of connections (only if possible).
|
||||||
|
pub fn build_topology<R: RngCore>(nodes: &[NodeId], peering_degree: usize, mut rng: R) -> Topology {
|
||||||
|
tracing::info!("Building topology: peering_degree:{}", peering_degree);
|
||||||
|
loop {
|
||||||
|
let mut topology = nodes
|
||||||
|
.iter()
|
||||||
|
.map(|&node| (node, HashSet::new()))
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
|
for node in nodes.iter() {
|
||||||
|
// Collect peer candidates
|
||||||
|
let mut others = nodes
|
||||||
|
.iter()
|
||||||
|
.filter(|&other| {
|
||||||
|
// Check if the other node is not already connected to the current node
|
||||||
|
// and the other node has not reached the peering degree.
|
||||||
|
other != node
|
||||||
|
&& !topology.get(node).unwrap().contains(other)
|
||||||
|
&& topology.get(other).unwrap().len() < peering_degree
|
||||||
|
})
|
||||||
|
.copied()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
// How many more connections the current node needs
|
||||||
|
let num_needs = peering_degree - topology.get(node).unwrap().len();
|
||||||
|
// Sample peers as many as possible and connect them to the current node
|
||||||
|
let k = std::cmp::min(num_needs, others.len());
|
||||||
|
others.as_mut_slice().shuffle(&mut rng);
|
||||||
|
others.into_iter().take(k).for_each(|peer| {
|
||||||
|
topology.get_mut(node).unwrap().insert(peer);
|
||||||
|
topology.get_mut(&peer).unwrap().insert(*node);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check constraints:
|
||||||
|
// - All nodes are connected (no partition)
|
||||||
|
// - All nodes have the same number of connections (if possible)
|
||||||
|
let can_have_equal_conns = (nodes.len() * peering_degree) % 2 == 0;
|
||||||
|
if check_all_connected(&topology)
|
||||||
|
&& (!can_have_equal_conns || check_equal_conns(&topology, peering_degree))
|
||||||
|
{
|
||||||
|
return topology;
|
||||||
|
}
|
||||||
|
tracing::info!("Topology doesn't meet constraints. Retrying...");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks if all nodes are connected (no partition) in the topology.
|
||||||
|
fn check_all_connected(topology: &Topology) -> bool {
|
||||||
|
let visited = dfs(topology, *topology.keys().next().unwrap());
|
||||||
|
visited.len() == topology.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Depth-first search to visit nodes in the topology.
|
||||||
|
fn dfs(topology: &Topology, start_node: NodeId) -> HashSet<NodeId> {
|
||||||
|
let mut visited: HashSet<NodeId> = HashSet::new();
|
||||||
|
let mut stack: Vec<NodeId> = Vec::new();
|
||||||
|
stack.push(start_node);
|
||||||
|
while let Some(node) = stack.pop() {
|
||||||
|
visited.insert(node);
|
||||||
|
for peer in topology.get(&node).unwrap().iter() {
|
||||||
|
if !visited.contains(peer) {
|
||||||
|
stack.push(*peer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
visited
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks if all nodes have the same number of connections.
|
||||||
|
fn check_equal_conns(topology: &Topology, peering_degree: usize) -> bool {
|
||||||
|
topology
|
||||||
|
.iter()
|
||||||
|
.all(|(_, peers)| peers.len() == peering_degree)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use netrunner::node::NodeIdExt;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_build_topology_with_equal_conns() {
|
||||||
|
// If num_nodes * peering_degree is even,
|
||||||
|
// it is possible that all nodes can have the same number of connections
|
||||||
|
let nodes = (0..7).map(NodeId::from_index).collect::<Vec<_>>();
|
||||||
|
let peering_degree = 4;
|
||||||
|
|
||||||
|
let mut rng = rand::rngs::OsRng;
|
||||||
|
let topology = build_topology(&nodes, peering_degree, &mut rng);
|
||||||
|
assert_eq!(topology.len(), nodes.len());
|
||||||
|
for (node, peers) in topology.iter() {
|
||||||
|
assert!(peers.len() == peering_degree);
|
||||||
|
for peer in peers.iter() {
|
||||||
|
assert!(topology.get(peer).unwrap().contains(node));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_build_topology_with_inequal_conns() {
|
||||||
|
// If num_nodes * peering_degree is odd,
|
||||||
|
// it is impossible that all nodes can have the same number of connections
|
||||||
|
let nodes = (0..7).map(NodeId::from_index).collect::<Vec<_>>();
|
||||||
|
let peering_degree = 3;
|
||||||
|
|
||||||
|
let mut rng = rand::rngs::OsRng;
|
||||||
|
let topology = build_topology(&nodes, peering_degree, &mut rng);
|
||||||
|
assert_eq!(topology.len(), nodes.len());
|
||||||
|
for (node, peers) in topology.iter() {
|
||||||
|
assert!(peers.len() <= peering_degree);
|
||||||
|
for peer in peers.iter() {
|
||||||
|
assert!(topology.get(peer).unwrap().contains(node));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue