diff --git a/mixnet/ordering/src/iteration.rs b/mixnet/ordering/src/iteration.rs index 4c8849c..22a6886 100644 --- a/mixnet/ordering/src/iteration.rs +++ b/mixnet/ordering/src/iteration.rs @@ -36,7 +36,7 @@ impl Iteration { (0..self.paramset.num_senders) .map(|sender_idx| format!("{dir}/sent_seq_{sender_idx}__WIP__.csv")) .collect(), - (0..self.paramset.num_receiver_connections()) + (0..self.paramset.num_sender_or_receiver_conns()) .map(|conn_idx| format!("{dir}/recv_seq_{conn_idx}__WIP__.csv")) .collect(), format!("{dir}/data_msg_counts__WIP__.csv"), diff --git a/mixnet/ordering/src/paramset.rs b/mixnet/ordering/src/paramset.rs index 71192d4..cfa3040 100644 --- a/mixnet/ordering/src/paramset.rs +++ b/mixnet/ordering/src/paramset.rs @@ -1,3 +1,5 @@ +use std::fmt::Display; + use protocol::queue::QueueType; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -16,12 +18,12 @@ impl std::str::FromStr for ExperimentId { fn from_str(s: &str) -> Result { match s { - "1" | "Experiment1" => Ok(ExperimentId::Experiment1), - "2" | "Experiment2" => Ok(ExperimentId::Experiment2), - "3" | "Experiment3" => Ok(ExperimentId::Experiment3), - "4" | "Experiment4" => Ok(ExperimentId::Experiment4), - "5" | "Experiment5" => Ok(ExperimentId::Experiment5), - "6" | "Experiment6" => Ok(ExperimentId::Experiment6), + "1" => Ok(ExperimentId::Experiment1), + "2" => Ok(ExperimentId::Experiment2), + "3" => Ok(ExperimentId::Experiment3), + "4" => Ok(ExperimentId::Experiment4), + "5" => Ok(ExperimentId::Experiment5), + "6" => Ok(ExperimentId::Experiment6), _ => Err(format!("Invalid experiment ID: {}", s)), } } @@ -31,6 +33,7 @@ impl std::str::FromStr for ExperimentId { #[repr(u8)] pub enum SessionId { Session1 = 1, + Session3 = 3, } impl std::str::FromStr for SessionId { @@ -38,7 +41,8 @@ impl std::str::FromStr for SessionId { fn from_str(s: &str) -> Result { match s { - "1" | "Session1" => Ok(SessionId::Session1), + "1" => Ok(SessionId::Session1), + "3" => Ok(SessionId::Session3), _ => Err(format!("Invalid session ID: {}", s)), } } @@ -66,7 +70,7 @@ pub struct ParamSet { pub num_mixes: u32, pub num_paths: u16, pub random_topology: bool, - pub peering_degree: u32, + pub peering_degree: PeeringDegree, pub min_queue_size: u16, pub transmission_rate: u16, pub num_senders: u8, @@ -77,6 +81,21 @@ pub struct ParamSet { pub num_iterations: usize, } +#[derive(Debug, Clone, PartialEq)] +pub enum PeeringDegree { + Fixed(u32), + Random(Vec<(u32, f32)>), +} + +impl Display for PeeringDegree { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PeeringDegree::Fixed(c) => write!(f, "{c}"), + PeeringDegree::Random(c_probs) => write!(f, "{c_probs:?}"), + } + } +} + impl ParamSet { pub fn new_all_paramsets( exp_id: ExperimentId, @@ -85,6 +104,7 @@ impl ParamSet { ) -> Vec { match session_id { SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type), + SessionId::Session3 => Self::new_session3_paramsets(exp_id, queue_type), } } @@ -130,7 +150,7 @@ impl ParamSet { num_mixes, num_paths, random_topology: false, - peering_degree: 1, + peering_degree: PeeringDegree::Fixed(1), min_queue_size, transmission_rate, num_senders, @@ -157,7 +177,7 @@ impl ParamSet { num_mixes, num_paths: 0, // since we're gonna build random topology random_topology: true, - peering_degree, + peering_degree: PeeringDegree::Fixed(peering_degree), min_queue_size, transmission_rate, num_senders, @@ -179,9 +199,77 @@ impl ParamSet { paramsets } - pub fn num_receiver_connections(&self) -> usize { + fn new_session3_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { + let sender_data_msg_probs: &[f32] = match exp_id { + ExperimentId::Experiment5 => &[0.01, 0.1, 0.5, 0.9, 0.99, 1.0], + ExperimentId::Experiment6 => &[0.01, 0.1, 0.5], + _ => { + panic!("Only Experiment5 and Experiment6 are supported for Session3"); + } + }; + let mix_data_msg_probs = |num_mixes: u32| match exp_id { + ExperimentId::Experiment5 => { + vec![0.0] + } + ExperimentId::Experiment6 => { + let g: f32 = num_mixes as f32; + vec![1.0 / (2.0 * g), 1.0 / g, 2.0 / g] + } + _ => { + panic!("Only Experiment5 and Experiment6 are supported for Session3"); + } + }; + + let mut id: u16 = 1; + let mut paramsets: Vec = Vec::new(); + match exp_id { + ExperimentId::Experiment5 | ExperimentId::Experiment6 => { + let num_mixes = 32; + for &sender_data_msg_prob in sender_data_msg_probs { + for &mix_data_msg_prob in &mix_data_msg_probs(num_mixes) { + let paramset = ParamSet { + id, + num_mixes, + num_paths: 0, // since we're gonna build random topology + random_topology: true, + peering_degree: PeeringDegree::Random(vec![ + (4, 0.87), + (12, 0.123), + (24, 0.007), + ]), + min_queue_size: 10, + transmission_rate: 1, + num_senders: 1, + num_sender_msgs: match exp_id { + ExperimentId::Experiment6 => 10000, + _ => 1000000, + }, + sender_data_msg_prob, + mix_data_msg_prob, + queue_type, + num_iterations: 10, + }; + id += 1; + paramsets.push(paramset); + } + } + } + _ => { + panic!("Only Experiment5 and Experiment6 are supported for Session3"); + } + } + + paramsets + } + + pub fn num_sender_or_receiver_conns(&self) -> usize { if self.random_topology { - self.peering_degree as usize + match &self.peering_degree { + PeeringDegree::Fixed(c) => *c as usize, + PeeringDegree::Random(c_probs) => { + c_probs.iter().map(|(c, _)| *c).min().unwrap() as usize + } + } } else { self.num_paths as usize } @@ -233,6 +321,8 @@ mod tests { (ExperimentId::Experiment6, SessionId::Session1), 3 * 3 * 3 * 3, ), + ((ExperimentId::Experiment5, SessionId::Session3), 6), + ((ExperimentId::Experiment6, SessionId::Session3), 3 * 3), ]; for queue_type in QueueType::iter() { @@ -260,6 +350,18 @@ mod tests { assert_eq!(paramset.id as usize, i + 1); println!("{:?}", paramset); } + + // Check PeeringDegree + for paramset in paramsets.iter() { + match session_id { + SessionId::Session1 => { + assert!(matches!(paramset.peering_degree, PeeringDegree::Fixed(_))) + } + SessionId::Session3 => { + assert!(matches!(paramset.peering_degree, PeeringDegree::Random(_))) + } + } + } } } } diff --git a/mixnet/ordering/src/topology.rs b/mixnet/ordering/src/topology.rs index bbe3bfc..2384b20 100644 --- a/mixnet/ordering/src/topology.rs +++ b/mixnet/ordering/src/topology.rs @@ -8,7 +8,10 @@ use protocol::{ use rand::{rngs::StdRng, seq::SliceRandom, RngCore, SeedableRng}; use rustc_hash::FxHashMap; -use crate::{outputs::Outputs, paramset::ParamSet}; +use crate::{ + outputs::Outputs, + paramset::{ParamSet, PeeringDegree}, +}; use ordering::message::SenderIdx; pub const RECEIVER_NODE_ID: NodeId = NodeId::MAX; @@ -18,6 +21,13 @@ pub fn build_striped_network (Vec>, AllSenderPeers, ReceiverPeers) { assert!(!paramset.random_topology); + let peering_degree = match paramset.peering_degree { + PeeringDegree::Fixed(c) => c, + PeeringDegree::Random(_) => { + panic!("PeeringDegree::Random not supported for striped network"); + } + }; + let mut next_node_id: NodeId = 0; let mut queue_seed_rng = StdRng::seed_from_u64(seed); let mut mixnodes: Vec> = @@ -35,7 +45,7 @@ pub fn build_striped_network (Vec>, AllSenderPeers, ReceiverPeers) { assert!(paramset.random_topology); + + let peering_degrees = match ¶mset.peering_degree { + PeeringDegree::Fixed(c) => vec![*c; paramset.num_mixes as usize], + PeeringDegree::Random(c_probs) => { + // Sort c_probs by the probability in ascending order + let mut c_probs = c_probs.clone(); + c_probs.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap()); + + let mut degrees = Vec::with_capacity(paramset.num_mixes as usize); + for (i, (c, prob)) in c_probs.iter().enumerate() { + let count = if i < c_probs.len() - 1 { + (prob * paramset.num_mixes as f32).ceil() as u32 + } else { + let num_determined: u32 = degrees.len().try_into().unwrap(); + paramset.num_mixes - num_determined + }; + degrees.extend(std::iter::repeat(*c).take(count as usize)); + } + degrees + } + }; + // Init mix nodes let mut queue_seed_rng = StdRng::seed_from_u64(seed); let mut mixnodes: Vec> = Vec::with_capacity(paramset.num_mixes as usize); @@ -91,7 +123,7 @@ pub fn build_random_network = mixnodes.iter().map(|mixnode| mixnode.id).collect(); - assert!(candidates.len() >= paramset.peering_degree as usize); + let num_sender_or_receiver_conns = paramset.num_sender_or_receiver_conns(); + assert!(candidates.len() >= num_sender_or_receiver_conns); let mut all_sender_peers = AllSenderPeers::new(paramset.num_senders); for _ in 0..paramset.num_senders { candidates.as_mut_slice().shuffle(&mut peers_rng); let mut peers: Vec = candidates .iter() .cloned() - .take(paramset.peering_degree as usize) + .take(num_sender_or_receiver_conns) .collect(); peers.sort(); all_sender_peers.add(peers); @@ -115,14 +148,17 @@ pub fn build_random_network = candidates .iter() .cloned() - .take(paramset.peering_degree as usize) + .take(num_sender_or_receiver_conns) .collect(); receiver_peer_ids.sort(); // Connect mix nodes let topology = build_topology( - paramset.num_mixes, - &vec![paramset.peering_degree; paramset.num_mixes as usize], + mixnodes.len().try_into().unwrap(), + &mixnodes + .iter() + .map(|mixnode| mixnode.peering_degree) + .collect::>(), seed, ); for (node_id, peers) in topology.iter().enumerate() { diff --git a/mixnet/protocol/src/node.rs b/mixnet/protocol/src/node.rs index 74dab61..59488d8 100644 --- a/mixnet/protocol/src/node.rs +++ b/mixnet/protocol/src/node.rs @@ -19,7 +19,7 @@ where connected_peers: FxHashSet, // A cache to avoid relaying the same message multiple times. received_msgs: Option>, - peering_degree: u32, + pub peering_degree: u32, } pub type MessagesToRelay = Vec<(NodeId, Message)>; diff --git a/mixnet/protocol/src/topology.rs b/mixnet/protocol/src/topology.rs index ec86e5c..b1bceae 100644 --- a/mixnet/protocol/src/topology.rs +++ b/mixnet/protocol/src/topology.rs @@ -7,6 +7,9 @@ pub type Topology = Vec>; pub fn build_topology(num_nodes: u32, peering_degrees: &[u32], seed: u64) -> Topology { assert_eq!(num_nodes as usize, peering_degrees.len()); + // Assert that peering degrees are sorted in descending order + assert!(peering_degrees.windows(2).all(|w| w[0] >= w[1])); + let mut rng = StdRng::seed_from_u64(seed); loop {