From e9012eae832c4fcf59834da12985e559038a0df0 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 22 Aug 2024 11:42:04 +0200 Subject: [PATCH] refactor all, and prepare revised paramsets, but no multiple mix impl yet --- mixnet-rs/Cargo.toml | 2 +- mixnet-rs/dissemination/Cargo.toml | 2 +- mixnet-rs/dissemination/src/iteration.rs | 42 +-- mixnet-rs/dissemination/src/main.rs | 3 +- mixnet-rs/dissemination/src/node.rs | 88 ------- mixnet-rs/dissemination/src/paramset.rs | 2 +- mixnet-rs/dissemination/src/topology.rs | 3 +- .../{single-path => ordering}/Cargo.toml | 4 +- .../src/iteration.rs | 112 ++++---- .../{single-path => ordering}/src/main.rs | 5 +- .../src/ordercoeff.rs | 123 +++++---- mixnet-rs/ordering/src/paramset.rs | 240 ++++++++++++++++++ mixnet-rs/{queue => protocol}/Cargo.toml | 4 +- mixnet-rs/protocol/src/lib.rs | 2 + mixnet-rs/protocol/src/node.rs | 107 ++++++++ .../src/lib.rs => protocol/src/queue.rs} | 137 +++++----- mixnet-rs/single-path/src/node.rs | 35 --- mixnet-rs/single-path/src/paramset.rs | 220 ---------------- 18 files changed, 584 insertions(+), 547 deletions(-) delete mode 100644 mixnet-rs/dissemination/src/node.rs rename mixnet-rs/{single-path => ordering}/Cargo.toml (79%) rename mixnet-rs/{single-path => ordering}/src/iteration.rs (63%) rename mixnet-rs/{single-path => ordering}/src/main.rs (97%) rename mixnet-rs/{single-path => ordering}/src/ordercoeff.rs (69%) create mode 100644 mixnet-rs/ordering/src/paramset.rs rename mixnet-rs/{queue => protocol}/Cargo.toml (66%) create mode 100644 mixnet-rs/protocol/src/lib.rs create mode 100644 mixnet-rs/protocol/src/node.rs rename mixnet-rs/{queue/src/lib.rs => protocol/src/queue.rs} (73%) delete mode 100644 mixnet-rs/single-path/src/node.rs delete mode 100644 mixnet-rs/single-path/src/paramset.rs diff --git a/mixnet-rs/Cargo.toml b/mixnet-rs/Cargo.toml index 7890756..829907e 100644 --- a/mixnet-rs/Cargo.toml +++ b/mixnet-rs/Cargo.toml @@ -1,3 +1,3 @@ [workspace] -members = ["dissemination", "queue", "single-path"] +members = ["dissemination", "protocol", "ordering"] resolver = "2" diff --git a/mixnet-rs/dissemination/Cargo.toml b/mixnet-rs/dissemination/Cargo.toml index f873334..b98e3e3 100644 --- a/mixnet-rs/dissemination/Cargo.toml +++ b/mixnet-rs/dissemination/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" chrono = "0.4.38" clap = { version = "4.5.16", features = ["derive"] } csv = "1.3.0" -queue = { version = "0.1.0", path = "../queue" } +protocol = { version = "0.1.0", path = "../protocol" } rand = "0.8.5" rayon = "1.10.0" rustc-hash = "2.0.0" diff --git a/mixnet-rs/dissemination/src/iteration.rs b/mixnet-rs/dissemination/src/iteration.rs index 0710858..9068993 100644 --- a/mixnet-rs/dissemination/src/iteration.rs +++ b/mixnet-rs/dissemination/src/iteration.rs @@ -1,11 +1,13 @@ use std::error::Error; -use queue::QueueConfig; +use protocol::{ + node::{MessageId, Node, NodeId}, + queue::{Message, QueueConfig}, +}; use rand::{rngs::StdRng, seq::SliceRandom, RngCore, SeedableRng}; use rustc_hash::FxHashMap; use crate::{ - node::{MessageId, Node, NodeId}, paramset::ParamSet, topology::{build_topology, Topology}, }; @@ -28,6 +30,7 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology min_queue_size: paramset.min_queue_size, }, peering_degrees[node_id as usize], + true, )); } tracing::debug!("Nodes initialized."); @@ -132,7 +135,7 @@ fn relay_messages( writer: &mut csv::Writer, ) { // Collect messages to relay - let mut all_msgs_to_relay: Vec> = Vec::new(); + let mut all_msgs_to_relay: Vec)>> = Vec::new(); for node in nodes.iter_mut() { all_msgs_to_relay.push(node.read_queues()); } @@ -144,22 +147,25 @@ fn relay_messages( .for_each(|(sender_id, msgs_to_relay)| { let sender_id: NodeId = sender_id.try_into().unwrap(); msgs_to_relay.into_iter().for_each(|(receiver_id, msg)| { - if nodes[receiver_id as usize].receive(msg, sender_id) { - let (sent_time, num_received_nodes) = message_tracker.get_mut(&msg).unwrap(); - *num_received_nodes += 1; - if *num_received_nodes as usize == nodes.len() { - let dissemination_time = vtime - *sent_time; - writer - .write_record(&[ - dissemination_time.to_string(), - sent_time.to_string(), - vtime.to_string(), - ]) - .unwrap(); - writer.flush().unwrap(); - *num_disseminated_msgs += 1; + if let Message::Data(msg) = msg { + if nodes[receiver_id as usize].receive(msg, Some(sender_id)) { + let (sent_time, num_received_nodes) = + message_tracker.get_mut(&msg).unwrap(); + *num_received_nodes += 1; + if *num_received_nodes as usize == nodes.len() { + let dissemination_time = vtime - *sent_time; + writer + .write_record(&[ + dissemination_time.to_string(), + sent_time.to_string(), + vtime.to_string(), + ]) + .unwrap(); + writer.flush().unwrap(); + *num_disseminated_msgs += 1; - message_tracker.remove(&msg); + message_tracker.remove(&msg); + } } } }) diff --git a/mixnet-rs/dissemination/src/main.rs b/mixnet-rs/dissemination/src/main.rs index 88cf3f4..af20679 100644 --- a/mixnet-rs/dissemination/src/main.rs +++ b/mixnet-rs/dissemination/src/main.rs @@ -1,6 +1,6 @@ use chrono::Utc; use clap::Parser; -use queue::QueueType; +use protocol::queue::QueueType; use rayon::prelude::*; use std::{ error::Error, @@ -12,7 +12,6 @@ use iteration::run_iteration; use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS}; mod iteration; -mod node; mod paramset; mod topology; diff --git a/mixnet-rs/dissemination/src/node.rs b/mixnet-rs/dissemination/src/node.rs deleted file mode 100644 index 48ce4d5..0000000 --- a/mixnet-rs/dissemination/src/node.rs +++ /dev/null @@ -1,88 +0,0 @@ -use queue::{new_queue, Queue, QueueConfig}; -use rustc_hash::{FxHashMap, FxHashSet}; - -pub type NodeId = u32; -pub type MessageId = u32; - -pub struct Node { - queue_config: QueueConfig, - // To have the deterministic result, we use Vec instead of FxHashMap. - // Building `queues` is inefficient, but it's not a problem because it's done only once at the beginning. - // Instead, use `connected_peers` to build `queues` efficiently. - queues: Vec<(NodeId, Box>)>, - connected_peers: FxHashSet, - // A cache to avoid relaying the same message multiple times. - received_msgs: FxHashMap, - peering_degree: u32, -} - -impl Node { - pub fn new(queue_config: QueueConfig, peering_degree: u32) -> Self { - Node { - queue_config, - queues: Vec::new(), - connected_peers: FxHashSet::default(), - received_msgs: FxHashMap::default(), - peering_degree, - } - } - - pub fn connect(&mut self, peer_id: NodeId) { - if self.connected_peers.insert(peer_id) { - let pos = self - .queues - .binary_search_by(|probe| probe.0.cmp(&peer_id)) - .unwrap_or_else(|pos| pos); - self.queues - .insert(pos, (peer_id, new_queue(&self.queue_config))); - } - } - - pub fn send(&mut self, msg: MessageId) { - assert!(self.check_and_update_cache(msg, true)); - for (_, queue) in self.queues.iter_mut() { - queue.push(msg); - } - } - - pub fn receive(&mut self, msg: MessageId, from: NodeId) -> bool { - let first_received = self.check_and_update_cache(msg, false); - if first_received { - for (node_id, queue) in self.queues.iter_mut() { - if *node_id != from { - queue.push(msg); - } - } - } - first_received - } - - pub fn read_queues(&mut self) -> Vec<(NodeId, MessageId)> { - let mut msgs_to_relay: Vec<(NodeId, MessageId)> = Vec::new(); - for (node_id, queue) in self.queues.iter_mut() { - if let Some(msg) = queue.pop() { - msgs_to_relay.push((*node_id, msg)); - } - } - msgs_to_relay - } - - fn check_and_update_cache(&mut self, msg: MessageId, sending: bool) -> bool { - let first_received = if let Some(count) = self.received_msgs.get_mut(&msg) { - *count += 1; - false - } else { - self.received_msgs.insert(msg, if sending { 0 } else { 1 }); - true - }; - - // If the message have been received from all connected peers, remove it from the cache - // because there is no possibility that the message will be received again. - if self.received_msgs.get(&msg).unwrap() == &self.peering_degree { - tracing::debug!("Remove message from cache: {}", msg); - self.received_msgs.remove(&msg); - } - - first_received - } -} diff --git a/mixnet-rs/dissemination/src/paramset.rs b/mixnet-rs/dissemination/src/paramset.rs index c664fbf..6aaae32 100644 --- a/mixnet-rs/dissemination/src/paramset.rs +++ b/mixnet-rs/dissemination/src/paramset.rs @@ -1,4 +1,4 @@ -use queue::QueueType; +use protocol::queue::QueueType; use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] diff --git a/mixnet-rs/dissemination/src/topology.rs b/mixnet-rs/dissemination/src/topology.rs index 014afbf..41ad54c 100644 --- a/mixnet-rs/dissemination/src/topology.rs +++ b/mixnet-rs/dissemination/src/topology.rs @@ -1,9 +1,8 @@ use std::collections::HashSet; +use protocol::node::NodeId; use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; -use crate::node::NodeId; - pub type Topology = Vec>; pub fn build_topology(num_nodes: u32, peering_degrees: &[u32], seed: u64) -> Topology { diff --git a/mixnet-rs/single-path/Cargo.toml b/mixnet-rs/ordering/Cargo.toml similarity index 79% rename from mixnet-rs/single-path/Cargo.toml rename to mixnet-rs/ordering/Cargo.toml index 1cd920b..97a82fa 100644 --- a/mixnet-rs/single-path/Cargo.toml +++ b/mixnet-rs/ordering/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "single-path" +name = "ordering" version = "0.1.0" edition = "2021" @@ -7,7 +7,7 @@ edition = "2021" chrono = "0.4.38" clap = "4.5.16" csv = "1.3.0" -queue = { version = "0.1.0", path = "../queue" } +protocol = { version = "0.1.0", path = "../protocol" } rand = "0.8.5" rustc-hash = "2.0.0" tracing = "0.1.40" diff --git a/mixnet-rs/single-path/src/iteration.rs b/mixnet-rs/ordering/src/iteration.rs similarity index 63% rename from mixnet-rs/single-path/src/iteration.rs rename to mixnet-rs/ordering/src/iteration.rs index 3da622c..e53b8d2 100644 --- a/mixnet-rs/single-path/src/iteration.rs +++ b/mixnet-rs/ordering/src/iteration.rs @@ -1,14 +1,13 @@ use std::path::Path; -use queue::QueueConfig; +use protocol::{ + node::{MessageId, Node}, + queue::{Message, QueueConfig, QueueType}, +}; use rand::{rngs::StdRng, Rng, SeedableRng}; use rustc_hash::FxHashMap; -use crate::{ - node::{MessageId, Node}, - ordercoeff::Sequence, - paramset::ParamSet, -}; +use crate::{ordercoeff::Sequence, paramset::ParamSet}; pub fn run_iteration( paramset: ParamSet, @@ -30,11 +29,16 @@ pub fn run_iteration( } // Initialize a mix node - let mut mixnode = Node::new(&QueueConfig { - queue_type: paramset.queue_type, - seed, - min_queue_size: paramset.min_queue_size, - }); + let mut mixnode = Node::new( + QueueConfig { + queue_type: paramset.queue_type, + seed, + min_queue_size: paramset.min_queue_size, + }, + paramset.peering_degree, + false, + ); + mixnode.connect(u32::MAX); // connect to the virtual receiver node let mut next_msg_id: MessageId = 0; @@ -43,29 +47,39 @@ pub fn run_iteration( // Transmission interval that each queue must release a message let transmission_interval = 1.0 / paramset.transmission_rate as f32; // Results + let mut all_sent_count = 0; // all data + noise sent by the sender let mut sent_times: FxHashMap = FxHashMap::default(); let mut latencies: FxHashMap = FxHashMap::default(); let mut sent_sequence = Sequence::new(); let mut received_sequence = Sequence::new(); - let mut data_msg_counts_in_queue: Vec = Vec::new(); + let mut data_msg_counts_in_queue: Vec = Vec::new(); let mut rng = StdRng::seed_from_u64(seed); loop { + tracing::trace!( + "VTIME:{}, ALL_SENT:{}, DATA_SENT:{}, DATA_RECEIVED:{}", + vtime, + all_sent_count, + sent_times.len(), + latencies.len() + ); + // The sender emits a message (data or noise) to the mix node. - if sent_times.len() < paramset.num_sender_data_msgs as usize - && try_probability(&mut rng, paramset.sender_data_msg_prob) - { - let msg = next_msg_id; - next_msg_id += 1; - mixnode.receive(msg); - sent_times.insert(msg, vtime); - sent_sequence.add_message(msg); - } else { - // Generate noise and add it to the sequence to calculate ordering coefficients later, - // but don't need to send it to the mix node - // because the mix node will anyway drop the noise, - // and we don't need to record what the mix node receives. - sent_sequence.add_noise(); + if all_sent_count < paramset.num_sender_msgs as usize { + if try_probability(&mut rng, paramset.sender_data_msg_prob) { + let msg = next_msg_id; + next_msg_id += 1; + mixnode.receive(msg, None); + sent_times.insert(msg, vtime); + sent_sequence.add_message(msg); + } else { + // Generate noise and add it to the sequence to calculate ordering coefficients later, + // but don't need to send it to the mix node + // because the mix node will anyway drop the noise, + // and we don't need to record what the mix node receives. + sent_sequence.add_noise(); + } + all_sent_count += 1; } // The mix node add a new data message to its queue with a certain probability @@ -78,22 +92,26 @@ pub fn run_iteration( // The mix node emits a message (data or noise) to the receiver. // As the receiver, record the time and order of the received messages. - match mixnode.read_queue() { - Some(msg) => { + // TODO: handle all queues + match mixnode.read_queues().first().unwrap().1 { + Message::Data(msg) => { latencies.insert(msg, vtime - sent_times.get(&msg).unwrap()); received_sequence.add_message(msg); } - None => { + Message::Noise => { received_sequence.add_noise(); } } // Record the number of data messages in the mix node's queue - data_msg_counts_in_queue.push(mixnode.message_count_in_queue()); + // TODO: handle all queues + data_msg_counts_in_queue.push(*mixnode.data_count_in_queue().first().unwrap()); - // If all messages have been received by the receiver, stop the iteration. - assert!(latencies.len() <= paramset.num_sender_data_msgs as usize); - if latencies.len() == paramset.num_sender_data_msgs as usize { + // If all data amessages (that the sender has to send) have been received by the receiver, + // stop the iteration. + if all_sent_count == paramset.num_sender_msgs as usize + && sent_times.len() == latencies.len() + { break; } @@ -110,18 +128,20 @@ pub fn run_iteration( out_data_msg_counts_path, ); // Calculate ordering coefficients and save them to a CSV file. - let strong_ordering_coeff = sent_sequence.ordering_coefficient(&received_sequence, true); - let weak_ordering_coeff = sent_sequence.ordering_coefficient(&received_sequence, false); - tracing::info!( - "STRONG_COEFF:{}, WEAK_COEFF:{}", - strong_ordering_coeff, - weak_ordering_coeff - ); - save_ordering_coefficients( - strong_ordering_coeff, - weak_ordering_coeff, - out_ordering_coeff_path, - ); + if paramset.queue_type != QueueType::NonMix { + let strong_ordering_coeff = sent_sequence.ordering_coefficient(&received_sequence, true); + let weak_ordering_coeff = sent_sequence.ordering_coefficient(&received_sequence, false); + tracing::info!( + "STRONG_COEFF:{}, WEAK_COEFF:{}", + strong_ordering_coeff, + weak_ordering_coeff + ); + save_ordering_coefficients( + strong_ordering_coeff, + weak_ordering_coeff, + out_ordering_coeff_path, + ); + } } fn try_probability(rng: &mut StdRng, prob: f32) -> bool { @@ -163,7 +183,7 @@ fn save_sequence(sequence: &Sequence, path: &str) { } fn save_data_msg_counts( - data_msg_counts_in_queue: &[u32], + data_msg_counts_in_queue: &[usize], interval: f32, out_data_msg_counts_path: &str, ) { diff --git a/mixnet-rs/single-path/src/main.rs b/mixnet-rs/ordering/src/main.rs similarity index 97% rename from mixnet-rs/single-path/src/main.rs rename to mixnet-rs/ordering/src/main.rs index 840b8cd..7b3ea6f 100644 --- a/mixnet-rs/single-path/src/main.rs +++ b/mixnet-rs/ordering/src/main.rs @@ -1,5 +1,4 @@ mod iteration; -mod node; mod ordercoeff; mod paramset; @@ -13,7 +12,7 @@ use chrono::Utc; use clap::Parser; use iteration::run_iteration; use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS}; -use queue::QueueType; +use protocol::queue::QueueType; #[derive(Debug, Parser)] #[command(name = "Single Sender Single Mix Measurement")] @@ -49,7 +48,7 @@ fn main() { "Output directory does not exist: {outdir}" ); let subdir = format!( - "__WIP__dissemination_e{}s{}_{:?}_{}___DUR__", + "__WIP__ordering_e{}s{}_{:?}_{}___DUR__", exp_id as u8, session_id as u8, queue_type, diff --git a/mixnet-rs/single-path/src/ordercoeff.rs b/mixnet-rs/ordering/src/ordercoeff.rs similarity index 69% rename from mixnet-rs/single-path/src/ordercoeff.rs rename to mixnet-rs/ordering/src/ordercoeff.rs index af25a09..7ee1836 100644 --- a/mixnet-rs/single-path/src/ordercoeff.rs +++ b/mixnet-rs/ordering/src/ordercoeff.rs @@ -1,19 +1,19 @@ use std::fmt::Display; -use crate::node::MessageId; +use protocol::node::MessageId; pub struct Sequence(Vec); #[derive(Debug, PartialEq, Eq)] pub enum Entry { - Message(MessageId), + Data(MessageId), Noise(u32), // the number of consecutive noises } impl Display for Entry { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let s = match self { - Entry::Message(msg) => msg.to_string(), + Entry::Data(msg) => msg.to_string(), Entry::Noise(cnt) => format!("-{cnt}"), }; f.write_str(s.as_str()) @@ -26,22 +26,29 @@ impl Sequence { } pub fn add_message(&mut self, msg: MessageId) { - self.0.push(Entry::Message(msg)); + self.0.push(Entry::Data(msg)); } pub fn add_noise(&mut self) { if let Some(last) = self.0.last_mut() { - if let Entry::Noise(cnt) = last { - *cnt += 1; - } else { - self.0.push(Entry::Noise(1)) + match last { + Entry::Noise(cnt) => { + *cnt += 1; + } + _ => self.0.push(Entry::Noise(1)), } + } else { + self.0.push(Entry::Noise(1)) } } pub fn iter(&self) -> impl Iterator { self.0.iter() } + + pub fn len(&self) -> usize { + self.0.len() + } } impl Sequence { @@ -50,7 +57,7 @@ impl Sequence { let mut i = 0; while i < self.0.len() { - if let Entry::Message(_) = &self.0[i] { + if let Entry::Data(_) = &self.0[i] { let (c, next_i) = self.ordering_coefficient_from(i, other, strong); coeff += c; @@ -74,12 +81,12 @@ impl Sequence { strong: bool, ) -> (u64, usize) { let msg1 = match self.0[start_idx] { - Entry::Message(msg) => msg, + Entry::Data(msg) => msg, _ => panic!("Entry at {start_idx} must be Message"), }; for (j, entry) in other.iter().enumerate() { - if let Entry::Message(msg2) = entry { + if let Entry::Data(msg2) = entry { if msg1 == *msg2 { // Found the 1st matching msg. Start finding the next adjacent matching msg. if strong { @@ -112,7 +119,7 @@ impl Sequence { break; } } - (Entry::Message(msg1), Entry::Message(msg2)) => { + (Entry::Data(msg1), Entry::Data(msg2)) => { if msg1 == msg2 { coeff += 1; i += 1; @@ -152,7 +159,7 @@ impl Sequence { fn skip_noise(&self, mut index: usize) -> usize { while index < self.0.len() { - if let Entry::Message(_) = self.0[index] { + if let Entry::Data(_) = self.0[index] { break; } index += 1; @@ -171,91 +178,83 @@ mod tests { assert_eq!(seq.ordering_coefficient(&seq, strong), 0); // Case 1: Exact one matched pair with no noise - let seq = Sequence(vec![Entry::Message(1), Entry::Message(2)]); + let seq = Sequence(vec![Entry::Data(1), Entry::Data(2)]); assert_eq!(seq.ordering_coefficient(&seq, strong), 1); // Case 2: Exact one matched pair with noise - let seq = Sequence(vec![Entry::Message(1), Entry::Noise(10), Entry::Message(2)]); + let seq = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); assert_eq!(seq.ordering_coefficient(&seq, strong), 1); // Case 3: One matched pair with no noise - let seq1 = Sequence(vec![ - Entry::Message(1), - Entry::Message(2), - Entry::Message(3), - ]); - let seq2 = Sequence(vec![ - Entry::Message(1), - Entry::Message(2), - Entry::Message(4), - ]); + let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(3)]); + let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(4)]); assert_eq!(seq1.ordering_coefficient(&seq2, strong), 1); assert_eq!(seq2.ordering_coefficient(&seq1, strong), 1); // Case 4: One matched pair with noise let seq1 = Sequence(vec![ - Entry::Message(1), + Entry::Data(1), Entry::Noise(10), - Entry::Message(2), - Entry::Message(3), + Entry::Data(2), + Entry::Data(3), ]); - let seq2 = Sequence(vec![Entry::Message(1), Entry::Noise(10), Entry::Message(2)]); + let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); assert_eq!(seq1.ordering_coefficient(&seq2, strong), 1); assert_eq!(seq2.ordering_coefficient(&seq1, strong), 1); // Case 5: Two matched pairs with noise let seq1 = Sequence(vec![ - Entry::Message(1), + Entry::Data(1), Entry::Noise(10), - Entry::Message(2), - Entry::Message(3), + Entry::Data(2), + Entry::Data(3), ]); let seq2 = Sequence(vec![ - Entry::Message(1), + Entry::Data(1), Entry::Noise(10), - Entry::Message(2), - Entry::Message(3), - Entry::Message(4), + Entry::Data(2), + Entry::Data(3), + Entry::Data(4), ]); assert_eq!(seq1.ordering_coefficient(&seq2, strong), 2); assert_eq!(seq2.ordering_coefficient(&seq1, strong), 2); // Case 6: Only partial match with no noise - let seq1 = Sequence(vec![Entry::Message(1), Entry::Message(2)]); - let seq2 = Sequence(vec![Entry::Message(2), Entry::Message(3)]); + let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2)]); + let seq2 = Sequence(vec![Entry::Data(2), Entry::Data(3)]); assert_eq!(seq1.ordering_coefficient(&seq2, strong), 0); assert_eq!(seq2.ordering_coefficient(&seq1, strong), 0); // Case 7: Only partial match with noise - let seq1 = Sequence(vec![Entry::Message(1), Entry::Message(2), Entry::Noise(10)]); - let seq2 = Sequence(vec![Entry::Message(2), Entry::Noise(10), Entry::Message(3)]); + let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]); + let seq2 = Sequence(vec![Entry::Data(2), Entry::Noise(10), Entry::Data(3)]); assert_eq!(seq1.ordering_coefficient(&seq2, strong), 0); assert_eq!(seq2.ordering_coefficient(&seq1, strong), 0); // Case 8: No match at all - let seq1 = Sequence(vec![Entry::Message(1), Entry::Message(2), Entry::Noise(10)]); - let seq2 = Sequence(vec![Entry::Message(3), Entry::Noise(10), Entry::Message(4)]); + let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]); + let seq2 = Sequence(vec![Entry::Data(3), Entry::Noise(10), Entry::Data(4)]); assert_eq!(seq1.ordering_coefficient(&seq2, strong), 0); assert_eq!(seq2.ordering_coefficient(&seq1, strong), 0); // Case 9: Matches with noise but mixed orders let seq1 = Sequence(vec![ - Entry::Message(1), - Entry::Message(2), + Entry::Data(1), + Entry::Data(2), Entry::Noise(10), - Entry::Message(3), - Entry::Message(4), - Entry::Message(5), - Entry::Message(6), + Entry::Data(3), + Entry::Data(4), + Entry::Data(5), + Entry::Data(6), ]); let seq2 = Sequence(vec![ - Entry::Message(4), - Entry::Message(5), - Entry::Message(1), - Entry::Message(2), + Entry::Data(4), + Entry::Data(5), + Entry::Data(1), + Entry::Data(2), Entry::Noise(10), - Entry::Message(3), - Entry::Message(6), + Entry::Data(3), + Entry::Data(6), ]); assert_eq!(seq1.ordering_coefficient(&seq2, strong), 3); assert_eq!(seq2.ordering_coefficient(&seq1, strong), 3); @@ -266,14 +265,14 @@ mod tests { test_ordering_coefficient_common(true); // Case 0: No match because of noise - let seq1 = Sequence(vec![Entry::Message(1), Entry::Noise(10), Entry::Message(2)]); - let seq2 = Sequence(vec![Entry::Message(1), Entry::Message(2)]); + let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); + let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2)]); assert_eq!(seq1.ordering_coefficient(&seq2, true), 0); assert_eq!(seq2.ordering_coefficient(&seq1, true), 0); // Case 1: No match because of different count of noises - let seq1 = Sequence(vec![Entry::Message(1), Entry::Noise(10), Entry::Message(2)]); - let seq2 = Sequence(vec![Entry::Message(1), Entry::Noise(5), Entry::Message(2)]); + let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); + let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(5), Entry::Data(2)]); assert_eq!(seq1.ordering_coefficient(&seq2, true), 0); assert_eq!(seq2.ordering_coefficient(&seq1, true), 0); } @@ -283,14 +282,14 @@ mod tests { test_ordering_coefficient_common(false); // Case 0: Match ignoring noises - let seq1 = Sequence(vec![Entry::Message(1), Entry::Noise(10), Entry::Message(2)]); - let seq2 = Sequence(vec![Entry::Message(1), Entry::Message(2)]); + let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); + let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2)]); assert_eq!(seq1.ordering_coefficient(&seq2, false), 1); assert_eq!(seq2.ordering_coefficient(&seq1, false), 1); // Case 1: Match ignoring noise count - let seq1 = Sequence(vec![Entry::Message(1), Entry::Noise(10), Entry::Message(2)]); - let seq2 = Sequence(vec![Entry::Message(1), Entry::Noise(5), Entry::Message(2)]); + let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); + let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(5), Entry::Data(2)]); assert_eq!(seq1.ordering_coefficient(&seq2, false), 1); assert_eq!(seq2.ordering_coefficient(&seq1, false), 1); } diff --git a/mixnet-rs/ordering/src/paramset.rs b/mixnet-rs/ordering/src/paramset.rs new file mode 100644 index 0000000..e211b38 --- /dev/null +++ b/mixnet-rs/ordering/src/paramset.rs @@ -0,0 +1,240 @@ +use protocol::queue::QueueType; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum ExperimentId { + Experiment1 = 1, + Experiment2 = 2, + Experiment3 = 3, + Experiment4 = 4, + Experiment5 = 5, + Experiment6 = 6, +} + +impl std::str::FromStr for ExperimentId { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "1" | "Experiment1" => Ok(ExperimentId::Experiment1), + "2" | "Experiment2" => Ok(ExperimentId::Experiment2), + "3" | "Experiment3" => Ok(ExperimentId::Experiment3), + "4" | "Experiment4" => Ok(ExperimentId::Experiment4), + "5" | "Experiment5" => Ok(ExperimentId::Experiment5), + "6" | "Experiment6" => Ok(ExperimentId::Experiment6), + _ => Err(format!("Invalid experiment ID: {}", s)), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum SessionId { + Session1 = 1, +} + +impl std::str::FromStr for SessionId { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "1" | "Session1" => Ok(SessionId::Session1), + _ => Err(format!("Invalid session ID: {}", s)), + } + } +} + +pub const PARAMSET_CSV_COLUMNS: &[&str] = &[ + "paramset", + "num_mixes", + "num_paths", + "random_topology", + "peering_degree", + "min_queue_size", + "transmission_rate", + "num_sender_msgs", + "sender_data_msg_prob", + "mix_data_msg_prob", + "queue_type", + "num_iterations", +]; + +#[derive(Debug, Clone, PartialEq)] +pub struct ParamSet { + pub id: u16, + pub num_mixes: u32, + pub num_paths: u16, + pub random_topology: bool, + pub peering_degree: u32, + pub min_queue_size: u16, + pub transmission_rate: u16, + pub num_sender_msgs: u32, + pub sender_data_msg_prob: f32, + pub mix_data_msg_prob: f32, + pub queue_type: QueueType, + pub num_iterations: usize, +} + +impl ParamSet { + pub fn new_all_paramsets( + exp_id: ExperimentId, + session_id: SessionId, + queue_type: QueueType, + ) -> Vec { + match session_id { + SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type), + } + } + + fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { + let transmission_rate: u16 = 1; + let min_queue_size: u16 = 10; + let num_sender_msgs: u32 = 1000000; + let sender_data_msg_probs: &[f32] = &[0.01, 0.1, 0.5, 0.9, 0.99, 1.0]; + let mix_data_msg_probs: &[f32] = match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment3 | ExperimentId::Experiment5 => { + &[0.0] + } + ExperimentId::Experiment2 | ExperimentId::Experiment4 | ExperimentId::Experiment6 => { + &[0.001, 0.01, 0.1] + } + }; + + let mut id: u16 = 1; + let mut paramsets: Vec = Vec::new(); + match exp_id { + ExperimentId::Experiment1 + | ExperimentId::Experiment2 + | ExperimentId::Experiment3 + | ExperimentId::Experiment4 => { + for &num_paths in &[1, 2, 3, 4] { + for &num_mixes in &[1, 2, 3, 4] { + for &sender_data_msg_prob in sender_data_msg_probs { + for &mix_data_msg_prob in mix_data_msg_probs { + let paramset = ParamSet { + id, + num_mixes, + num_paths, + random_topology: false, + peering_degree: 1, + min_queue_size, + transmission_rate, + num_sender_msgs, + sender_data_msg_prob, + mix_data_msg_prob, + queue_type, + num_iterations: 1, + }; + id += 1; + paramsets.push(paramset); + } + } + } + } + } + ExperimentId::Experiment5 | ExperimentId::Experiment6 => { + for &num_mixes in &[8, 16, 32] { + for &peering_degree in &[2, 3, 4] { + for &sender_data_msg_prob in sender_data_msg_probs { + for &mix_data_msg_prob in mix_data_msg_probs { + let paramset = ParamSet { + id, + num_mixes, + num_paths: 0, // since we're gonna build random topology + random_topology: true, + peering_degree, + min_queue_size, + transmission_rate, + num_sender_msgs, + sender_data_msg_prob, + mix_data_msg_prob, + queue_type, + num_iterations: 10, + }; + id += 1; + paramsets.push(paramset); + } + } + } + } + } + } + + paramsets + } + + pub fn as_csv_record(&self) -> Vec { + vec![ + self.id.to_string(), + self.num_mixes.to_string(), + self.num_paths.to_string(), + self.random_topology.to_string(), + self.peering_degree.to_string(), + self.min_queue_size.to_string(), + self.transmission_rate.to_string(), + self.num_sender_msgs.to_string(), + self.sender_data_msg_prob.to_string(), + self.mix_data_msg_prob.to_string(), + format!("{:?}", self.queue_type), + self.num_iterations.to_string(), + ] + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use strum::IntoEnumIterator; + + use crate::paramset::ParamSet; + + use super::*; + + #[test] + fn test_new_all_paramsets() { + let cases = vec![ + ((ExperimentId::Experiment1, SessionId::Session1), 4 * 4 * 6), + ( + (ExperimentId::Experiment2, SessionId::Session1), + 4 * 4 * 6 * 3, + ), + ((ExperimentId::Experiment3, SessionId::Session1), 4 * 4 * 6), + ( + (ExperimentId::Experiment4, SessionId::Session1), + 4 * 4 * 6 * 3, + ), + ((ExperimentId::Experiment5, SessionId::Session1), 3 * 3 * 6), + ( + (ExperimentId::Experiment6, SessionId::Session1), + 3 * 3 * 6 * 3, + ), + ]; + + for queue_type in QueueType::iter() { + for ((exp_id, session_id), expected_cnt) in cases.clone().into_iter() { + let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type); + + assert_eq!( + paramsets.len(), + expected_cnt as usize, + "queue_type:{:?}, exp:{:?}, session:{:?}", + queue_type, + exp_id, + session_id, + ); + + // Check if all parameter sets are unique + let unique_paramsets: HashSet> = paramsets + .iter() + .map(|paramset| paramset.as_csv_record()) + .collect(); + assert_eq!(unique_paramsets.len(), paramsets.len()); + + // Check if paramset IDs are correct. + for (i, paramset) in paramsets.iter().enumerate() { + assert_eq!(paramset.id as usize, i + 1); + } + } + } + } +} diff --git a/mixnet-rs/queue/Cargo.toml b/mixnet-rs/protocol/Cargo.toml similarity index 66% rename from mixnet-rs/queue/Cargo.toml rename to mixnet-rs/protocol/Cargo.toml index 4c1663d..e91a783 100644 --- a/mixnet-rs/queue/Cargo.toml +++ b/mixnet-rs/protocol/Cargo.toml @@ -1,9 +1,11 @@ [package] -name = "queue" +name = "protocol" version = "0.1.0" edition = "2021" [dependencies] rand = "0.8.5" +rustc-hash = "2.0.0" strum = "0.26.3" strum_macros = "0.26.4" +tracing = "0.1.40" diff --git a/mixnet-rs/protocol/src/lib.rs b/mixnet-rs/protocol/src/lib.rs new file mode 100644 index 0000000..c848b2b --- /dev/null +++ b/mixnet-rs/protocol/src/lib.rs @@ -0,0 +1,2 @@ +pub mod node; +pub mod queue; diff --git a/mixnet-rs/protocol/src/node.rs b/mixnet-rs/protocol/src/node.rs new file mode 100644 index 0000000..42f15c2 --- /dev/null +++ b/mixnet-rs/protocol/src/node.rs @@ -0,0 +1,107 @@ +use rustc_hash::{FxHashMap, FxHashSet}; + +use crate::queue::{new_queue, Message, Queue, QueueConfig}; + +pub type NodeId = u32; +pub type MessageId = u32; + +pub struct Node { + queue_config: QueueConfig, + // To have the deterministic result, we use Vec instead of FxHashMap. + // Building `queues` is inefficient, but it's not a problem because it's done only once at the beginning. + // Instead, use `connected_peers` to build `queues` efficiently. + queues: Vec<(NodeId, Box>)>, + connected_peers: FxHashSet, + // A cache to avoid relaying the same message multiple times. + received_msgs: Option>, + peering_degree: u32, +} + +impl Node { + pub fn new(queue_config: QueueConfig, peering_degree: u32, enable_cache: bool) -> Self { + Node { + queue_config, + queues: Vec::new(), + connected_peers: FxHashSet::default(), + received_msgs: if enable_cache { + Some(FxHashMap::default()) + } else { + None + }, + peering_degree, + } + } + + pub fn connect(&mut self, peer_id: NodeId) { + if self.connected_peers.insert(peer_id) { + let pos = self + .queues + .binary_search_by(|probe| probe.0.cmp(&peer_id)) + .unwrap_or_else(|pos| pos); + self.queues + .insert(pos, (peer_id, new_queue(&self.queue_config))); + } + } + + pub fn send(&mut self, msg: MessageId) { + assert!(self.check_and_update_cache(msg, true)); + for (_, queue) in self.queues.iter_mut() { + queue.push(msg); + } + } + + pub fn receive(&mut self, msg: MessageId, from: Option) -> bool { + let first_received = self.check_and_update_cache(msg, false); + if first_received { + for (node_id, queue) in self.queues.iter_mut() { + match from { + Some(sender) => { + if *node_id != sender { + queue.push(msg); + } + } + None => queue.push(msg), + } + } + } + first_received + } + + pub fn read_queues(&mut self) -> Vec<(NodeId, Message)> { + let mut msgs_to_relay: Vec<(NodeId, Message)> = Vec::new(); + self.queues.iter_mut().for_each(|(node_id, queue)| { + msgs_to_relay.push((*node_id, queue.pop())); + }); + msgs_to_relay + } + + pub fn data_count_in_queue(&self) -> Vec { + self.queues + .iter() + .map(|(_, queue)| queue.data_count()) + .collect() + } + + fn check_and_update_cache(&mut self, msg: MessageId, sending: bool) -> bool { + if let Some(received_msgs) = &mut self.received_msgs { + let first_received = if let Some(count) = received_msgs.get_mut(&msg) { + *count += 1; + false + } else { + received_msgs.insert(msg, if sending { 0 } else { 1 }); + true + }; + + // If the message have been received from all connected peers, remove it from the cache + // because there is no possibility that the message will be received again. + if received_msgs.get(&msg).unwrap() == &self.peering_degree { + tracing::debug!("Remove message from cache: {}", msg); + received_msgs.remove(&msg); + } + + first_received + } else { + true + } + } +} diff --git a/mixnet-rs/queue/src/lib.rs b/mixnet-rs/protocol/src/queue.rs similarity index 73% rename from mixnet-rs/queue/src/lib.rs rename to mixnet-rs/protocol/src/queue.rs index da9d901..c8c0044 100644 --- a/mixnet-rs/queue/src/lib.rs +++ b/mixnet-rs/protocol/src/queue.rs @@ -30,9 +30,15 @@ impl std::str::FromStr for QueueType { } pub trait Queue { - fn push(&mut self, msg: T); - fn pop(&mut self) -> Option; - fn message_count(&self) -> usize; + fn push(&mut self, data: T); + fn pop(&mut self) -> Message; + fn data_count(&self) -> usize; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Message { + Data(T), + Noise, } pub struct QueueConfig { @@ -61,7 +67,7 @@ pub fn new_queue(cfg: &QueueConfig) -> Box> { } struct NonMixQueue { - queue: VecDeque, + queue: VecDeque, // don't need to contain Noise } impl NonMixQueue { @@ -73,59 +79,60 @@ impl NonMixQueue { } impl Queue for NonMixQueue { - fn push(&mut self, msg: T) { - self.queue.push_back(msg) + fn push(&mut self, data: T) { + self.queue.push_back(data) } - fn pop(&mut self) -> Option { - self.queue.pop_front() + fn pop(&mut self) -> Message { + match self.queue.pop_front() { + Some(data) => Message::Data(data), + None => Message::Noise, + } } - fn message_count(&self) -> usize { + fn data_count(&self) -> usize { self.queue.len() } } struct MixQueue { - queue: Vec>, // None element means noise - message_count: usize, + queue: Vec>, + data_count: usize, rng: StdRng, } impl MixQueue { fn new(num_initial_noises: usize, seed: u64) -> Self { Self { - queue: vec![None; num_initial_noises], - message_count: 0, + queue: vec![Message::Noise; num_initial_noises], + data_count: 0, rng: StdRng::seed_from_u64(seed), } } fn push(&mut self, data: T) { - self.queue.push(Some(data)); - self.message_count += 1; + self.queue.push(Message::Data(data)); + self.data_count += 1; } fn fill_noises(&mut self, k: usize) { - self.queue.extend(std::iter::repeat(None).take(k)) + self.queue.extend(std::iter::repeat(Message::Noise).take(k)) } - fn pop(&mut self, idx: usize) -> Option { + fn pop(&mut self, idx: usize) -> Option> { if idx < self.queue.len() { - match self.queue.remove(idx) { - Some(msg) => { - self.message_count -= 1; - Some(msg) - } - None => None, + let msg = self.queue.remove(idx); + if let Message::Data(_) = msg { + self.data_count -= 1; } + Some(msg) } else { None } } - fn message_count(&self) -> usize { - self.message_count + fn data_count(&self) -> usize { + self.data_count } fn len(&self) -> usize { @@ -162,12 +169,12 @@ impl MinSizeMixQueue { self.queue.push(msg) } - fn pop(&mut self, idx: usize) -> Option { + fn pop(&mut self, idx: usize) -> Option> { self.queue.pop(idx) } - fn message_count(&self) -> usize { - self.queue.message_count() + fn data_count(&self) -> usize { + self.queue.data_count() } fn ensure_min_size(&mut self) { @@ -211,20 +218,20 @@ impl Queue for PureCoinFlippingQueue { self.queue.push(msg) } - fn pop(&mut self) -> Option { + fn pop(&mut self) -> Message { self.queue.ensure_min_size(); loop { for i in 0..self.queue.len() { if self.queue.flip_coin() { - return self.queue.pop(i); + return self.queue.pop(i).unwrap(); } } } } - fn message_count(&self) -> usize { - self.queue.message_count() + fn data_count(&self) -> usize { + self.queue.data_count() } } @@ -245,15 +252,15 @@ impl Queue for PureRandomSamplingQueue { self.queue.push(msg) } - fn pop(&mut self) -> Option { + fn pop(&mut self) -> Message { self.queue.ensure_min_size(); let i = self.queue.sample_index(); - self.queue.pop(i) + self.queue.pop(i).unwrap() } - fn message_count(&self) -> usize { - self.queue.message_count() + fn data_count(&self) -> usize { + self.queue.data_count() } } @@ -274,7 +281,7 @@ impl Queue for PermutedCoinFlippingQueue { self.queue.push(msg) } - fn pop(&mut self) -> Option { + fn pop(&mut self) -> Message { self.queue.ensure_min_size(); self.queue.shuffle(); @@ -282,14 +289,14 @@ impl Queue for PermutedCoinFlippingQueue { loop { for i in 0..self.queue.len() { if self.queue.flip_coin() { - return self.queue.pop(i); + return self.queue.pop(i).unwrap(); } } } } - fn message_count(&self) -> usize { - self.queue.message_count() + fn data_count(&self) -> usize { + self.queue.data_count() } } @@ -310,24 +317,24 @@ impl Queue for NoisyCoinFlippingQueue { self.queue.push(msg) } - fn pop(&mut self) -> Option { + fn pop(&mut self) -> Message { if self.queue.len() == 0 { - return None; + return Message::Noise; } loop { for i in 0..self.queue.len() { if self.queue.flip_coin() { - return self.queue.pop(i); + return self.queue.pop(i).unwrap(); } else if i == 0 { - return None; + return Message::Noise; } } } } - fn message_count(&self) -> usize { - self.queue.message_count() + fn data_count(&self) -> usize { + self.queue.data_count() } } @@ -348,21 +355,21 @@ impl Queue for NoisyCoinFlippingRandomReleaseQueue { self.queue.push(msg) } - fn pop(&mut self) -> Option { + fn pop(&mut self) -> Message { if self.queue.len() == 0 { - return None; + return Message::Noise; } if self.queue.flip_coin() { let i = self.queue.sample_index(); - self.queue.pop(i) + self.queue.pop(i).unwrap() } else { - None + Message::Noise } } - fn message_count(&self) -> usize { - self.queue.message_count() + fn data_count(&self) -> usize { + self.queue.data_count() } } @@ -380,23 +387,23 @@ mod tests { min_queue_size: 0, }); - // Check if None (noise) is returned when queue is empty - assert_eq!(queue.pop(), None); + // Check if noise is returned when queue is empty + assert_eq!(queue.pop(), Message::Noise); // Check if queue is FIFO queue.push(0); queue.push(1); - assert_eq!(queue.pop(), Some(0)); - assert_eq!(queue.pop(), Some(1)); + assert_eq!(queue.pop(), Message::Data(0)); + assert_eq!(queue.pop(), Message::Data(1)); - // Check if None (noise) is returned when queue is empty - assert_eq!(queue.pop(), None); + // Check if noise is returned when queue is empty + assert_eq!(queue.pop(), Message::Noise); // Check if queue is FIFO again queue.push(2); queue.push(3); - assert_eq!(queue.pop(), Some(2)); - assert_eq!(queue.pop(), Some(3)); + assert_eq!(queue.pop(), Message::Data(2)); + assert_eq!(queue.pop(), Message::Data(3)); } #[test] @@ -419,8 +426,8 @@ mod tests { min_queue_size: 4, }); - // Check if None (noise) is returned when queue is empty - assert_eq!(queue.pop(), None); + // Check if noise is returned when queue is empty + assert_eq!(queue.pop(), Message::Noise); // Put only 2 messages even though the min queue size is 4 queue.push(0); @@ -429,12 +436,12 @@ mod tests { // Wait until 2 messages are returned from the queue let mut set: HashSet<_> = vec![0, 1].into_iter().collect(); while !set.is_empty() { - if let Some(msg) = queue.pop() { + if let Message::Data(msg) = queue.pop() { assert!(set.remove(&msg)); } } - // Check if None (noise) is returned when there is no real message remains - assert_eq!(queue.pop(), None); + // Check if noise is returned when there is no real message remains + assert_eq!(queue.pop(), Message::Noise); } } diff --git a/mixnet-rs/single-path/src/node.rs b/mixnet-rs/single-path/src/node.rs deleted file mode 100644 index 21c0b2c..0000000 --- a/mixnet-rs/single-path/src/node.rs +++ /dev/null @@ -1,35 +0,0 @@ -use queue::{new_queue, Queue, QueueConfig}; - -pub type MessageId = u32; - -pub struct Node { - queue: Box>, -} - -impl Node { - pub fn new(queue_config: &QueueConfig) -> Self { - Node { - queue: new_queue(queue_config), - } - } - - pub fn send(&mut self, msg: MessageId) { - // Schedule sending a new data message to the peer - self.queue.push(msg); - } - - pub fn receive(&mut self, msg: MessageId) { - // Relay the message to another peer. - // Don't need to accept noise in this function because it anyway has to be dropped. - self.queue.push(msg); - } - - pub fn read_queue(&mut self) -> Option { - // Returns `None` if a noise was read from the queue - self.queue.pop() - } - - pub fn message_count_in_queue(&self) -> u32 { - self.queue.message_count().try_into().unwrap() - } -} diff --git a/mixnet-rs/single-path/src/paramset.rs b/mixnet-rs/single-path/src/paramset.rs deleted file mode 100644 index f2fc92e..0000000 --- a/mixnet-rs/single-path/src/paramset.rs +++ /dev/null @@ -1,220 +0,0 @@ -use queue::QueueType; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -#[repr(u8)] -pub enum ExperimentId { - Experiment1 = 1, - Experiment2 = 2, -} - -impl std::str::FromStr for ExperimentId { - type Err = String; - - fn from_str(s: &str) -> Result { - match s { - "1" | "Experiment1" => Ok(ExperimentId::Experiment1), - "2" | "Experiment2" => Ok(ExperimentId::Experiment2), - _ => Err(format!("Invalid experiment ID: {}", s)), - } - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -#[repr(u8)] -pub enum SessionId { - Session1 = 1, -} - -impl std::str::FromStr for SessionId { - type Err = String; - - fn from_str(s: &str) -> Result { - match s { - "1" | "Session1" => Ok(SessionId::Session1), - _ => Err(format!("Invalid session ID: {}", s)), - } - } -} - -pub const PARAMSET_CSV_COLUMNS: &[&str] = &[ - "paramset", - "num_nodes", - "peering_degree", - "min_queue_size", - "transmission_rate", - "num_sender_data_msgs", - "sender_data_msg_prob", - "mix_data_msg_prob", - "queue_type", - "num_iterations", -]; - -#[derive(Debug, Clone, PartialEq)] -pub struct ParamSet { - pub id: u16, - pub num_nodes: u32, - pub peering_degree: u32, - pub min_queue_size: u16, - pub transmission_rate: u16, - pub num_sender_data_msgs: u32, - pub sender_data_msg_prob: f32, - pub mix_data_msg_prob: f32, - pub queue_type: QueueType, - pub num_iterations: usize, -} - -impl ParamSet { - pub fn new_all_paramsets( - exp_id: ExperimentId, - session_id: SessionId, - queue_type: QueueType, - ) -> Vec { - match session_id { - SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type), - } - } - - fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { - let num_nodes: u32 = 3; - let peering_degree: u32 = 1; - let transmission_rate: u16 = 1; - let min_queue_sizes: &[u16] = &[ - transmission_rate.checked_div(2).unwrap(), - transmission_rate, - transmission_rate.checked_mul(2).unwrap(), - ]; - let num_sender_data_msgs: u32 = (transmission_rate as u32).checked_mul(1000).unwrap(); - let sender_data_msg_probs: &[f32] = &[0.01, 0.1, 0.5, 0.9, 0.99, 1.0]; - let mix_data_msg_probs: &[f32] = match exp_id { - ExperimentId::Experiment1 => &[0.0], - ExperimentId::Experiment2 => &[0.00001, 0.0001, 0.001, 0.01, 0.1], - }; - let num_iterations: usize = 100; - - let mut id: u16 = 1; - let mut paramsets: Vec = Vec::new(); - for &min_queue_size in min_queue_sizes { - for &sender_data_msg_prob in sender_data_msg_probs { - for &mix_data_msg_prob in mix_data_msg_probs { - if !Self::is_min_queue_size_applicable(&queue_type) - && min_queue_size != min_queue_sizes[0] - { - id += 1; - continue; - } - let paramset = ParamSet { - id, - num_nodes, - peering_degree, - min_queue_size, - transmission_rate, - num_sender_data_msgs, - sender_data_msg_prob, - mix_data_msg_prob, - queue_type, - num_iterations, - }; - id += 1; - paramsets.push(paramset); - } - } - } - paramsets - } - - pub fn is_min_queue_size_applicable(queue_type: &QueueType) -> bool { - matches!( - queue_type, - QueueType::PureCoinFlipping - | QueueType::PureRandomSampling - | QueueType::PermutedCoinFlipping - ) - } - - pub fn as_csv_record(&self) -> Vec { - vec![ - self.id.to_string(), - self.num_nodes.to_string(), - self.peering_degree.to_string(), - self.min_queue_size.to_string(), - self.transmission_rate.to_string(), - self.num_sender_data_msgs.to_string(), - self.sender_data_msg_prob.to_string(), - self.mix_data_msg_prob.to_string(), - format!("{:?}", self.queue_type), - self.num_iterations.to_string(), - ] - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - use strum::IntoEnumIterator; - - use crate::paramset::ParamSet; - - use super::*; - - #[test] - fn test_new_all_paramsets() { - let cases = vec![ - ((ExperimentId::Experiment1, SessionId::Session1), 3 * 6), - ((ExperimentId::Experiment2, SessionId::Session1), 3 * 6 * 5), - ]; - - for queue_type in QueueType::iter() { - for ((exp_id, session_id), mut expected_cnt) in cases.clone().into_iter() { - let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type); - - // Check if the number of parameter sets is correct - if !ParamSet::is_min_queue_size_applicable(&queue_type) { - expected_cnt /= 3; - } - assert_eq!(paramsets.len(), expected_cnt as usize); - - // Check if all parameter sets are unique - let unique_paramsets: HashSet> = paramsets - .iter() - .map(|paramset| paramset.as_csv_record()) - .collect(); - assert_eq!(unique_paramsets.len(), paramsets.len()); - - // Check if paramset IDs are correct. - if ParamSet::is_min_queue_size_applicable(&queue_type) { - for (i, paramset) in paramsets.iter().enumerate() { - assert_eq!(paramset.id as usize, i + 1); - } - } - } - } - } - - #[test] - fn test_id_consistency() { - let cases = vec![ - (ExperimentId::Experiment1, SessionId::Session1), - (ExperimentId::Experiment2, SessionId::Session1), - ]; - - for (exp_id, session_id) in cases.into_iter() { - let paramsets_with_min_queue_size = - ParamSet::new_all_paramsets(exp_id, session_id, QueueType::PureCoinFlipping); - let paramsets_without_min_queue_size = - ParamSet::new_all_paramsets(exp_id, session_id, QueueType::NonMix); - - for (i, paramset) in paramsets_with_min_queue_size.iter().enumerate() { - assert_eq!(paramset.id as usize, i + 1); - } - - for mut paramset in paramsets_without_min_queue_size.into_iter() { - // To compare ParameterSet instances, use the same queue type. - paramset.queue_type = QueueType::PureCoinFlipping; - assert_eq!( - paramset, - paramsets_with_min_queue_size[paramset.id as usize - 1] - ); - } - } - } -}