From cd35121d5e4a9434413954f10a9d35f72f214645 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Sat, 17 Aug 2024 03:41:46 +0900 Subject: [PATCH] add queues --- mixnet-rs/dissemination/src/iteration.rs | 119 ++++++++ mixnet-rs/dissemination/src/node.rs | 20 +- mixnet-rs/dissemination/src/paramset.rs | 349 ++++++++++++++++++++++ mixnet-rs/dissemination/src/queue.rs | 358 +++++++++++++++++++++++ mixnet-rs/dissemination/src/topology.rs | 68 +++++ 5 files changed, 907 insertions(+), 7 deletions(-) create mode 100644 mixnet-rs/dissemination/src/iteration.rs create mode 100644 mixnet-rs/dissemination/src/paramset.rs create mode 100644 mixnet-rs/dissemination/src/queue.rs create mode 100644 mixnet-rs/dissemination/src/topology.rs diff --git a/mixnet-rs/dissemination/src/iteration.rs b/mixnet-rs/dissemination/src/iteration.rs new file mode 100644 index 0000000..8c7374b --- /dev/null +++ b/mixnet-rs/dissemination/src/iteration.rs @@ -0,0 +1,119 @@ +use std::{collections::HashMap, error::Error}; + +use rand::{rngs::StdRng, RngCore, SeedableRng}; + +use crate::{ + node::{MessageId, Node, NodeId}, + paramset::ParamSet, + queue::QueueConfig, + topology::build_topology, +}; + +pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology_path: &str) { + // Initialize nodes + let mut nodes: HashMap = HashMap::new(); + let mut queue_seed_rng = StdRng::seed_from_u64(seed); + for i in 0..paramset.num_nodes { + nodes.insert( + i, + Node::new(QueueConfig { + queue_type: paramset.queue_type, + seed: queue_seed_rng.next_u64(), + min_queue_size: paramset.min_queue_size, + }), + ); + } + + // Connect nodes + let topology = build_topology(paramset.num_nodes, paramset.peering_degree, seed); + save_topology(&topology, topology_path).unwrap(); + for (node_id, peers) in topology.iter() { + peers.iter().for_each(|peer_id| { + nodes.get_mut(node_id).unwrap().connect(*peer_id); + }); + } + + let sender_ids: Vec = (0..paramset.num_senders).collect(); + + let mut vtime: f32 = 0.0; + let interval: f32 = 1.0 / paramset.transmission_rate as f32; + let mut next_msg_id: MessageId = 0; + let mut sent_msgs: HashMap = HashMap::new(); + let mut num_disseminated_msgs = 0; + + let mut writer = csv::Writer::from_path(out_csv_path).unwrap(); + writer + .write_record(["dissemination_time", "sent_time", "all_received_time"]) + .unwrap(); + + loop { + // Send new messages + assert!(sent_msgs.len() % (paramset.num_senders as usize) == 0); + if sent_msgs.len() / (paramset.num_senders as usize) < paramset.num_sent_msgs as usize { + for sender_id in sender_ids.iter() { + nodes.get_mut(sender_id).unwrap().send(next_msg_id); + sent_msgs.insert(next_msg_id, (vtime, 1)); + next_msg_id += 1; + } + } + + // Collect messages to relay + let mut all_msgs_to_relay = Vec::new(); + for (node_id, node) in nodes.iter_mut() { + let msgs_to_relay = node.read_queues(); + msgs_to_relay.iter().for_each(|(receiver_id, msg)| { + all_msgs_to_relay.push((*receiver_id, *msg, *node_id)); + }); + } + + // Relay the messages + all_msgs_to_relay + .into_iter() + .for_each(|(receiver_id, msg, sender_id)| { + if nodes.get_mut(&receiver_id).unwrap().receive(msg, sender_id) { + let (sent_time, num_received_nodes) = sent_msgs.get_mut(&msg).unwrap(); + *num_received_nodes += 1; + if *num_received_nodes == paramset.num_nodes { + let dissemination_time = vtime - *sent_time; + writer + .write_record(&[ + dissemination_time.to_string(), + sent_time.to_string(), + vtime.to_string(), + ]) + .unwrap(); + num_disseminated_msgs += 1; + } + } + }); + + // Check if all messages have been disseminated to all nodes. + if num_disseminated_msgs == (paramset.num_senders * paramset.num_sent_msgs) as usize { + break; + } + + vtime += interval; + } +} + +fn save_topology( + topology: &HashMap>, + topology_path: &str, +) -> Result<(), Box> { + let mut wtr = csv::Writer::from_path(topology_path)?; + wtr.write_record(["node", "num_peers", "peers"])?; + + let mut sorted_keys: Vec<&u16> = topology.keys().collect(); + sorted_keys.sort(); + for &node in &sorted_keys { + let peers = topology.get(node).unwrap(); + let peers_str: Vec = peers.iter().map(|peer_id| peer_id.to_string()).collect(); + wtr.write_record(&[ + node.to_string(), + peers.len().to_string(), + format!("\"[{}]\"", peers_str.join(",")), + ])?; + } + wtr.flush()?; + Ok(()) +} diff --git a/mixnet-rs/dissemination/src/node.rs b/mixnet-rs/dissemination/src/node.rs index f0cd822..46f693d 100644 --- a/mixnet-rs/dissemination/src/node.rs +++ b/mixnet-rs/dissemination/src/node.rs @@ -1,23 +1,29 @@ -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, HashSet}; + +use crate::queue::{new_queue, Queue, QueueConfig}; pub type NodeId = u16; pub type MessageId = u32; pub struct Node { - queues: HashMap>, + queue_config: QueueConfig, + queues: HashMap>>, received_msgs: HashSet, } impl Node { - pub fn new() -> Self { + pub fn new(queue_config: QueueConfig) -> Self { Node { + queue_config, queues: HashMap::new(), received_msgs: HashSet::new(), } } pub fn connect(&mut self, peer_id: NodeId) { - self.queues.entry(peer_id).or_default(); + self.queues + .entry(peer_id) + .or_insert(new_queue(&self.queue_config)); } pub fn num_queues(&self) -> usize { @@ -27,7 +33,7 @@ impl Node { pub fn send(&mut self, msg: MessageId) { assert!(self.received_msgs.insert(msg)); for (_, queue) in self.queues.iter_mut() { - queue.push_back(msg); + queue.push(msg); } } @@ -36,7 +42,7 @@ impl Node { if first_received { for (node_id, queue) in self.queues.iter_mut() { if *node_id != from { - queue.push_back(msg); + queue.push(msg); } } } @@ -46,7 +52,7 @@ impl Node { pub fn read_queues(&mut self) -> Vec<(NodeId, MessageId)> { let mut msgs_to_relay: Vec<(NodeId, MessageId)> = Vec::new(); for (node_id, queue) in self.queues.iter_mut() { - if let Some(msg) = queue.pop_front() { + if let Some(msg) = queue.pop() { msgs_to_relay.push((*node_id, msg)); } } diff --git a/mixnet-rs/dissemination/src/paramset.rs b/mixnet-rs/dissemination/src/paramset.rs new file mode 100644 index 0000000..d811f62 --- /dev/null +++ b/mixnet-rs/dissemination/src/paramset.rs @@ -0,0 +1,349 @@ +use crate::queue::QueueType; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum ExperimentId { + Experiment1 = 1, + Experiment2 = 2, + Experiment3 = 3, + Experiment4 = 4, +} + +impl std::str::FromStr for ExperimentId { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "1" | "Experiment1" => Ok(ExperimentId::Experiment1), + "2" | "Experiment2" => Ok(ExperimentId::Experiment2), + "3" | "Experiment3" => Ok(ExperimentId::Experiment3), + "4" | "Experiment4" => Ok(ExperimentId::Experiment4), + _ => Err(format!("Invalid experiment ID: {}", s)), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum SessionId { + Session1 = 1, + Session2 = 2, + Session2_1 = 21, +} + +impl std::str::FromStr for SessionId { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "1" | "Session1" => Ok(SessionId::Session1), + "2" | "Session2" => Ok(SessionId::Session2), + "2.1" | "Session21" => Ok(SessionId::Session2_1), + _ => Err(format!("Invalid session ID: {}", s)), + } + } +} + +pub const PARAMSET_CSV_COLUMNS: &[&str] = &[ + "paramset", + "num_nodes", + "peering_degree", + "min_queue_size", + "transmission_rate", + "num_sent_msgs", + "num_senders", + "queue_type", + "num_iterations", +]; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ParamSet { + pub id: u16, + pub num_nodes: u16, + pub peering_degree: u16, + pub min_queue_size: u16, + pub transmission_rate: u16, + pub num_sent_msgs: u16, + pub num_senders: u16, + pub queue_type: QueueType, + pub num_iterations: u16, +} + +impl ParamSet { + pub fn new_all_paramsets( + exp_id: ExperimentId, + session_id: SessionId, + queue_type: QueueType, + ) -> Vec { + match session_id { + SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type), + SessionId::Session2 => Self::new_session2_paramsets(exp_id, queue_type), + SessionId::Session2_1 => Self::new_session2_1_paramsets(exp_id, queue_type), + } + } + + fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { + let mut start_id: u16 = 1; + let mut paramsets: Vec = Vec::new(); + for &num_nodes in &[20, 40, 80] { + let (mut new_paramsets, next_start_id) = Self::new_paramsets( + start_id, + num_nodes, + &[num_nodes / 5, num_nodes / 4, num_nodes / 2], + &[num_nodes / 2, num_nodes, num_nodes * 2], + &[num_nodes / 2, num_nodes, num_nodes * 2], + |_| match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1], + ExperimentId::Experiment2 | ExperimentId::Experiment4 => vec![8, 16, 32], + }, + match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1], + ExperimentId::Experiment3 | ExperimentId::Experiment4 => { + vec![num_nodes / 10, num_nodes / 5, num_nodes / 2] + } + } + .as_slice(), + queue_type, + num_nodes / 2, + ); + paramsets.append(&mut new_paramsets); + start_id = next_start_id; + } + paramsets + } + + fn new_session2_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { + let mut start_id: u16 = 1; + let mut paramsets: Vec = Vec::new(); + for &num_nodes in &[100, 1000, 10000] { + let (mut new_paramsets, next_start_id) = Self::new_paramsets( + start_id, + num_nodes, + &[4, 8, 16], + &[10, 50, 100], + &[1, 10, 100], + |min_queue_size| match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1], + ExperimentId::Experiment2 | ExperimentId::Experiment4 => { + vec![min_queue_size / 2, min_queue_size, min_queue_size * 2] + } + }, + match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1], + ExperimentId::Experiment3 | ExperimentId::Experiment4 => { + vec![num_nodes / 10, num_nodes / 5, num_nodes / 2] + } + } + .as_slice(), + queue_type, + 20, + ); + paramsets.append(&mut new_paramsets); + start_id = next_start_id; + } + paramsets + } + + fn new_session2_1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { + let mut start_id: u16 = 1; + let mut paramsets: Vec = Vec::new(); + for &num_nodes in &[20, 200, 2000] { + let (mut new_paramsets, next_start_id) = Self::new_paramsets( + start_id, + num_nodes, + &[4, 6, 8], + &[10, 50, 100], + &[1], + |_| match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1], + ExperimentId::Experiment2 | ExperimentId::Experiment4 => vec![1000], + }, + match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1], + ExperimentId::Experiment3 | ExperimentId::Experiment4 => { + vec![num_nodes / 10, num_nodes / 5, num_nodes / 2] + } + } + .as_slice(), + queue_type, + 20, + ); + paramsets.append(&mut new_paramsets); + start_id = next_start_id; + } + paramsets + } + + #[allow(clippy::too_many_arguments)] + fn new_paramsets( + start_id: u16, + num_nodes: u16, + peering_degree_list: &[u16], + min_queue_size_list: &[u16], + transmission_rate_list: &[u16], + num_sent_msgs_list: impl Fn(u16) -> Vec, + num_senders_list: &[u16], + queue_type: QueueType, + num_iterations: u16, + ) -> (Vec, u16) { + let mut id = start_id; + let mut paramsets: Vec = Vec::new(); + for &peering_degree in peering_degree_list { + for &min_queue_size in min_queue_size_list { + for &transmission_rate in transmission_rate_list { + for &num_sent_msgs in num_sent_msgs_list(min_queue_size).iter() { + for &num_senders in num_senders_list { + if !Self::is_min_queue_size_applicable(&queue_type) + && min_queue_size != min_queue_size_list[0] + { + id += 1; + continue; + } + paramsets.push(ParamSet { + id, + num_nodes, + peering_degree, + min_queue_size, + transmission_rate, + num_sent_msgs, + num_senders, + queue_type, + num_iterations, + }); + id += 1; + } + } + } + } + } + (paramsets, id) + } + + pub fn is_min_queue_size_applicable(queue_type: &QueueType) -> bool { + matches!( + queue_type, + QueueType::PureCoinFlipping + | QueueType::PureRandomSampling + | QueueType::PermutedCoinFlipping + ) + } + + pub fn as_csv_record(&self) -> Vec { + vec![ + self.id.to_string(), + self.num_nodes.to_string(), + self.peering_degree.to_string(), + self.min_queue_size.to_string(), + self.transmission_rate.to_string(), + self.num_sent_msgs.to_string(), + self.num_senders.to_string(), + format!("{:?}", self.queue_type), + self.num_iterations.to_string(), + ] + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use strum::IntoEnumIterator; + + use crate::paramset::ParamSet; + + use super::*; + + #[test] + fn test_new_all_paramsets() { + let cases = vec![ + ( + (ExperimentId::Experiment1, SessionId::Session1), + 3u32.pow(4), + ), + ( + (ExperimentId::Experiment2, SessionId::Session1), + 3u32.pow(5), + ), + ( + (ExperimentId::Experiment3, SessionId::Session1), + 3u32.pow(5), + ), + ( + (ExperimentId::Experiment4, SessionId::Session1), + 3u32.pow(6), + ), + ( + (ExperimentId::Experiment1, SessionId::Session2), + 3u32.pow(4), + ), + ( + (ExperimentId::Experiment4, SessionId::Session2), + 3u32.pow(6), + ), + ( + (ExperimentId::Experiment1, SessionId::Session2_1), + 3u32.pow(3), + ), + ( + (ExperimentId::Experiment4, SessionId::Session2_1), + 3u32.pow(4), + ), + ]; + + for queue_type in QueueType::iter() { + for ((exp_id, session_id), mut expected_cnt) in cases.clone().into_iter() { + let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type); + + // Check if the number of parameter sets is correct + if !ParamSet::is_min_queue_size_applicable(&queue_type) { + expected_cnt /= 3; + } + assert_eq!(paramsets.len(), expected_cnt as usize); + + // Check if all parameter sets are unique + let unique_paramsets: HashSet = paramsets.clone().into_iter().collect(); + assert_eq!(unique_paramsets.len(), paramsets.len()); + + // Check if paramset IDs are correct. + if ParamSet::is_min_queue_size_applicable(&queue_type) { + for (i, paramset) in paramsets.iter().enumerate() { + assert_eq!(paramset.id as usize, i + 1); + } + } + } + } + } + + #[test] + fn test_id_consistency() { + let cases = vec![ + (ExperimentId::Experiment1, SessionId::Session1), + (ExperimentId::Experiment2, SessionId::Session1), + (ExperimentId::Experiment3, SessionId::Session1), + (ExperimentId::Experiment4, SessionId::Session1), + (ExperimentId::Experiment1, SessionId::Session2), + (ExperimentId::Experiment4, SessionId::Session2), + (ExperimentId::Experiment1, SessionId::Session2_1), + (ExperimentId::Experiment4, SessionId::Session2_1), + ]; + + for (exp_id, session_id) in cases.into_iter() { + let paramsets_with_min_queue_size = + ParamSet::new_all_paramsets(exp_id, session_id, QueueType::PureCoinFlipping); + let paramsets_without_min_queue_size = + ParamSet::new_all_paramsets(exp_id, session_id, QueueType::NonMix); + + for (i, paramset) in paramsets_with_min_queue_size.iter().enumerate() { + assert_eq!(paramset.id as usize, i + 1); + } + + for mut paramset in paramsets_without_min_queue_size.into_iter() { + // To compare ParameterSet instances, use the same queue type. + paramset.queue_type = QueueType::PureCoinFlipping; + assert_eq!( + paramset, + paramsets_with_min_queue_size[paramset.id as usize - 1] + ); + } + } + } +} diff --git a/mixnet-rs/dissemination/src/queue.rs b/mixnet-rs/dissemination/src/queue.rs new file mode 100644 index 0000000..2332cc7 --- /dev/null +++ b/mixnet-rs/dissemination/src/queue.rs @@ -0,0 +1,358 @@ +use std::collections::VecDeque; + +use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng}; +use strum_macros::EnumIter; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)] +pub enum QueueType { + NonMix, + PureCoinFlipping, + PureRandomSampling, + PermutedCoinFlipping, + NoisyCoinFlipping, +} + +impl std::str::FromStr for QueueType { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "NonMix" => Ok(QueueType::NonMix), + "PureCoinFlipping" => Ok(QueueType::PureCoinFlipping), + "PureRandomSampling" => Ok(QueueType::PureRandomSampling), + "PermutedCoinFlipping" => Ok(QueueType::PermutedCoinFlipping), + "NoisyCoinFlipping" => Ok(QueueType::NoisyCoinFlipping), + _ => Err(format!("Unknown queue type: {}", s)), + } + } +} + +pub trait Queue { + fn push(&mut self, msg: T); + fn pop(&mut self) -> Option; + fn len(&self) -> usize; +} + +pub struct QueueConfig { + pub queue_type: QueueType, + pub seed: u64, + pub min_queue_size: u16, +} + +pub fn new_queue(cfg: &QueueConfig) -> Box> { + match cfg.queue_type { + QueueType::NonMix => Box::new(NonMixQueue::new()), + QueueType::PureCoinFlipping => Box::new(PureCoinFlippingQueue::new( + cfg.min_queue_size, + StdRng::seed_from_u64(cfg.seed), + )), + QueueType::PureRandomSampling => Box::new(PureRandomSamplingQueue::new( + cfg.min_queue_size, + StdRng::seed_from_u64(cfg.seed), + )), + QueueType::PermutedCoinFlipping => Box::new(PermutedCoinFlippingQueue::new( + cfg.min_queue_size, + StdRng::seed_from_u64(cfg.seed), + )), + QueueType::NoisyCoinFlipping => { + Box::new(NoisyCoinFlippingQueue::new(StdRng::seed_from_u64(cfg.seed))) + } + } +} + +struct NonMixQueue { + queue: VecDeque, +} + +impl NonMixQueue { + fn new() -> Self { + Self { + queue: VecDeque::new(), + } + } +} + +impl Queue for NonMixQueue { + fn push(&mut self, msg: T) { + self.queue.push_back(msg) + } + + fn pop(&mut self) -> Option { + self.queue.pop_front() + } + + fn len(&self) -> usize { + self.queue.len() + } +} + +struct MixQueue { + queue: Vec>, + rng: StdRng, +} + +impl MixQueue { + fn new(rng: StdRng) -> Self { + Self { + queue: Vec::new(), + rng, + } + } + + fn push(&mut self, data: T) { + self.queue.push(Some(data)) + } + + fn pop(&mut self, idx: usize) -> Option { + if idx < self.queue.len() { + self.queue.remove(idx) + } else { + None + } + } + + fn len(&self) -> usize { + self.queue.len() + } +} + +struct MinSizeMixQueue { + queue: MixQueue, + min_pool_size: u16, +} + +impl MinSizeMixQueue { + fn new(min_pool_size: u16, rng: StdRng) -> Self { + let mut queue = MixQueue::new(rng); + queue.queue = vec![None; min_pool_size as usize]; + Self { + queue, + min_pool_size, + } + } + + fn push(&mut self, msg: T) { + self.queue.push(msg) + } + + fn pop(&mut self, idx: usize) -> Option { + self.queue.pop(idx) + } + + fn ensure_min_size(&mut self) { + if self.queue.len() < self.min_pool_size as usize { + self.queue.queue.extend( + std::iter::repeat(None).take(self.min_pool_size as usize - self.queue.queue.len()), + ); + } + } + + fn len(&self) -> usize { + self.queue.len() + } +} + +struct PureCoinFlippingQueue { + queue: MinSizeMixQueue, +} + +impl PureCoinFlippingQueue { + fn new(min_pool_size: u16, rng: StdRng) -> Self { + Self { + queue: MinSizeMixQueue::new(min_pool_size, rng), + } + } +} + +impl Queue for PureCoinFlippingQueue { + fn push(&mut self, msg: T) { + self.queue.push(msg) + } + + fn pop(&mut self) -> Option { + self.queue.ensure_min_size(); + + loop { + for i in 0..self.len() { + if self.queue.queue.rng.gen_bool(0.5) { + return self.queue.pop(i); + } + } + } + } + + fn len(&self) -> usize { + self.queue.len() + } +} + +struct PureRandomSamplingQueue { + queue: MinSizeMixQueue, +} + +impl PureRandomSamplingQueue { + fn new(min_pool_size: u16, rng: StdRng) -> Self { + Self { + queue: MinSizeMixQueue::new(min_pool_size, rng), + } + } +} + +impl Queue for PureRandomSamplingQueue { + fn push(&mut self, msg: T) { + self.queue.push(msg) + } + + fn pop(&mut self) -> Option { + self.queue.ensure_min_size(); + + let i = self.queue.queue.rng.gen_range(0..self.queue.len()); + self.queue.pop(i) + } + + fn len(&self) -> usize { + self.queue.len() + } +} + +struct PermutedCoinFlippingQueue { + queue: MinSizeMixQueue, +} + +impl PermutedCoinFlippingQueue { + fn new(min_pool_size: u16, rng: StdRng) -> Self { + Self { + queue: MinSizeMixQueue::new(min_pool_size, rng), + } + } +} + +impl Queue for PermutedCoinFlippingQueue { + fn push(&mut self, msg: T) { + self.queue.push(msg) + } + + fn pop(&mut self) -> Option { + self.queue.ensure_min_size(); + + self.queue + .queue + .queue + .as_mut_slice() + .shuffle(&mut self.queue.queue.rng); + + loop { + for i in 0..self.queue.len() { + if self.queue.queue.rng.gen_bool(0.5) { + return self.queue.pop(i); + } + } + } + } + + fn len(&self) -> usize { + self.queue.len() + } +} + +struct NoisyCoinFlippingQueue { + queue: MixQueue, +} + +impl NoisyCoinFlippingQueue { + pub fn new(rng: StdRng) -> Self { + Self { + queue: MixQueue::new(rng), + } + } +} + +impl Queue for NoisyCoinFlippingQueue { + fn push(&mut self, msg: T) { + self.queue.push(msg) + } + + fn pop(&mut self) -> Option { + if self.queue.len() == 0 { + return None; + } + + loop { + for i in 0..self.queue.len() { + if self.queue.rng.gen_bool(0.5) { + return self.queue.pop(i); + } else if i == 0 { + return None; + } + } + } + } + + fn len(&self) -> usize { + self.queue.len() + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::*; + + #[test] + fn test_non_mix_queue() { + let mut queue = new_queue(QueueType::NonMix, 0, 0); + + // Check if None (noise) is returned when queue is empty + assert_eq!(queue.pop(), None); + + // Check if queue is FIFO + queue.push(0); + queue.push(1); + assert_eq!(queue.pop(), Some(0)); + assert_eq!(queue.pop(), Some(1)); + + // Check if None (noise) is returned when queue is empty + assert_eq!(queue.pop(), None); + + // Check if queue is FIFO again + queue.push(2); + queue.push(3); + assert_eq!(queue.pop(), Some(2)); + assert_eq!(queue.pop(), Some(3)); + } + + #[test] + fn test_mix_queues() { + for queue_type in [ + QueueType::PureCoinFlipping, + QueueType::PureRandomSampling, + QueueType::PermutedCoinFlipping, + QueueType::NoisyCoinFlipping, + ] { + test_mix_queue(queue_type); + } + } + + fn test_mix_queue(queue_type: QueueType) { + let mut queue = new_queue(queue_type, 0, 4); + + // Check if None (noise) is returned when queue is empty + assert_eq!(queue.pop(), None); + + // Put only 2 messages even though the min queue size is 4 + queue.push(0); + queue.push(1); + + // Wait until 2 messages are returned from the queue + let mut set: HashSet<_> = vec![0, 1].into_iter().collect(); + while !set.is_empty() { + if let Some(msg) = queue.pop() { + assert!(set.remove(&msg)); + } + } + + // Check if None (noise) is returned when there is no real message remains + assert_eq!(queue.pop(), None); + } +} diff --git a/mixnet-rs/dissemination/src/topology.rs b/mixnet-rs/dissemination/src/topology.rs new file mode 100644 index 0000000..699ca22 --- /dev/null +++ b/mixnet-rs/dissemination/src/topology.rs @@ -0,0 +1,68 @@ +use std::collections::{HashMap, HashSet}; + +use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; + +use crate::node::NodeId; + +pub fn build_topology( + num_nodes: u16, + peering_degree: u16, + seed: u64, +) -> HashMap> { + let mut rng = StdRng::seed_from_u64(seed); + + loop { + let mut topology: HashMap> = HashMap::new(); + for node in 0..num_nodes { + topology.insert(node, HashSet::new()); + } + + for node in 0..num_nodes { + let mut others: Vec = Vec::new(); + for other in (0..node).chain(node + 1..num_nodes) { + // Check if the other node is not already connected to the current node + // and the other node has not reached the peering degree. + if !topology.get(&node).unwrap().contains(&other) + && topology.get(&other).unwrap().len() < peering_degree as usize + { + others.push(other); + } + } + + // How many more connections the current node needs + let num_needs = peering_degree as usize - topology.get(&node).unwrap().len(); + // Smaple peers as many as possible and connect them to the current node + let k = std::cmp::min(num_needs, others.len()); + others.as_mut_slice().shuffle(&mut rng); + others.into_iter().take(k).for_each(|peer| { + topology.get_mut(&node).unwrap().insert(peer); + topology.get_mut(&peer).unwrap().insert(node); + }); + } + + if are_all_nodes_connected(&topology) { + return topology; + } + } +} + +fn are_all_nodes_connected(topology: &HashMap>) -> bool { + let start_node = topology.keys().next().unwrap(); + let visited = dfs(topology, *start_node); + visited.len() == topology.len() +} + +fn dfs(topology: &HashMap>, start_node: NodeId) -> HashSet { + let mut visited: HashSet = HashSet::new(); + let mut stack: Vec = Vec::new(); + stack.push(start_node); + while let Some(node) = stack.pop() { + visited.insert(node); + for peer in topology.get(&node).unwrap().iter() { + if !visited.contains(peer) { + stack.push(*peer); + } + } + } + visited +}