feat(blend): calculate topology diameter

This commit is contained in:
Youngjoon Lee 2024-12-20 01:13:07 +09:00
parent be304046dd
commit 9f9c6d0d34
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
2 changed files with 162 additions and 76 deletions

View File

@ -1,3 +1,4 @@
use std::collections::HashMap;
// std // std
use std::fs::File; use std::fs::File;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -16,7 +17,7 @@ use netrunner::node::{NodeId, NodeIdExt};
use netrunner::output_processors::Record; use netrunner::output_processors::Record;
use netrunner::runner::{BoxedNode, SimulationRunnerHandle}; use netrunner::runner::{BoxedNode, SimulationRunnerHandle};
use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType}; use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType};
use node::blend::topology::build_topology; use node::blend::topology::Topology;
use nomos_blend::cover_traffic::CoverTrafficSettings; use nomos_blend::cover_traffic::CoverTrafficSettings;
use nomos_blend::message_blend::{ use nomos_blend::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
@ -26,7 +27,7 @@ use rand::seq::SliceRandom;
use rand::{RngCore, SeedableRng}; use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha12Rng; use rand_chacha::ChaCha12Rng;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::{Deserialize, Serialize};
// internal // internal
use crate::node::blend::BlendNode; use crate::node::blend::BlendNode;
use crate::settings::SimSettings; use crate::settings::SimSettings;
@ -89,7 +90,8 @@ impl SimulationApp {
let network = Arc::new(Mutex::new(Network::<BlendMessage>::new(regions_data, seed))); let network = Arc::new(Mutex::new(Network::<BlendMessage>::new(regions_data, seed)));
let topology = build_topology(&node_ids, settings.connected_peers_count, &mut rng); let topology = Topology::new(&node_ids, settings.connected_peers_count, &mut rng);
log_topology(&topology);
let nodes: Vec<_> = node_ids let nodes: Vec<_> = node_ids
.iter() .iter()
@ -243,6 +245,20 @@ fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<T> {
Ok(serde_json::from_reader(f)?) Ok(serde_json::from_reader(f)?)
} }
fn log_topology(topology: &Topology) {
let log = TopologyLog {
topology: topology.to_node_indices(),
diameter: topology.diameter(),
};
tracing::info!("Topology: {}", serde_json::to_string(&log).unwrap());
}
#[derive(Debug, Serialize, Deserialize)]
struct TopologyLog {
topology: HashMap<usize, Vec<usize>>,
diameter: usize,
}
fn main() -> anyhow::Result<()> { fn main() -> anyhow::Result<()> {
let app: SimulationApp = SimulationApp::parse(); let app: SimulationApp = SimulationApp::parse();
let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics); let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics);

View File

@ -1,14 +1,16 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use netrunner::node::NodeId; use netrunner::node::{NodeId, NodeIdExt};
use rand::{seq::SliceRandom, RngCore}; use rand::{seq::SliceRandom, RngCore};
pub type Topology = HashMap<NodeId, HashSet<NodeId>>; #[derive(Clone)]
pub struct Topology(HashMap<NodeId, HashSet<NodeId>>);
impl Topology {
/// Builds a topology with the given nodes and peering degree /// Builds a topology with the given nodes and peering degree
/// by ensuring that all nodes are connected (no partition) /// by ensuring that all nodes are connected (no partition)
/// and all nodes have the same number of connections (only if possible). /// and all nodes have the same number of connections (only if possible).
pub fn build_topology<R: RngCore>(nodes: &[NodeId], peering_degree: usize, mut rng: R) -> Topology { pub fn new<R: RngCore>(nodes: &[NodeId], peering_degree: usize, mut rng: R) -> Self {
tracing::info!("Building topology: peering_degree:{}", peering_degree); tracing::info!("Building topology: peering_degree:{}", peering_degree);
loop { loop {
let mut topology = nodes let mut topology = nodes
@ -44,9 +46,10 @@ pub fn build_topology<R: RngCore>(nodes: &[NodeId], peering_degree: usize, mut r
// Check constraints: // Check constraints:
// - All nodes are connected (no partition) // - All nodes are connected (no partition)
// - All nodes have the same number of connections (if possible) // - All nodes have the same number of connections (if possible)
let topology = Self(topology);
let can_have_equal_conns = (nodes.len() * peering_degree) % 2 == 0; let can_have_equal_conns = (nodes.len() * peering_degree) % 2 == 0;
if check_all_connected(&topology) if topology.check_all_connected()
&& (!can_have_equal_conns || check_equal_conns(&topology, peering_degree)) && (!can_have_equal_conns || topology.check_equal_conns(peering_degree))
{ {
return topology; return topology;
} }
@ -55,19 +58,19 @@ pub fn build_topology<R: RngCore>(nodes: &[NodeId], peering_degree: usize, mut r
} }
/// Checks if all nodes are connected (no partition) in the topology. /// Checks if all nodes are connected (no partition) in the topology.
fn check_all_connected(topology: &Topology) -> bool { fn check_all_connected(&self) -> bool {
let visited = dfs(topology, *topology.keys().next().unwrap()); let visited = self.dfs(*self.0.keys().next().unwrap());
visited.len() == topology.len() visited.len() == self.0.len()
} }
/// Depth-first search to visit nodes in the topology. /// Depth-first search to visit nodes in the topology.
fn dfs(topology: &Topology, start_node: NodeId) -> HashSet<NodeId> { fn dfs(&self, start_node: NodeId) -> HashSet<NodeId> {
let mut visited: HashSet<NodeId> = HashSet::new(); let mut visited: HashSet<NodeId> = HashSet::new();
let mut stack: Vec<NodeId> = Vec::new(); let mut stack: Vec<NodeId> = Vec::new();
stack.push(start_node); stack.push(start_node);
while let Some(node) = stack.pop() { while let Some(node) = stack.pop() {
visited.insert(node); visited.insert(node);
for peer in topology.get(&node).unwrap().iter() { for peer in self.0.get(&node).unwrap().iter() {
if !visited.contains(peer) { if !visited.contains(peer) {
stack.push(*peer); stack.push(*peer);
} }
@ -77,15 +80,70 @@ fn dfs(topology: &Topology, start_node: NodeId) -> HashSet<NodeId> {
} }
/// Checks if all nodes have the same number of connections. /// Checks if all nodes have the same number of connections.
fn check_equal_conns(topology: &Topology, peering_degree: usize) -> bool { fn check_equal_conns(&self, peering_degree: usize) -> bool {
topology self.0
.iter() .iter()
.all(|(_, peers)| peers.len() == peering_degree) .all(|(_, peers)| peers.len() == peering_degree)
} }
/// Calculate the diameter (longest path length) of the topology.
pub fn diameter(&self) -> usize {
// Calculate a diameter from each node and take the maximum
self.0
.keys()
.map(|&node| self.diameter_from(node))
.fold(0, usize::max)
}
/// Calculate a diameter (longest path length) of the topology from the start_node.
fn diameter_from(&self, start_node: NodeId) -> usize {
// start_node is visited at the beginning
let mut visited: HashSet<NodeId> = HashSet::from([start_node]);
// Count the number of hops to visit all nodes
let mut hop_count = 0;
let mut next_hop: HashSet<NodeId> = self.0.get(&start_node).unwrap().clone();
while !next_hop.is_empty() {
// First, visit all nodes in the next hop and increase the hop count
next_hop.iter().for_each(|&node| {
assert!(visited.insert(node));
});
hop_count += 1;
// Then, build the new next hop by collecting all peers of the current next hop
// except peers already visited
next_hop = next_hop
.iter()
.flat_map(|node| self.0.get(node).unwrap())
.filter(|&peer| !visited.contains(peer))
.copied()
.collect();
}
hop_count
}
pub fn get(&self, node: &NodeId) -> Option<&HashSet<NodeId>> {
self.0.get(node)
}
/// Converts all [`NodeId`]s in the topology to their indices.
pub fn to_node_indices(&self) -> HashMap<usize, Vec<usize>> {
self.0
.iter()
.map(|(node, peers)| {
(
node.index(),
peers.iter().map(|peer| peer.index()).collect(),
)
})
.collect()
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use netrunner::node::NodeIdExt; use netrunner::node::NodeIdExt;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use super::*; use super::*;
@ -97,9 +155,9 @@ mod tests {
let peering_degree = 4; let peering_degree = 4;
let mut rng = rand::rngs::OsRng; let mut rng = rand::rngs::OsRng;
let topology = build_topology(&nodes, peering_degree, &mut rng); let topology = Topology::new(&nodes, peering_degree, &mut rng);
assert_eq!(topology.len(), nodes.len()); assert_eq!(topology.0.len(), nodes.len());
for (node, peers) in topology.iter() { for (node, peers) in topology.0.iter() {
assert!(peers.len() == peering_degree); assert!(peers.len() == peering_degree);
for peer in peers.iter() { for peer in peers.iter() {
assert!(topology.get(peer).unwrap().contains(node)); assert!(topology.get(peer).unwrap().contains(node));
@ -115,13 +173,25 @@ mod tests {
let peering_degree = 3; let peering_degree = 3;
let mut rng = rand::rngs::OsRng; let mut rng = rand::rngs::OsRng;
let topology = build_topology(&nodes, peering_degree, &mut rng); let topology = Topology::new(&nodes, peering_degree, &mut rng);
assert_eq!(topology.len(), nodes.len()); assert_eq!(topology.0.len(), nodes.len());
for (node, peers) in topology.iter() { for (node, peers) in topology.0.iter() {
assert!(peers.len() <= peering_degree); assert!(peers.len() <= peering_degree);
for peer in peers.iter() { for peer in peers.iter() {
assert!(topology.get(peer).unwrap().contains(node)); assert!(topology.get(peer).unwrap().contains(node));
} }
} }
} }
#[test]
fn test_diameter() {
let nodes = (0..100).map(NodeId::from_index).collect::<Vec<_>>();
let peering_degree = 4;
let mut rng = ChaCha8Rng::seed_from_u64(0);
let topology = Topology::new(&nodes, peering_degree, &mut rng);
let diameter = topology.diameter();
println!("diameter: {}", diameter);
assert!(diameter > 0);
assert!(diameter <= nodes.len());
}
} }