From 2e26142ab50d1b87eba7969702a91037b1af9956 Mon Sep 17 00:00:00 2001
From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com>
Date: Sun, 8 Sep 2024 08:36:34 +0900
Subject: [PATCH] add analysis binaries

---
 mixnet-rs/ordering/Cargo.toml              |   8 ++
 mixnet-rs/ordering/src/bin/aggregate.rs    | 120 +++++++++++++++
 mixnet-rs/ordering/src/bin/datamsgcount.rs |  72 +++++++++++++
 mixnet-rs/ordering/src/bin/latency.rs      |  75 +++++++++++++
 4 files changed, 275 insertions(+)
 create mode 100644 mixnet-rs/ordering/src/bin/aggregate.rs
 create mode 100644 mixnet-rs/ordering/src/bin/datamsgcount.rs
 create mode 100644 mixnet-rs/ordering/src/bin/latency.rs

diff --git a/mixnet-rs/ordering/Cargo.toml b/mixnet-rs/ordering/Cargo.toml
index 020e225..317d30f 100644
--- a/mixnet-rs/ordering/Cargo.toml
+++ b/mixnet-rs/ordering/Cargo.toml
@@ -13,6 +13,14 @@ rand = "0.8.5"
 rustc-hash = "2.0.0"
 tracing = "0.1.40"
 tracing-subscriber = "0.3.18"
+walkdir = "2.3"
+glob = "0.3"
+polars = { version = "0.42.0", features = [
+    "csv",
+    "diagonal_concat",
+    "polars-io",
+    "zip_with",
+] }
 
 [profile.release]
 opt-level = 3 # max optimization
diff --git a/mixnet-rs/ordering/src/bin/aggregate.rs b/mixnet-rs/ordering/src/bin/aggregate.rs
new file mode 100644
index 0000000..5bc01b9
--- /dev/null
+++ b/mixnet-rs/ordering/src/bin/aggregate.rs
@@ -0,0 +1,120 @@
+use std::{
+    env,
+    fs::File,
+    path::{Path, PathBuf},
+};
+
+use polars::prelude::*;
+use walkdir::WalkDir;
+
+fn aggregate(path: &str) {
+    let mut schema = Schema::new();
+    schema.with_column("paramset".into(), DataType::Int64);
+    schema.with_column("num_mixes".into(), DataType::Int64);
+    schema.with_column("num_paths".into(), DataType::Int64);
+    schema.with_column("random_topology".into(), DataType::Boolean);
+    schema.with_column("peering_degree".into(), DataType::Int64);
+    schema.with_column("min_queue_size".into(), DataType::Int64);
+    schema.with_column("transmission_rate".into(), DataType::Int64);
+    schema.with_column("num_senders".into(), DataType::Int64);
+    schema.with_column("num_sender_msgs".into(), DataType::Int64);
+    schema.with_column("sender_data_msg_prob".into(), DataType::Float32);
+    schema.with_column("mix_data_msg_prob".into(), DataType::Float32);
+    schema.with_column("queue_type".into(), DataType::String);
+    schema.with_column("num_iterations".into(), DataType::Int64);
+
+    let mut dataframes: Vec<DataFrame> = Vec::new();
+    for entry in WalkDir::new(path)
+        .into_iter()
+        .filter_map(|e| e.ok())
+        .filter(|e| e.file_type().is_dir())
+    {
+        let dir_name = entry.path().file_name().unwrap().to_string_lossy();
+        if dir_name.starts_with("paramset_") {
+            let mut df = CsvReadOptions::default()
+                .with_has_header(true)
+                .with_schema(Some(SchemaRef::new(schema.clone())))
+                .try_into_reader_with_file_path(Some(entry.path().join("paramset.csv")))
+                .unwrap()
+                .finish()
+                .unwrap();
+
+            add_stats_columns(
+                &mut df,
+                entry.path().join("data_msg_counts_stats.csv"),
+                "data_msg_count_",
+            );
+            add_stats_columns(&mut df, entry.path().join("latency_stats.csv"), "latency_");
+
+            dataframes.push(df);
+        }
+    }
+
+    if !dataframes.is_empty() {
+        let df = polars::functions::concat_df_diagonal(dataframes.as_slice()).unwrap();
+        let mut df = df
+            .sort(["paramset", "queue_type"], SortMultipleOptions::default())
+            .unwrap();
+        let outpath = Path::new(path).join("aggregated.csv");
+        let mut file = File::create(&outpath).unwrap();
+        CsvWriter::new(&mut file).finish(&mut df).unwrap();
+        println!("Saved {}", outpath.display());
+    }
+}
+
+fn add_stats_columns(df: &mut DataFrame, path: PathBuf, col_prefix: &str) {
+    let mut schema = Schema::new();
+    schema.with_column("min".into(), DataType::Float64);
+    schema.with_column("median".into(), DataType::Float64);
+    schema.with_column("mean".into(), DataType::Float64);
+    schema.with_column("std".into(), DataType::Float64);
+    schema.with_column("max".into(), DataType::Float64);
+
+    let stats_df = CsvReadOptions::default()
+        .with_has_header(true)
+        .with_schema(Some(SchemaRef::new(schema)))
+        .try_into_reader_with_file_path(Some(path))
+        .unwrap()
+        .finish()
+        .unwrap();
+    df.with_column(
+        stats_df["min"]
+            .head(Some(1))
+            .with_name(format!("{col_prefix}min").as_str()),
+    )
+    .unwrap();
+    df.with_column(
+        stats_df["median"]
+            .head(Some(1))
+            .with_name(format!("{col_prefix}median").as_str()),
+    )
+    .unwrap();
+    df.with_column(
+        stats_df["mean"]
+            .head(Some(1))
+            .with_name(format!("{col_prefix}mean").as_str()),
+    )
+    .unwrap();
+    df.with_column(
+        stats_df["std"]
+            .head(Some(1))
+            .with_name(format!("{col_prefix}std").as_str()),
+    )
+    .unwrap();
+    df.with_column(
+        stats_df["max"]
+            .head(Some(1))
+            .with_name(format!("{col_prefix}max").as_str()),
+    )
+    .unwrap();
+}
+
+fn main() {
+    let args: Vec<String> = env::args().collect();
+    if args.len() < 2 {
+        eprintln!("Usage: {} <dir_path>", args[0]);
+        std::process::exit(1);
+    }
+    let path = &args[1];
+    aggregate(path);
+}
diff --git a/mixnet-rs/ordering/src/bin/datamsgcount.rs b/mixnet-rs/ordering/src/bin/datamsgcount.rs
new file mode 100644
index 0000000..6441ece
--- /dev/null
+++ b/mixnet-rs/ordering/src/bin/datamsgcount.rs
@@ -0,0 +1,72 @@
+use glob::glob;
+use polars::prelude::*;
+use std::env;
+use std::fs::File;
+use walkdir::WalkDir;
+
+fn aggregate(path: &str) {
+    for entry in WalkDir::new(path)
+        .into_iter()
+        .filter_map(|e| e.ok())
+        .filter(|e| e.file_type().is_dir())
+    {
+        let dir_name = entry.path().file_name().unwrap().to_string_lossy();
+        if dir_name.starts_with("paramset_") {
+            let mut aggregated_series = Series::new_empty("", &DataType::Int64);
+            let pattern = format!("{}/**/data_msg_counts.csv", entry.path().display());
+
+            for file in glob(&pattern).unwrap().filter_map(Result::ok) {
+                let df = CsvReadOptions::default()
+                    .with_has_header(true)
+                    .try_into_reader_with_file_path(Some(file.clone()))
+                    .unwrap()
+                    .finish()
+                    .unwrap();
+
+                // Drop the 'vtime' column and collect all remaining value columns
+                let df_without_vtime = df.drop("vtime").unwrap_or_else(|_| df.clone());
+                for col in df_without_vtime.get_columns() {
+                    aggregated_series
+                        .extend(&col.i64().unwrap().clone().into_series())
+                        .unwrap();
+                }
+
+                println!("Processed {}", file.display());
+            }
+
+            let output_file = format!("{}/data_msg_counts_stats.csv", entry.path().display());
+            save_stats(&aggregated_series, &output_file);
+        }
+    }
+}
+
+fn save_stats(aggregated: &Series, outpath: &str) {
+    let min = aggregated.min::<i64>().unwrap();
+    let max = aggregated.max::<i64>().unwrap();
+    let mean = aggregated.mean().unwrap();
+    let median = aggregated.median().unwrap();
+    let std = aggregated.std(1).unwrap();
+
+    let mut df = DataFrame::new(vec![
+        Series::new("min", &[min]),
+        Series::new("median", &[median]),
+        Series::new("mean", &[mean]),
+        Series::new("std", &[std]),
+        Series::new("max", &[max]),
+    ])
+    .unwrap();
+
+    let mut file = File::create(outpath).unwrap();
+    CsvWriter::new(&mut file).finish(&mut df).unwrap();
+    println!("Saved {}", outpath);
+}
+
+fn main() {
+    let args: Vec<String> = env::args().collect();
+    if args.len() < 2 {
+        eprintln!("Usage: {} <dir_path>", args[0]);
+        std::process::exit(1);
+    }
+    let path = &args[1];
+    aggregate(path);
+}
diff --git a/mixnet-rs/ordering/src/bin/latency.rs b/mixnet-rs/ordering/src/bin/latency.rs
new file mode 100644
index 0000000..de51b4f
--- /dev/null
+++ b/mixnet-rs/ordering/src/bin/latency.rs
@@ -0,0 +1,75 @@
+use glob::glob;
+use polars::prelude::*;
+use std::env;
+use std::fs::File;
+use walkdir::WalkDir;
+
+fn aggregate(path: &str) {
+    for entry in WalkDir::new(path)
+        .into_iter()
+        .filter_map(|e| e.ok())
+        .filter(|e| e.file_type().is_dir())
+    {
+        let dir_name = entry.path().file_name().unwrap().to_string_lossy();
+        if dir_name.starts_with("paramset_") {
+            let mut aggregated_series = Series::new_empty("", &DataType::Int64);
+            let pattern = format!("{}/**/latency.csv", entry.path().display());
+
+            for file in glob(&pattern).unwrap().filter_map(Result::ok) {
+                let df = CsvReadOptions::default()
+                    .with_has_header(true)
+                    .try_into_reader_with_file_path(Some(file.clone()))
+                    .unwrap()
+                    .finish()
+                    .unwrap();
+
+                aggregated_series
+                    .extend(
+                        &df.column("latency")
+                            .unwrap()
+                            .i64()
+                            .unwrap()
+                            .clone()
+                            .into_series(),
+                    )
+                    .unwrap();
+
+                println!("Processed {}", file.display());
+            }
+
+            let output_file = format!("{}/latency_stats.csv", entry.path().display());
+            save_stats(&aggregated_series, &output_file);
+        }
+    }
+}
+
+fn save_stats(aggregated: &Series, outpath: &str) {
+    let min = aggregated.min::<i64>().unwrap();
+    let max = aggregated.max::<i64>().unwrap();
+    let mean = aggregated.mean().unwrap();
+    let median = aggregated.median().unwrap();
+    let std = aggregated.std(1).unwrap();
+
+    let mut df = DataFrame::new(vec![
+        Series::new("min", &[min]),
+        Series::new("median", &[median]),
+        Series::new("mean", &[mean]),
+        Series::new("std", &[std]),
+        Series::new("max", &[max]),
+    ])
+    .unwrap();
+
+    let mut file = File::create(outpath).unwrap();
+    CsvWriter::new(&mut file).finish(&mut df).unwrap();
+    println!("Saved {}", outpath);
+}
+
+fn main() {
+    let args: Vec<String> = env::args().collect();
+    if args.len() < 2 {
+        eprintln!("Usage: {} <dir_path>", args[0]);
+        std::process::exit(1);
+    }
+    let path = &args[1];
+    aggregate(path);
+}