diff --git a/mixnet-rs/dissemination/Cargo.toml b/mixnet-rs/dissemination/Cargo.toml index 61c8c78..e60ebee 100644 --- a/mixnet-rs/dissemination/Cargo.toml +++ b/mixnet-rs/dissemination/Cargo.toml @@ -10,5 +10,6 @@ csv = "1.3.0" rand = "0.8.5" strum = "0.26.3" strum_macros = "0.26.4" +tokio = { version = "1.39.2", features = ["rt", "rt-multi-thread", "sync"] } tracing = "0.1.40" tracing-subscriber = "0.3.18" diff --git a/mixnet-rs/dissemination/src/main.rs b/mixnet-rs/dissemination/src/main.rs index 2cec3fc..fa150f7 100644 --- a/mixnet-rs/dissemination/src/main.rs +++ b/mixnet-rs/dissemination/src/main.rs @@ -1,6 +1,11 @@ use chrono::Utc; use clap::Parser; -use std::{error::Error, path::Path, time::SystemTime}; +use std::{ + collections::HashMap, + error::Error, + path::Path, + time::{Duration, SystemTime}, +}; use iteration::run_iteration; use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS}; @@ -23,6 +28,8 @@ struct Args { queue_type: QueueType, #[arg(short, long)] outdir: String, + #[arg(short, long)] + num_threads: usize, } fn main() { @@ -35,8 +42,15 @@ fn main() { session_id, queue_type, outdir, + num_threads, } = args; + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(num_threads) + .enable_all() + .build() + .unwrap(); + // Create a directory and initialize a CSV file only with a header assert!( Path::new(&outdir).is_dir(), @@ -55,28 +69,51 @@ fn main() { let session_start_time = SystemTime::now(); - for paramset in paramsets { - let paramset_dir = format!("{outdir}/{subdir}/__WIP__paramset_{}", paramset.id); - std::fs::create_dir_all(paramset_dir.as_str()).unwrap(); - save_paramset_info(¶mset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap(); + runtime.block_on(async { + let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<(u16, u16)>(); - for i in 0..paramset.num_iterations { - let out_csv_path = format!("{paramset_dir}/__WIP__iteration_{i}.csv"); - let topology_path = format!("{paramset_dir}/topology_{i}.csv"); - run_iteration(paramset, i as u64, &out_csv_path, &topology_path); + let mut waiting_iterations: HashMap = HashMap::new(); + for paramset in paramsets { + let paramset_dir = format!("{outdir}/{subdir}/__WIP__paramset_{}", paramset.id); + std::fs::create_dir_all(paramset_dir.as_str()).unwrap(); + save_paramset_info(¶mset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap(); - let new_out_csv_path = out_csv_path.replace("__WIP__iteration_", "iteration_"); - std::fs::rename(&out_csv_path, &new_out_csv_path) - .expect("Failed to rename: {out_csv_path} -> {new_out_csv_path}"); + for i in 0..paramset.num_iterations { + let out_csv_path = format!("{paramset_dir}/__WIP__iteration_{i}.csv"); + let topology_path = format!("{paramset_dir}/topology_{i}.csv"); + let sender = sender.clone(); + tokio::task::spawn(async move { + run_iteration(paramset, i as u64, &out_csv_path, &topology_path); - tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i); + let new_out_csv_path = out_csv_path.replace("__WIP__iteration_", "iteration_"); + std::fs::rename(&out_csv_path, &new_out_csv_path) + .expect("Failed to rename: {out_csv_path} -> {new_out_csv_path}"); + tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i); + + sender.send((paramset.id, i)).unwrap(); + }); + } + waiting_iterations.insert(paramset.id, (paramset.num_iterations, paramset_dir)); } - let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_"); - std::fs::rename(¶mset_dir, &new_paramset_dir) - .expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}"); - tracing::info!("ParamSet:{} completed.", paramset.id); - } + while let Some((paramset_id, _)) = receiver.recv().await { + let (remaining_iterations, _) = waiting_iterations.get_mut(¶mset_id).unwrap(); + *remaining_iterations -= 1; + + if *remaining_iterations == 0 { + let paramset_dir = waiting_iterations.remove(¶mset_id).unwrap().1; + let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_"); + std::fs::rename(¶mset_dir, &new_paramset_dir) + .expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}: {e}"); + tracing::info!("ParamSet:{} completed.", paramset_id); + } + + // Exit loop if no more iterations are waiting + if waiting_iterations.is_empty() { + break; + } + } + }); let session_duration = SystemTime::now() .duration_since(session_start_time) @@ -85,7 +122,7 @@ fn main() { // Replace "__WIP__" and "__DUR__" in the subdir string let new_subdir = subdir .replace("__WIP__", "") - .replace("__DUR__", &format!("{:?}", session_duration)); + .replace("__DUR__", &format_duration(session_duration)); let old_path = format!("{}/{}", outdir, subdir); let new_path = format!("{}/{}", outdir, new_subdir); assert!( @@ -94,6 +131,8 @@ fn main() { ); std::fs::rename(&old_path, &new_path) .expect("Failed to rename the directory: {old_path} -> {new_path}"); + + tracing::info!("Session completed."); } fn save_paramset_info(paramset: &ParamSet, path: &str) -> Result<(), Box> { @@ -110,3 +149,14 @@ fn save_paramset_info(paramset: &ParamSet, path: &str) -> Result<(), Box String { + let total_seconds = duration.as_secs(); + + let days = total_seconds / 86_400; + let hours = (total_seconds % 86_400) / 3_600; + let minutes = (total_seconds % 3_600) / 60; + let seconds = total_seconds % 60; + + format!("{}d{}h{}m{}s", days, hours, minutes, seconds) +}