From 962b72dcedd565a5741b4611a5d2b33482a1c2f1 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 22 Aug 2024 20:13:44 +0200 Subject: [PATCH] impl exp5~6: random network --- mixnet-rs/dissemination/src/iteration.rs | 25 +-- mixnet-rs/dissemination/src/main.rs | 1 - mixnet-rs/ordering/src/iteration.rs | 194 ++++++++++-------- mixnet-rs/ordering/src/main.rs | 3 +- mixnet-rs/protocol/Cargo.toml | 1 + mixnet-rs/protocol/src/lib.rs | 1 + .../src/topology.rs | 21 +- 7 files changed, 137 insertions(+), 109 deletions(-) rename mixnet-rs/{dissemination => protocol}/src/topology.rs (79%) diff --git a/mixnet-rs/dissemination/src/iteration.rs b/mixnet-rs/dissemination/src/iteration.rs index 9068993..e6c0757 100644 --- a/mixnet-rs/dissemination/src/iteration.rs +++ b/mixnet-rs/dissemination/src/iteration.rs @@ -1,16 +1,12 @@ -use std::error::Error; - use protocol::{ node::{MessageId, Node, NodeId}, queue::{Message, QueueConfig}, + topology::{build_topology, save_topology}, }; use rand::{rngs::StdRng, seq::SliceRandom, RngCore, SeedableRng}; use rustc_hash::FxHashMap; -use crate::{ - paramset::ParamSet, - topology::{build_topology, Topology}, -}; +use crate::paramset::ParamSet; // An interval that the sender nodes send (schedule) new messages const MSG_INTERVAL: f32 = 1.0; @@ -172,23 +168,6 @@ fn relay_messages( }); } -fn save_topology(topology: &Topology, topology_path: &str) -> Result<(), Box> { - let mut wtr = csv::Writer::from_path(topology_path)?; - wtr.write_record(["node", "num_peers", "peers"])?; - - for (node, peers) in topology.iter().enumerate() { - let node: NodeId = node.try_into().unwrap(); - let peers_str: Vec = peers.iter().map(|peer_id| peer_id.to_string()).collect(); - wtr.write_record(&[ - node.to_string(), - peers.len().to_string(), - format!("[{}]", peers_str.join(",")), - ])?; - } - wtr.flush()?; - Ok(()) -} - struct SenderSelector { candidates: Vec, num_senders: NodeId, diff --git a/mixnet-rs/dissemination/src/main.rs b/mixnet-rs/dissemination/src/main.rs index af20679..a341e05 100644 --- a/mixnet-rs/dissemination/src/main.rs +++ b/mixnet-rs/dissemination/src/main.rs @@ -13,7 +13,6 @@ use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS}; mod iteration; mod paramset; -mod topology; #[derive(Debug, Parser)] #[command(name = "Message Dessemination Time Measurement")] diff --git a/mixnet-rs/ordering/src/iteration.rs b/mixnet-rs/ordering/src/iteration.rs index f4e0165..2a12ec1 100644 --- a/mixnet-rs/ordering/src/iteration.rs +++ b/mixnet-rs/ordering/src/iteration.rs @@ -3,8 +3,9 @@ use std::path::Path; use protocol::{ node::{MessageId, Node, NodeId}, queue::{Message, QueueConfig, QueueType}, + topology::{build_topology, save_topology}, }; -use rand::{rngs::StdRng, Rng, SeedableRng}; +use rand::{rngs::StdRng, seq::SliceRandom, Rng, RngCore, SeedableRng}; use rustc_hash::FxHashMap; use crate::{ordercoeff::Sequence, paramset::ParamSet}; @@ -12,38 +13,6 @@ use crate::{ordercoeff::Sequence, paramset::ParamSet}; const RECEIVER_ID: NodeId = NodeId::MAX; pub fn run_iteration( - paramset: ParamSet, - seed: u64, - out_latency_path: &str, - out_sent_sequence_path: &str, - out_received_sequence_path: &str, - out_data_msg_counts_path: &str, - out_ordering_coeff_path: &str, -) { - if paramset.random_topology { - run_iteration_with_random_topology( - paramset, - seed, - out_latency_path, - out_sent_sequence_path, - out_received_sequence_path, - out_data_msg_counts_path, - out_ordering_coeff_path, - ) - } else { - run_iteration_without_random_topology( - paramset, - seed, - out_latency_path, - out_sent_sequence_path, - out_received_sequence_path, - out_data_msg_counts_path, - out_ordering_coeff_path, - ) - } -} - -fn run_iteration_without_random_topology( paramset: ParamSet, seed: u64, out_latency_path: &str, @@ -51,9 +20,8 @@ fn run_iteration_without_random_topology( out_received_sequence_path_prefix: &str, out_queue_data_msg_counts_path: &str, out_ordering_coeff_path: &str, + out_topology_path: &str, ) { - assert!(!paramset.random_topology); - // Ensure that all output files do not exist for path in &[ out_latency_path, @@ -64,44 +32,11 @@ fn run_iteration_without_random_topology( assert!(!Path::new(path).exists(), "File already exists: {path}"); } - // Initialize mix nodes - let mut next_node_id: NodeId = 0; - let mut mixnodes: FxHashMap = FxHashMap::default(); - let mut paths: Vec> = Vec::with_capacity(paramset.num_paths as usize); - for _ in 0..paramset.num_paths { - let mut ids = Vec::with_capacity(paramset.num_mixes as usize); - for _ in 0..paramset.num_mixes { - let id = next_node_id; - next_node_id += 1; - mixnodes.insert( - id, - Node::new( - QueueConfig { - queue_type: paramset.queue_type, - seed, - min_queue_size: paramset.min_queue_size, - }, - paramset.peering_degree, - paramset.random_topology, // disable cache - ), - ); - ids.push(id); - } - paths.push(ids); - } - - // Connect mix nodes - for path in paths.iter() { - for (i, id) in path.iter().enumerate() { - if i != path.len() - 1 { - let peer_id = path[i + 1]; - mixnodes.get_mut(id).unwrap().connect(peer_id); - } else { - mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID); - } - } - } - let sender_peers: Vec = paths.iter().map(|path| path[0]).collect(); + let (mut mixnodes, sender_peers) = if paramset.random_topology { + build_random_network(¶mset, seed, out_topology_path) + } else { + build_striped_network(¶mset, seed) + }; let mut next_msg_id: MessageId = 0; @@ -235,17 +170,112 @@ fn run_iteration_without_random_topology( } } -fn run_iteration_with_random_topology( - paramset: ParamSet, +fn build_striped_network(paramset: &ParamSet, seed: u64) -> (FxHashMap, Vec) { + assert!(!paramset.random_topology); + let mut next_node_id: NodeId = 0; + let mut queue_seed_rng = StdRng::seed_from_u64(seed); + let mut mixnodes: FxHashMap = FxHashMap::default(); + let mut paths: Vec> = Vec::with_capacity(paramset.num_paths as usize); + for _ in 0..paramset.num_paths { + let mut ids = Vec::with_capacity(paramset.num_mixes as usize); + for _ in 0..paramset.num_mixes { + let id = next_node_id; + next_node_id += 1; + mixnodes.insert( + id, + Node::new( + QueueConfig { + queue_type: paramset.queue_type, + seed: queue_seed_rng.next_u64(), + min_queue_size: paramset.min_queue_size, + }, + paramset.peering_degree, + false, // disable cache + ), + ); + ids.push(id); + } + paths.push(ids); + } + + // Connect mix nodes + for path in paths.iter() { + for (i, id) in path.iter().enumerate() { + if i != path.len() - 1 { + let peer_id = path[i + 1]; + mixnodes.get_mut(id).unwrap().connect(peer_id); + } else { + mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID); + } + } + } + let sender_peers: Vec = paths.iter().map(|path| *path.first().unwrap()).collect(); + (mixnodes, sender_peers) +} + +fn build_random_network( + paramset: &ParamSet, seed: u64, - out_latency_path: &str, - out_sent_sequence_path: &str, - out_received_sequence_path: &str, - out_data_msg_counts_path: &str, - out_ordering_coeff_path: &str, -) { + out_topology_path: &str, +) -> (FxHashMap, Vec) { assert!(paramset.random_topology); - todo!() + // Init mix nodes + let mut queue_seed_rng = StdRng::seed_from_u64(seed); + let mut mixnodes: FxHashMap = FxHashMap::default(); + for id in 0..paramset.num_mixes { + mixnodes.insert( + id, + Node::new( + QueueConfig { + queue_type: paramset.queue_type, + seed: queue_seed_rng.next_u64(), + min_queue_size: paramset.min_queue_size, + }, + paramset.peering_degree, + true, // enable cache + ), + ); + } + + // Choose sender's peers and receiver's peers randomly + let mut peers_rng = StdRng::seed_from_u64(seed); + let mut candidates: Vec = (0..paramset.num_mixes).collect(); + assert!(candidates.len() >= paramset.peering_degree as usize); + candidates.as_mut_slice().shuffle(&mut peers_rng); + let sender_peers: Vec = candidates + .iter() + .cloned() + .take(paramset.peering_degree as usize) + .collect(); + candidates.as_mut_slice().shuffle(&mut peers_rng); + let receiver_peers: Vec = candidates + .iter() + .cloned() + .take(paramset.peering_degree as usize) + .collect(); + + // Connect mix nodes + let topology = build_topology( + paramset.num_mixes, + &vec![paramset.peering_degree; paramset.num_mixes as usize], + seed, + ); + save_topology(&topology, out_topology_path).unwrap(); + for (node_id, peers) in topology.iter().enumerate() { + peers.iter().for_each(|peer_id| { + mixnodes + .get_mut(&(node_id.try_into().unwrap())) + .unwrap() + .connect(*peer_id); + }); + } + + // Connect the selected mix nodes with the receiver + for id in receiver_peers.iter() { + mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID); + } + + (mixnodes, sender_peers) } fn try_probability(rng: &mut StdRng, prob: f32) -> bool { diff --git a/mixnet-rs/ordering/src/main.rs b/mixnet-rs/ordering/src/main.rs index 7b3ea6f..3265fcd 100644 --- a/mixnet-rs/ordering/src/main.rs +++ b/mixnet-rs/ordering/src/main.rs @@ -76,9 +76,10 @@ fn main() { i as u64, &format!("{paramset_dir}/iteration_{i}_latency.csv"), &format!("{paramset_dir}/iteration_{i}_sent_seq.csv"), - &format!("{paramset_dir}/iteration_{i}_recv_seq.csv"), + &format!("{paramset_dir}/iteration_{i}_recv_seq"), &format!("{paramset_dir}/iteration_{i}_data_msg_counts.csv"), &format!("{paramset_dir}/iteration_{i}_ordering_coeff.csv"), + &format!("{paramset_dir}/iteration_{i}_topology.csv"), ); tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i); } diff --git a/mixnet-rs/protocol/Cargo.toml b/mixnet-rs/protocol/Cargo.toml index e91a783..cd05301 100644 --- a/mixnet-rs/protocol/Cargo.toml +++ b/mixnet-rs/protocol/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +csv = "1.3.0" rand = "0.8.5" rustc-hash = "2.0.0" strum = "0.26.3" diff --git a/mixnet-rs/protocol/src/lib.rs b/mixnet-rs/protocol/src/lib.rs index c848b2b..5a2da42 100644 --- a/mixnet-rs/protocol/src/lib.rs +++ b/mixnet-rs/protocol/src/lib.rs @@ -1,2 +1,3 @@ pub mod node; pub mod queue; +pub mod topology; diff --git a/mixnet-rs/dissemination/src/topology.rs b/mixnet-rs/protocol/src/topology.rs similarity index 79% rename from mixnet-rs/dissemination/src/topology.rs rename to mixnet-rs/protocol/src/topology.rs index 41ad54c..ec86e5c 100644 --- a/mixnet-rs/dissemination/src/topology.rs +++ b/mixnet-rs/protocol/src/topology.rs @@ -1,6 +1,6 @@ -use std::collections::HashSet; +use std::{collections::HashSet, error::Error}; -use protocol::node::NodeId; +use crate::node::NodeId; use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; pub type Topology = Vec>; @@ -69,3 +69,20 @@ fn dfs(topology: &[HashSet], start_node: NodeId) -> HashSet { } visited } + +pub fn save_topology(topology: &Topology, path: &str) -> Result<(), Box> { + let mut wtr = csv::Writer::from_path(path)?; + wtr.write_record(["node", "num_peers", "peers"])?; + + for (node, peers) in topology.iter().enumerate() { + let node: NodeId = node.try_into().unwrap(); + let peers_str: Vec = peers.iter().map(|peer_id| peer_id.to_string()).collect(); + wtr.write_record(&[ + node.to_string(), + peers.len().to_string(), + format!("[{}]", peers_str.join(",")), + ])?; + } + wtr.flush()?; + Ok(()) +}