From 58fccb085ed83024a0202ce47f16a35bda37e289 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Tue, 20 Aug 2024 16:19:01 +0200 Subject: [PATCH] implement logic --- mixnet-rs/queue/src/lib.rs | 46 +++++- mixnet-rs/single-path/src/iteration.rs | 209 +++++++++++++++++++++++++ mixnet-rs/single-path/src/main.rs | 115 +++++++++++++- mixnet-rs/single-path/src/node.rs | 35 +++++ 4 files changed, 401 insertions(+), 4 deletions(-) create mode 100644 mixnet-rs/single-path/src/iteration.rs create mode 100644 mixnet-rs/single-path/src/node.rs diff --git a/mixnet-rs/queue/src/lib.rs b/mixnet-rs/queue/src/lib.rs index f6b1722..da9d901 100644 --- a/mixnet-rs/queue/src/lib.rs +++ b/mixnet-rs/queue/src/lib.rs @@ -32,6 +32,7 @@ impl std::str::FromStr for QueueType { pub trait Queue { fn push(&mut self, msg: T); fn pop(&mut self) -> Option; + fn message_count(&self) -> usize; } pub struct QueueConfig { @@ -79,10 +80,15 @@ impl Queue for NonMixQueue { fn pop(&mut self) -> Option { self.queue.pop_front() } + + fn message_count(&self) -> usize { + self.queue.len() + } } struct MixQueue { queue: Vec>, // None element means noise + message_count: usize, rng: StdRng, } @@ -90,12 +96,14 @@ impl MixQueue { fn new(num_initial_noises: usize, seed: u64) -> Self { Self { queue: vec![None; num_initial_noises], + message_count: 0, rng: StdRng::seed_from_u64(seed), } } fn push(&mut self, data: T) { - self.queue.push(Some(data)) + self.queue.push(Some(data)); + self.message_count += 1; } fn fill_noises(&mut self, k: usize) { @@ -104,12 +112,22 @@ impl MixQueue { fn pop(&mut self, idx: usize) -> Option { if idx < self.queue.len() { - self.queue.remove(idx) + match self.queue.remove(idx) { + Some(msg) => { + self.message_count -= 1; + Some(msg) + } + None => None, + } } else { None } } + fn message_count(&self) -> usize { + self.message_count + } + fn len(&self) -> usize { self.queue.len() } @@ -148,6 +166,10 @@ impl MinSizeMixQueue { self.queue.pop(idx) } + fn message_count(&self) -> usize { + self.queue.message_count() + } + fn ensure_min_size(&mut self) { if self.queue.len() < self.min_pool_size as usize { self.queue @@ -200,6 +222,10 @@ impl Queue for PureCoinFlippingQueue { } } } + + fn message_count(&self) -> usize { + self.queue.message_count() + } } struct PureRandomSamplingQueue { @@ -225,6 +251,10 @@ impl Queue for PureRandomSamplingQueue { let i = self.queue.sample_index(); self.queue.pop(i) } + + fn message_count(&self) -> usize { + self.queue.message_count() + } } struct PermutedCoinFlippingQueue { @@ -257,6 +287,10 @@ impl Queue for PermutedCoinFlippingQueue { } } } + + fn message_count(&self) -> usize { + self.queue.message_count() + } } struct NoisyCoinFlippingQueue { @@ -291,6 +325,10 @@ impl Queue for NoisyCoinFlippingQueue { } } } + + fn message_count(&self) -> usize { + self.queue.message_count() + } } struct NoisyCoinFlippingRandomReleaseQueue { @@ -322,6 +360,10 @@ impl Queue for NoisyCoinFlippingRandomReleaseQueue { None } } + + fn message_count(&self) -> usize { + self.queue.message_count() + } } #[cfg(test)] diff --git a/mixnet-rs/single-path/src/iteration.rs b/mixnet-rs/single-path/src/iteration.rs new file mode 100644 index 0000000..2d1d25a --- /dev/null +++ b/mixnet-rs/single-path/src/iteration.rs @@ -0,0 +1,209 @@ +use std::{fmt::Display, path::Path}; + +use queue::QueueConfig; +use rand::{rngs::StdRng, Rng, SeedableRng}; +use rustc_hash::FxHashMap; + +use crate::{ + node::{MessageId, Node}, + paramset::ParamSet, +}; + +pub fn run_iteration( + paramset: ParamSet, + seed: u64, + out_latency_path: &str, + out_sent_sequence_path: &str, + out_received_sequence_path: &str, + out_data_msg_counts_path: &str, +) { + // Ensure that all output files do not exist + for path in &[ + out_latency_path, + out_sent_sequence_path, + out_received_sequence_path, + out_data_msg_counts_path, + ] { + assert!(!Path::new(path).exists(), "File already exists: {path}"); + } + + // Initialize a mix node + let mut mixnode = Node::new(&QueueConfig { + queue_type: paramset.queue_type, + seed, + min_queue_size: paramset.min_queue_size, + }); + + let mut next_msg_id: MessageId = 0; + + // Virtual discrete time + let mut vtime: f32 = 0.0; + // Transmission interval that each queue must release a message + let transmission_interval = 1.0 / paramset.transmission_rate as f32; + // Results + let mut sent_times: FxHashMap = FxHashMap::default(); + let mut latencies: FxHashMap = FxHashMap::default(); + let mut sent_sequence = MessageSequence::new(); + let mut received_sequence = MessageSequence::new(); + let mut data_msg_counts_in_queue: Vec = Vec::new(); + + let mut rng = StdRng::seed_from_u64(seed); + loop { + // The sender emits a message (data or noise) to the mix node. + if sent_times.len() < paramset.num_sender_data_msgs as usize + && try_probability(&mut rng, paramset.sender_data_msg_prob) + { + let msg = next_msg_id; + next_msg_id += 1; + mixnode.receive(msg); + sent_times.insert(msg, vtime); + sent_sequence.add_message(msg); + } else { + // Generate noise and add it to the sequence to calculate ordering coefficients later, + // but don't need to send it to the mix node + // because the mix node will anyway drop the noise, + // and we don't need to record what the mix node receives. + sent_sequence.add_noise(); + } + + // The mix node add a new data message to its queue with a certain probability + if try_probability(&mut rng, paramset.mix_data_msg_prob) { + mixnode.send(next_msg_id); + next_msg_id += 1; + // Don't put the msg into the sent_sequence + // because sent_sequence is only for recording messages sent by the sender, not the mixnode. + } + + // The mix node emits a message (data or noise) to the receiver. + // As the receiver, record the time and order of the received messages. + match mixnode.read_queue() { + Some(msg) => { + latencies.insert(msg, vtime - sent_times.get(&msg).unwrap()); + received_sequence.add_message(msg); + } + None => { + received_sequence.add_noise(); + } + } + + // Record the number of data messages in the mix node's queue + data_msg_counts_in_queue.push(mixnode.message_count_in_queue()); + + // If all messages have been received by the receiver, stop the iteration. + assert!(latencies.len() <= paramset.num_sender_data_msgs as usize); + if latencies.len() == paramset.num_sender_data_msgs as usize { + break; + } + + vtime += transmission_interval; + } + + // Save results to CSV files + save_latencies(&latencies, &sent_times, out_latency_path); + save_sequence(&sent_sequence, out_sent_sequence_path); + save_sequence(&received_sequence, out_received_sequence_path); + save_data_msg_counts( + &data_msg_counts_in_queue, + transmission_interval, + out_data_msg_counts_path, + ); +} + +fn try_probability(rng: &mut StdRng, prob: f32) -> bool { + assert!( + (0.0..=1.0).contains(&prob), + "Probability must be in [0, 1]." + ); + rng.gen::() < prob +} + +fn save_latencies( + latencies: &FxHashMap, + sent_times: &FxHashMap, + path: &str, +) { + let mut writer = csv::Writer::from_path(path).unwrap(); + writer + .write_record(["latency", "sent_time", "received_time"]) + .unwrap(); + for (msg, latency) in latencies.iter() { + let sent_time = sent_times.get(msg).unwrap(); + writer + .write_record(&[ + latency.to_string(), + sent_time.to_string(), + (sent_time + latency).to_string(), + ]) + .unwrap(); + } + writer.flush().unwrap(); +} + +fn save_sequence(sequence: &MessageSequence, path: &str) { + let mut writer = csv::Writer::from_path(path).unwrap(); + sequence.messages.iter().for_each(|entry| { + writer.write_record([entry.to_string()]).unwrap(); + }); + writer.flush().unwrap(); +} + +fn save_data_msg_counts( + data_msg_counts_in_queue: &[u32], + interval: f32, + out_data_msg_counts_path: &str, +) { + let mut writer = csv::Writer::from_path(out_data_msg_counts_path).unwrap(); + writer + .write_record(["vtime", "data_msg_count_in_queue"]) + .unwrap(); + data_msg_counts_in_queue + .iter() + .enumerate() + .for_each(|(i, count)| { + writer + .write_record([(i as f64 * interval as f64).to_string(), count.to_string()]) + .unwrap(); + }); + writer.flush().unwrap(); +} + +struct MessageSequence { + messages: Vec, +} + +impl MessageSequence { + fn new() -> Self { + Self { + messages: Vec::new(), + } + } + + fn add_message(&mut self, msg: MessageId) { + self.messages.push(SequenceEntry::Message(msg)); + } + + fn add_noise(&mut self) { + if let Some(last) = self.messages.last_mut() { + if let SequenceEntry::Noise(cnt) = last { + *cnt += 1; + } else { + self.messages.push(SequenceEntry::Noise(1)) + } + } + } +} + +enum SequenceEntry { + Message(MessageId), + Noise(u32), // the number of consecutive noises +} + +impl Display for SequenceEntry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let s = match self { + SequenceEntry::Message(msg) => msg.to_string(), + SequenceEntry::Noise(cnt) => format!("-{cnt}"), + }; + f.write_str(s.as_str()) + } +} diff --git a/mixnet-rs/single-path/src/main.rs b/mixnet-rs/single-path/src/main.rs index fce7508..df33c6a 100644 --- a/mixnet-rs/single-path/src/main.rs +++ b/mixnet-rs/single-path/src/main.rs @@ -1,7 +1,17 @@ +mod iteration; +mod node; mod paramset; +use std::{ + error::Error, + path::Path, + time::{Duration, SystemTime}, +}; + +use chrono::Utc; use clap::Parser; -use paramset::{ExperimentId, SessionId}; +use iteration::run_iteration; +use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS}; use queue::QueueType; #[derive(Debug, Parser)] @@ -20,5 +30,106 @@ struct Args { } fn main() { - println!("Hello, world!"); + tracing_subscriber::fmt::init(); + + let args = Args::parse(); + tracing::info!("Arguments: {:?}", args); + let Args { + exp_id, + session_id, + queue_type, + outdir, + from_paramset, + } = args; + + // Create a directory and initialize a CSV file only with a header + assert!( + Path::new(&outdir).is_dir(), + "Output directory does not exist: {outdir}" + ); + let subdir = format!( + "__WIP__dissemination_e{}s{}_{:?}_{}___DUR__", + exp_id as u8, + session_id as u8, + queue_type, + Utc::now().to_rfc3339() + ); + std::fs::create_dir_all(&format!("{outdir}/{subdir}")).unwrap(); + + let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type); + + let session_start_time = SystemTime::now(); + + for paramset in paramsets { + if paramset.id < from_paramset.unwrap_or(0) { + tracing::info!("ParamSet:{} skipped", paramset.id); + continue; + } + + let paramset_dir = format!("{outdir}/{subdir}/__WIP__paramset_{}", paramset.id); + std::fs::create_dir_all(paramset_dir.as_str()).unwrap(); + save_paramset_info(¶mset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap(); + + for i in 0..paramset.num_iterations { + run_iteration( + paramset.clone(), + i as u64, + &format!("{paramset_dir}/iteration_{i}_latency.csv"), + &format!("{paramset_dir}/iteration_{i}_sent_seq.csv"), + &format!("{paramset_dir}/iteration_{i}_recv_seq.csv"), + &format!("{paramset_dir}/iteration_{i}_data_msg_counts.csv"), + ); + tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i); + } + let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_"); + std::fs::rename(¶mset_dir, &new_paramset_dir) + .expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}: {e}"); + + tracing::info!("ParamSet:{} completed", paramset.id); + } + + let session_duration = SystemTime::now() + .duration_since(session_start_time) + .unwrap(); + + // Replace "__WIP__" and "__DUR__" in the subdir string + let new_subdir = subdir + .replace("__WIP__", "") + .replace("__DUR__", &format_duration(session_duration)); + let old_path = format!("{}/{}", outdir, subdir); + let new_path = format!("{}/{}", outdir, new_subdir); + assert!( + !Path::new(&new_path).exists(), + "The new directory already exists: {new_path}" + ); + std::fs::rename(&old_path, &new_path) + .expect("Failed to rename the directory: {old_path} -> {new_path}"); + + tracing::info!("Session completed."); +} + +fn save_paramset_info(paramset: &ParamSet, path: &str) -> Result<(), Box> { + // Assert that the file does not already exist + assert!( + !Path::new(path).exists(), + "File already exists at path: {path}", + ); + + let mut wtr = csv::Writer::from_path(path)?; + wtr.write_record(PARAMSET_CSV_COLUMNS)?; + wtr.write_record(paramset.as_csv_record())?; + wtr.flush()?; + + Ok(()) +} + +fn format_duration(duration: Duration) -> String { + let total_seconds = duration.as_secs(); + + let days = total_seconds / 86_400; + let hours = (total_seconds % 86_400) / 3_600; + let minutes = (total_seconds % 3_600) / 60; + let seconds = total_seconds % 60; + + format!("{}d{}h{}m{}s", days, hours, minutes, seconds) } diff --git a/mixnet-rs/single-path/src/node.rs b/mixnet-rs/single-path/src/node.rs new file mode 100644 index 0000000..21c0b2c --- /dev/null +++ b/mixnet-rs/single-path/src/node.rs @@ -0,0 +1,35 @@ +use queue::{new_queue, Queue, QueueConfig}; + +pub type MessageId = u32; + +pub struct Node { + queue: Box>, +} + +impl Node { + pub fn new(queue_config: &QueueConfig) -> Self { + Node { + queue: new_queue(queue_config), + } + } + + pub fn send(&mut self, msg: MessageId) { + // Schedule sending a new data message to the peer + self.queue.push(msg); + } + + pub fn receive(&mut self, msg: MessageId) { + // Relay the message to another peer. + // Don't need to accept noise in this function because it anyway has to be dropped. + self.queue.push(msg); + } + + pub fn read_queue(&mut self) -> Option { + // Returns `None` if a noise was read from the queue + self.queue.pop() + } + + pub fn message_count_in_queue(&self) -> u32 { + self.queue.message_count().try_into().unwrap() + } +}