update conn maintenance and add topology

This commit is contained in:
Youngjoon Lee 2024-12-10 14:51:34 +09:00
parent 1662a659e9
commit 137bd72435
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
3 changed files with 81 additions and 5 deletions

View File

@ -16,6 +16,7 @@ use netrunner::node::{NodeId, NodeIdExt};
use netrunner::output_processors::Record;
use netrunner::runner::{BoxedNode, SimulationRunnerHandle};
use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType};
use node::mix::topology::build_topology;
use nomos_mix::cover_traffic::CoverTrafficSettings;
use nomos_mix::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
@ -88,6 +89,12 @@ impl SimulationApp {
let network = Arc::new(Mutex::new(Network::<MixMessage>::new(regions_data, seed)));
let topology = build_topology(
&node_ids,
settings.conn_maintenance.peering_degree,
&mut rng,
);
let nodes: Vec<_> = node_ids
.iter()
.copied()
@ -100,6 +107,7 @@ impl SimulationApp {
no_netcap,
MixnodeSettings {
membership: node_ids.clone(),
topology: topology.clone(),
data_message_lottery_interval: settings.data_message_lottery_interval,
stake_proportion: settings.stake_proportion / node_ids.len() as f64,
seed: rng.next_u64(),

View File

@ -4,6 +4,7 @@ mod message;
pub mod scheduler;
pub mod state;
pub mod stream_wrapper;
pub mod topology;
use crate::node::mix::consensus_streams::{Epoch, Slot};
use cached::{Cached, TimedCache};
@ -39,6 +40,7 @@ use sha2::{Digest, Sha256};
use state::MixnodeState;
use std::{pin::pin, task::Poll, time::Duration};
use stream_wrapper::CrossbeamReceiverStream;
use topology::Topology;
#[derive(Debug, Clone)]
pub struct MixMessage(Vec<u8>);
@ -52,6 +54,7 @@ impl PayloadSize for MixMessage {
#[derive(Deserialize)]
pub struct MixnodeSettings {
pub membership: Vec<NodeId>,
pub topology: Topology,
pub data_message_lottery_interval: Duration,
pub stake_proportion: f64,
pub seed: u64,
@ -144,10 +147,12 @@ impl MixNode {
membership.clone(),
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
);
conn_maintenance
.bootstrap()
.into_iter()
.for_each(|peer| conn_maintenance.add_connected_peer(peer));
settings
.topology
.get(&id)
.unwrap()
.iter()
.for_each(|peer| conn_maintenance.add_connected_peer(*peer));
let (conn_maintenance_update_time_sender, conn_maintenance_update_time_receiver) =
channel::unbounded();
let (persistent_sender, persistent_receiver) = channel::unbounded();
@ -378,7 +383,7 @@ impl Node for MixNode {
let effective_messages_series = Series::from_iter(
monitors
.values()
.map(|monitor| monitor.effective_messages as u64),
.map(|monitor| monitor.effective_messages.to_num::<u64>()),
);
self.log_monitors(&effective_messages_series);
}

View File

@ -0,0 +1,63 @@
use std::collections::{HashMap, HashSet};
use netrunner::node::NodeId;
use rand::{seq::SliceRandom, RngCore};
pub type Topology = HashMap<NodeId, HashSet<NodeId>>;
pub fn build_topology<R: RngCore>(nodes: &[NodeId], peering_degree: usize, mut rng: R) -> Topology {
loop {
let mut topology = nodes
.iter()
.map(|&node| (node, HashSet::new()))
.collect::<HashMap<_, _>>();
for node in nodes.iter() {
let mut others = nodes
.iter()
.filter(|&other| {
// Check if the other node is not already connected to the current node
// and the other node has not reached the peering degree.
other != node
&& !topology.get(node).unwrap().contains(other)
&& topology.get(other).unwrap().len() < peering_degree
})
.copied()
.collect::<Vec<_>>();
// How many more connections the current node needs
let num_needs = peering_degree - topology.get(node).unwrap().len();
// Sample peers as many as possible and connect them to the current node
let k = std::cmp::min(num_needs, others.len());
others.as_mut_slice().shuffle(&mut rng);
others.into_iter().take(k).for_each(|peer| {
topology.get_mut(node).unwrap().insert(peer);
topology.get_mut(&peer).unwrap().insert(*node);
});
}
if are_all_nodes_connected(&topology) {
return topology;
}
}
}
fn are_all_nodes_connected(topology: &Topology) -> bool {
let visited = dfs(topology, *topology.keys().next().unwrap());
visited.len() == topology.len()
}
fn dfs(topology: &Topology, start_node: NodeId) -> HashSet<NodeId> {
let mut visited: HashSet<NodeId> = HashSet::new();
let mut stack: Vec<NodeId> = Vec::new();
stack.push(start_node);
while let Some(node) = stack.pop() {
visited.insert(node);
for peer in topology.get(&node).unwrap().iter() {
if !visited.contains(peer) {
stack.push(*peer);
}
}
}
visited
}