From 960a184ee6c421966a6dd71ca03379aa0e97cf1f Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Sun, 25 Aug 2024 01:33:17 +0900 Subject: [PATCH] parallelize iterations --- mixnet-rs/ordering/Cargo.toml | 1 + mixnet-rs/ordering/src/iteration.rs | 315 ++++++++++++++++------------ mixnet-rs/ordering/src/main.rs | 85 ++------ 3 files changed, 207 insertions(+), 194 deletions(-) diff --git a/mixnet-rs/ordering/Cargo.toml b/mixnet-rs/ordering/Cargo.toml index 29d2397..c54b5eb 100644 --- a/mixnet-rs/ordering/Cargo.toml +++ b/mixnet-rs/ordering/Cargo.toml @@ -9,6 +9,7 @@ clap = { version = "4.5.16", features = ["derive"] } csv = "1.3.0" protocol = { version = "0.1.0", path = "../protocol" } rand = "0.8.5" +rayon = "1.10.0" rustc-hash = "2.0.0" tracing = "0.1.40" tracing-subscriber = "0.3.18" diff --git a/mixnet-rs/ordering/src/iteration.rs b/mixnet-rs/ordering/src/iteration.rs index 1a8c2dd..58560d9 100644 --- a/mixnet-rs/ordering/src/iteration.rs +++ b/mixnet-rs/ordering/src/iteration.rs @@ -1,4 +1,4 @@ -use std::collections::hash_map::Entry; +use std::{collections::hash_map::Entry, time::SystemTime}; use protocol::{ node::{MessagesToRelay, Node, NodeId}, @@ -8,145 +8,196 @@ use rand::{rngs::StdRng, Rng, SeedableRng}; use rustc_hash::FxHashMap; use crate::{ + format_duration, message::{DataMessage, DataMessageGenerator}, outputs::Outputs, paramset::ParamSet, topology::{build_random_network, build_striped_network, RECEIVER_NODE_ID}, }; -pub fn run_iteration(paramset: ParamSet, seed: u64, outputs: &mut Outputs) -> f32 { - let (mut mixnodes, all_sender_peers, receiver_peers) = if paramset.random_topology { - build_random_network(¶mset, seed, outputs) - } else { - build_striped_network(¶mset, seed) - }; - // Check node ID consistency - for (i, node) in mixnodes.iter().enumerate() { - assert_eq!(node.id as usize, i); - } - - // For N senders + 1 mix (all mixnodes will share the same sender ID) - let mut data_msg_gen = DataMessageGenerator::new(paramset.num_senders + 1); - let mix_msg_sender_id = paramset.num_senders; - - // Virtual discrete time - let mut vtime: f32 = 0.0; - // Transmission interval that each queue must release a message - let transmission_interval = 1.0 / paramset.transmission_rate as f32; - // Results - let mut all_sent_count = 0; // all data + noise sent by all senders - let all_sent_count_target = (paramset.num_sender_msgs as usize) - .checked_mul(paramset.num_senders as usize) - .unwrap(); - let mut sent_data_msgs: FxHashMap = FxHashMap::default(); - let mut recv_data_msgs: FxHashMap = FxHashMap::default(); - - outputs.write_header_queue_data_msg_counts(&mixnodes); - - let mut data_msg_rng = StdRng::seed_from_u64(seed); - loop { - tracing::trace!( - "VTIME:{}, ALL_SENT:{}, DATA_SENT:{}, DATA_RECEIVED:{}", - vtime, - all_sent_count, - sent_data_msgs.len(), - recv_data_msgs.len(), - ); - - // All senders emit a message (data or noise) to all of their own adjacent peers. - if all_sent_count < all_sent_count_target { - // For each sender - for (sender_idx, sender_peers) in all_sender_peers.iter() { - if try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) { - let msg = data_msg_gen.next(sender_idx); - sender_peers.iter().for_each(|peer_id| { - mixnodes - .get_mut(*peer_id as usize) - .unwrap() - .receive(msg, None); - }); - sent_data_msgs.insert(msg, vtime); - outputs.add_sent_msg(&msg) - } else { - // Generate noise and add it to the sequence to calculate ordering coefficients later, - // but don't need to send it to the mix nodes - // because the mix nodes will anyway drop the noise, - // and we don't need to record what the mix nodes receive. - outputs.add_sent_noise(sender_idx); - } - all_sent_count += 1; - } - } - - // Each mix node add a new data message to its queue with a certain probability - if paramset.mix_data_msg_prob > 0.0 { - for node in mixnodes.iter_mut() { - if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) { - node.send(data_msg_gen.next(mix_msg_sender_id)); - // We don't put the msg into the sent_sequence - // because sent_sequence is only for recording messages sent by the senders, not the mixnode. - } - } - } - - // Each mix node relays a message (data or noise) to the next mix node or the receiver. - // As the receiver, record the time and order of the received messages. - AllMessagesToRelay::new(&mut mixnodes).into_iter().for_each( - |(relayer_id, msgs_to_relay)| { - msgs_to_relay.into_iter().for_each(|(peer_id, msg)| { - if peer_id == RECEIVER_NODE_ID { - match msg { - Message::Data(msg) => { - // If msg was sent by the sender (not by any mix) - if let Some(&sent_time) = sent_data_msgs.get(&msg) { - // If this is the first time to see the msg, - // update stats that must ignore duplicate messages. - if let Entry::Vacant(e) = recv_data_msgs.entry(msg) { - e.insert(vtime); - outputs.add_latency(&msg, sent_time, vtime); - } - } - // Record msg to the sequence - let conn_idx = receiver_peers.conn_idx(&relayer_id).unwrap(); - outputs.add_recv_msg(&msg, conn_idx); - } - Message::Noise => { - // Record noise to the sequence - let conn_idx = receiver_peers.conn_idx(&relayer_id).unwrap(); - outputs.add_recv_noise(conn_idx); - } - } - } else if let Message::Data(msg) = msg { - let peer = mixnodes.get_mut(peer_id as usize).unwrap(); - assert_eq!(peer.id, peer_id); - peer.receive(msg, Some(relayer_id)); - } - }); - }, - ); - - // Record the number of data messages in each mix node's queues - outputs.add_queue_data_msg_counts(vtime, &mixnodes); - - // If all senders finally emitted all data+noise messages, - // and If all data messages have been received by the receiver, - // stop the iteration. - if all_sent_count == all_sent_count_target && sent_data_msgs.len() == recv_data_msgs.len() { - break; - } - - vtime += transmission_interval; - } - - vtime +pub struct Iteration { + pub paramset: ParamSet, + pub iteration_idx: usize, + pub rootdir: String, } -fn try_probability(rng: &mut StdRng, prob: f32) -> bool { - assert!( - (0.0..=1.0).contains(&prob), - "Probability must be in [0, 1]." - ); - rng.gen::() < prob +impl Iteration { + pub fn start(&mut self) { + let dir = format!( + "{}/iteration_{}__WIP_DUR__", + self.rootdir, self.iteration_idx + ); + std::fs::create_dir_all(dir.as_str()).unwrap(); + + let mut outputs = Outputs::new( + format!("{dir}/latency__WIP__.csv"), + (0..self.paramset.num_senders) + .map(|sender_idx| format!("{dir}/sent_seq_{sender_idx}__WIP__.csv")) + .collect(), + (0..self.paramset.peering_degree) + .map(|conn_idx| format!("{dir}/recv_seq_{conn_idx}__WIP__.csv")) + .collect(), + format!("{dir}/data_msg_counts__WIP__.csv"), + format!("{dir}/topology.csv"), + ); + + let start_time = SystemTime::now(); + + let vtime = self.run(self.iteration_idx as u64, &mut outputs); + outputs.close(); + outputs.rename_paths("__WIP__.csv", ".csv"); + + let duration = format_duration(SystemTime::now().duration_since(start_time).unwrap()); + let new_dir = dir.replace("__WIP_DUR__", &format!("_{duration}")); + std::fs::rename(dir, new_dir).unwrap(); + + tracing::info!( + "ParamSet:{}, Iteration:{} completed. Duration:{}, vtime:{}", + self.paramset.id, + self.iteration_idx, + duration, + vtime + ); + } + + fn run(&mut self, seed: u64, outputs: &mut Outputs) -> f32 { + let paramset = &self.paramset; + + let (mut mixnodes, all_sender_peers, receiver_peers) = if paramset.random_topology { + build_random_network(paramset, seed, outputs) + } else { + build_striped_network(paramset, seed) + }; + // Check node ID consistency + for (i, node) in mixnodes.iter().enumerate() { + assert_eq!(node.id as usize, i); + } + + // For N senders + 1 mix (all mixnodes will share the same sender ID) + let mut data_msg_gen = DataMessageGenerator::new(paramset.num_senders + 1); + let mix_msg_sender_id = paramset.num_senders; + + // Virtual discrete time + let mut vtime: f32 = 0.0; + // Transmission interval that each queue must release a message + let transmission_interval = 1.0 / paramset.transmission_rate as f32; + // Results + let mut all_sent_count = 0; // all data + noise sent by all senders + let all_sent_count_target = (paramset.num_sender_msgs as usize) + .checked_mul(paramset.num_senders as usize) + .unwrap(); + let mut sent_data_msgs: FxHashMap = FxHashMap::default(); + let mut recv_data_msgs: FxHashMap = FxHashMap::default(); + + outputs.write_header_queue_data_msg_counts(&mixnodes); + + let mut data_msg_rng = StdRng::seed_from_u64(seed); + loop { + tracing::trace!( + "VTIME:{}, ALL_SENT:{}, DATA_SENT:{}, DATA_RECEIVED:{}", + vtime, + all_sent_count, + sent_data_msgs.len(), + recv_data_msgs.len(), + ); + + // All senders emit a message (data or noise) to all of their own adjacent peers. + if all_sent_count < all_sent_count_target { + // For each sender + for (sender_idx, sender_peers) in all_sender_peers.iter() { + if Self::try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) { + let msg = data_msg_gen.next(sender_idx); + sender_peers.iter().for_each(|peer_id| { + mixnodes + .get_mut(*peer_id as usize) + .unwrap() + .receive(msg, None); + }); + sent_data_msgs.insert(msg, vtime); + outputs.add_sent_msg(&msg) + } else { + // Generate noise and add it to the sequence to calculate ordering coefficients later, + // but don't need to send it to the mix nodes + // because the mix nodes will anyway drop the noise, + // and we don't need to record what the mix nodes receive. + outputs.add_sent_noise(sender_idx); + } + all_sent_count += 1; + } + } + + // Each mix node add a new data message to its queue with a certain probability + if paramset.mix_data_msg_prob > 0.0 { + for node in mixnodes.iter_mut() { + if Self::try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) { + node.send(data_msg_gen.next(mix_msg_sender_id)); + // We don't put the msg into the sent_sequence + // because sent_sequence is only for recording messages sent by the senders, not the mixnode. + } + } + } + + // Each mix node relays a message (data or noise) to the next mix node or the receiver. + // As the receiver, record the time and order of the received messages. + AllMessagesToRelay::new(&mut mixnodes).into_iter().for_each( + |(relayer_id, msgs_to_relay)| { + msgs_to_relay.into_iter().for_each(|(peer_id, msg)| { + if peer_id == RECEIVER_NODE_ID { + match msg { + Message::Data(msg) => { + // If msg was sent by the sender (not by any mix) + if let Some(&sent_time) = sent_data_msgs.get(&msg) { + // If this is the first time to see the msg, + // update stats that must ignore duplicate messages. + if let Entry::Vacant(e) = recv_data_msgs.entry(msg) { + e.insert(vtime); + outputs.add_latency(&msg, sent_time, vtime); + } + } + // Record msg to the sequence + let conn_idx = receiver_peers.conn_idx(&relayer_id).unwrap(); + outputs.add_recv_msg(&msg, conn_idx); + } + Message::Noise => { + // Record noise to the sequence + let conn_idx = receiver_peers.conn_idx(&relayer_id).unwrap(); + outputs.add_recv_noise(conn_idx); + } + } + } else if let Message::Data(msg) = msg { + let peer = mixnodes.get_mut(peer_id as usize).unwrap(); + assert_eq!(peer.id, peer_id); + peer.receive(msg, Some(relayer_id)); + } + }); + }, + ); + + // Record the number of data messages in each mix node's queues + outputs.add_queue_data_msg_counts(vtime, &mixnodes); + + // If all senders finally emitted all data+noise messages, + // and If all data messages have been received by the receiver, + // stop the iteration. + if all_sent_count == all_sent_count_target + && sent_data_msgs.len() == recv_data_msgs.len() + { + break; + } + + vtime += transmission_interval; + } + + vtime + } + + fn try_probability(rng: &mut StdRng, prob: f32) -> bool { + assert!( + (0.0..=1.0).contains(&prob), + "Probability must be in [0, 1]." + ); + rng.gen::() < prob + } } struct AllMessagesToRelay(Vec<(NodeId, MessagesToRelay)>); diff --git a/mixnet-rs/ordering/src/main.rs b/mixnet-rs/ordering/src/main.rs index a57cc73..ff838bd 100644 --- a/mixnet-rs/ordering/src/main.rs +++ b/mixnet-rs/ordering/src/main.rs @@ -13,10 +13,10 @@ use std::{ use chrono::Utc; use clap::Parser; -use iteration::run_iteration; -use outputs::Outputs; +use iteration::Iteration; use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS}; use protocol::queue::QueueType; +use rayon::prelude::*; #[derive(Debug, Parser)] #[command(name = "Ordering Measurement")] @@ -30,6 +30,8 @@ struct Args { #[arg(short, long)] outdir: String, #[arg(short, long)] + num_threads: usize, + #[arg(short, long)] from_paramset: Option, #[arg(short, long)] to_paramset: Option, @@ -45,6 +47,7 @@ fn main() { session_id, queue_type, outdir, + num_threads, from_paramset, to_paramset, } = args; @@ -67,6 +70,7 @@ fn main() { let session_start_time = SystemTime::now(); + let mut iterations: Vec = Vec::new(); for paramset in paramsets { if paramset.id < from_paramset.unwrap_or(0) { tracing::info!("ParamSet:{} skipped", paramset.id); @@ -76,73 +80,30 @@ fn main() { break; } - let paramset_dir = format!("{outdir}/{subdir}/__WIP__paramset_{}", paramset.id); + let paramset_dir = format!("{outdir}/{subdir}/paramset_{}", paramset.id); std::fs::create_dir_all(paramset_dir.as_str()).unwrap(); save_paramset_info(¶mset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap(); - let dur_path = format!("{paramset_dir}/__WIP__durations.csv"); - let mut dur_writer = csv::Writer::from_path(&dur_path).unwrap(); - dur_writer - .write_record(["iteration", "time_human", "time_sec", "vtime"]) - .unwrap(); - dur_writer.flush().unwrap(); - for i in 0..paramset.num_iterations { - let mut outputs = Outputs::new( - format!("{paramset_dir}/__WIP__iteration_{i}_latency.csv"), - (0..paramset.num_senders) - .map(|sender_idx| { - format!("{paramset_dir}/__WIP__iteration_{i}_sent_seq_{sender_idx}.csv") - }) - .collect(), - (0..paramset.peering_degree) - .map(|conn_idx| { - format!("{paramset_dir}/__WIP__iteration_{i}_recv_seq_{conn_idx}.csv") - }) - .collect(), - format!("{paramset_dir}/__WIP__iteration_{i}_data_msg_counts.csv"), - format!("{paramset_dir}/iteration_{i}_topology.csv"), - ); - - let start_time = SystemTime::now(); - - let vtime = run_iteration(paramset.clone(), i as u64, &mut outputs); - outputs.close(); - outputs.rename_paths("__WIP__iteration", "iteration"); - - let duration = SystemTime::now().duration_since(start_time).unwrap(); - let duration_human = format_duration(duration); - dur_writer - .write_record([ - i.to_string(), - duration_human.clone(), - duration.as_secs().to_string(), - vtime.to_string(), - ]) - .unwrap(); - dur_writer.flush().unwrap(); - - tracing::info!( - "ParamSet:{}, Iteration:{} completed. Duration:{}, vtime:{}", - paramset.id, - i, - duration_human, - vtime - ); + iterations.push(Iteration { + paramset: paramset.clone(), + iteration_idx: i, + rootdir: paramset_dir.clone(), + }); } - dur_writer.flush().unwrap(); - - let new_dur_path = dur_path.replace("__WIP__durations", "durations"); - std::fs::rename(&dur_path, &new_dur_path) - .expect("Failed to rename: {dur_path} -> {new_dur_dir}: {e}"); - - let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_"); - std::fs::rename(¶mset_dir, &new_paramset_dir) - .expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}: {e}"); - - tracing::info!("ParamSet:{} completed", paramset.id); } + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .build() + .unwrap(); + + pool.install(|| { + iterations.par_iter_mut().for_each(|iteration| { + iteration.start(); + }); + }); + let session_duration = SystemTime::now() .duration_since(session_start_time) .unwrap();