add analysis binaries

This commit is contained in:
Youngjoon Lee 2024-09-08 08:36:34 +09:00
parent befe73853b
commit 2e26142ab5
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
4 changed files with 275 additions and 0 deletions

View File

@ -13,6 +13,14 @@ rand = "0.8.5"
rustc-hash = "2.0.0"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
walkdir = "2.3"
glob = "0.3"
polars = { version = "0.42.0", features = [
"csv",
"diagonal_concat",
"polars-io",
"zip_with",
] }
[profile.release]
opt-level = 3 # max optimization

View File

@ -0,0 +1,120 @@
use std::{
env,
fs::File,
path::{Path, PathBuf},
};
use polars::prelude::*;
use walkdir::WalkDir;
/// Walks `path` for `paramset_*` directories, reads each one's
/// `paramset.csv` together with its two pre-computed stats files, and
/// writes one combined table to `<path>/aggregated.csv`.
///
/// Panics (via `unwrap`) on any missing or malformed file — fail-fast
/// style consistent with the rest of this binary.
fn aggregate(path: &str) {
    // Fixed schema for paramset.csv so every per-directory frame agrees on
    // column types before the diagonal concat below.
    let mut schema = Schema::new();
    schema.with_column("paramset".into(), DataType::Int64);
    schema.with_column("num_mixes".into(), DataType::Int64);
    schema.with_column("num_paths".into(), DataType::Int64);
    schema.with_column("random_topology".into(), DataType::Boolean);
    schema.with_column("peering_degree".into(), DataType::Int64);
    schema.with_column("min_queue_size".into(), DataType::Int64);
    schema.with_column("transmission_rate".into(), DataType::Int64);
    schema.with_column("num_senders".into(), DataType::Int64);
    schema.with_column("num_sender_msgs".into(), DataType::Int64);
    schema.with_column("sender_data_msg_prob".into(), DataType::Float32);
    schema.with_column("mix_data_msg_prob".into(), DataType::Float32);
    schema.with_column("queue_type".into(), DataType::String);
    schema.with_column("num_iterations".into(), DataType::Int64);
    let mut dataframes: Vec<DataFrame> = Vec::new();
    for entry in WalkDir::new(path)
        .into_iter()
        .filter_map(|e| e.ok()) // silently skip unreadable dir entries
        .filter(|e| e.file_type().is_dir())
    {
        let dir_name = entry.path().file_name().unwrap().to_string_lossy();
        if dir_name.starts_with("paramset_") {
            // One frame per paramset directory, typed by the schema above.
            let mut df = CsvReadOptions::default()
                .with_has_header(true)
                .with_schema(Some(SchemaRef::new(schema.clone())))
                .try_into_reader_with_file_path(Some(entry.path().join("paramset.csv")))
                .unwrap()
                .finish()
                .unwrap();
            // Attach the first row of each stats file as prefixed columns.
            add_stats_columns(
                &mut df,
                entry.path().join("data_msg_counts_stats.csv"),
                "data_msg_count_",
            );
            add_stats_columns(&mut df, entry.path().join("latency_stats.csv"), "latency_");
            dataframes.push(df);
        }
    }
    if !dataframes.is_empty() {
        // Diagonal concat tolerates frames whose column sets differ.
        let df = polars::functions::concat_df_diagonal(dataframes.as_slice()).unwrap();
        let mut df = df
            .sort(["paramset", "queue_type"], SortMultipleOptions::default())
            .unwrap();
        let outpath = Path::new(path).join("aggregated.csv");
        let mut file = File::create(&outpath).unwrap();
        CsvWriter::new(&mut file).finish(&mut df).unwrap();
        println!("Saved {}", outpath.display());
    }
}
/// Reads a stats CSV (columns: min, median, mean, std, max) from `path`
/// and appends its first row to `df` as new columns named
/// `{col_prefix}{stat}` (e.g. `latency_mean`).
///
/// Panics (via `unwrap`) if the file is missing, malformed, or lacks any
/// of the expected columns — fail-fast style of this binary.
fn add_stats_columns(df: &mut DataFrame, path: PathBuf, col_prefix: &str) {
    // The five statistics, in the column order the stats files use.
    const STATS: [&str; 5] = ["min", "median", "mean", "std", "max"];

    // All stats are read as f64 so fractional mean/std values survive.
    let mut schema = Schema::new();
    for stat in STATS {
        schema.with_column(stat.into(), DataType::Float64);
    }
    let stats_df = CsvReadOptions::default()
        .with_has_header(true)
        .with_schema(Some(SchemaRef::new(schema)))
        .try_into_reader_with_file_path(Some(path))
        .unwrap()
        .finish()
        .unwrap();
    // One prefixed column per statistic; only the first row is carried over.
    for stat in STATS {
        df.with_column(
            stats_df[stat]
                .head(Some(1))
                .with_name(format!("{col_prefix}{stat}").as_str()),
        )
        .unwrap();
    }
}
/// Entry point: expects exactly one CLI argument, the root directory to
/// aggregate; exits with status 1 and a usage message otherwise.
fn main() {
    let mut args = env::args();
    let program = args.next().unwrap_or_default();
    match args.next() {
        Some(path) => aggregate(&path),
        None => {
            eprintln!("Usage: {} <path>", program);
            std::process::exit(1);
        }
    }
}

View File

@ -0,0 +1,72 @@
use glob::glob;
use polars::prelude::*;
use std::env;
use std::fs::File;
use walkdir::WalkDir;
/// For each `paramset_*` directory under `path`, concatenates every
/// non-`vtime` column of all nested `data_msg_counts.csv` files into a
/// single Int64 series and writes its summary statistics to
/// `data_msg_counts_stats.csv` inside that directory.
fn aggregate(path: &str) {
    for entry in WalkDir::new(path)
        .into_iter()
        .filter_map(|e| e.ok()) // silently skip unreadable dir entries
        .filter(|e| e.file_type().is_dir())
    {
        let dir_name = entry.path().file_name().unwrap().to_string_lossy();
        if dir_name.starts_with("paramset_") {
            // Accumulator for all message counts across files and columns.
            let mut aggregated_series = Series::new_empty("", &DataType::Int64);
            let pattern = format!("{}/**/data_msg_counts.csv", entry.path().display());
            for file in glob(&pattern).unwrap().filter_map(Result::ok) {
                let df = CsvReadOptions::default()
                    .with_has_header(true)
                    .try_into_reader_with_file_path(Some(file.clone()))
                    .unwrap()
                    .finish()
                    .unwrap();
                // Drop the 'vtime' column and collect all remaining value columns
                let df_without_vtime = df.drop("vtime").unwrap_or_else(|_| df.clone());
                for col in df_without_vtime.get_columns() {
                    // NOTE(review): assumes CSV inference typed every
                    // remaining column as Int64 — `i64()` panics otherwise.
                    aggregated_series
                        .extend(&col.i64().unwrap().clone().into_series())
                        .unwrap();
                }
                println!("Processed {}", file.display());
            }
            // NOTE(review): if the glob matched no files the series is
            // empty and save_stats' unwraps may panic — TODO confirm.
            let output_file = format!("{}/data_msg_counts_stats.csv", entry.path().display());
            save_stats(&aggregated_series, &output_file);
        }
    }
}
/// Computes summary statistics of `aggregated` and writes them as a
/// single-row CSV (columns: min, median, mean, std, max) to `outpath`.
fn save_stats(aggregated: &Series, outpath: &str) {
    // Extremes are taken as i64; the central moments are floating point.
    // std uses ddof = 1 (sample standard deviation).
    let columns = vec![
        Series::new("min", &[aggregated.min::<i64>().unwrap()]),
        Series::new("median", &[aggregated.median().unwrap()]),
        Series::new("mean", &[aggregated.mean().unwrap()]),
        Series::new("std", &[aggregated.std(1).unwrap()]),
        Series::new("max", &[aggregated.max::<i64>().unwrap()]),
    ];
    let mut stats = DataFrame::new(columns).unwrap();
    let mut out = File::create(outpath).unwrap();
    CsvWriter::new(&mut out).finish(&mut stats).unwrap();
    println!("Saved {}", outpath);
}
/// Entry point: expects exactly one CLI argument, the root directory to
/// scan; exits with status 1 and a usage message otherwise.
fn main() {
    let mut args = env::args();
    let program = args.next().unwrap_or_default();
    match args.next() {
        Some(path) => aggregate(&path),
        None => {
            eprintln!("Usage: {} <path>", program);
            std::process::exit(1);
        }
    }
}

View File

@ -0,0 +1,75 @@
use glob::glob;
use polars::prelude::*;
use std::env;
use std::fs::File;
use walkdir::WalkDir;
/// For each `paramset_*` directory under `path`, concatenates the
/// `latency` column of all nested `latency.csv` files into a single Int64
/// series and writes its summary statistics to `latency_stats.csv` inside
/// that directory.
fn aggregate(path: &str) {
    for entry in WalkDir::new(path)
        .into_iter()
        .filter_map(|e| e.ok()) // silently skip unreadable dir entries
        .filter(|e| e.file_type().is_dir())
    {
        let dir_name = entry.path().file_name().unwrap().to_string_lossy();
        if dir_name.starts_with("paramset_") {
            // Accumulator for all latency samples across matched files.
            let mut aggregated_series = Series::new_empty("", &DataType::Int64);
            let pattern = format!("{}/**/latency.csv", entry.path().display());
            for file in glob(&pattern).unwrap().filter_map(Result::ok) {
                let df = CsvReadOptions::default()
                    .with_has_header(true)
                    .try_into_reader_with_file_path(Some(file.clone()))
                    .unwrap()
                    .unwrap()
                    .finish()
                    .unwrap();
                // NOTE(review): assumes the 'latency' column was inferred
                // as Int64 — `i64()` panics otherwise.
                aggregated_series
                    .extend(
                        &df.column("latency")
                            .unwrap()
                            .i64()
                            .unwrap()
                            .clone()
                            .into_series(),
                    )
                    .unwrap();
                println!("Processed {}", file.display());
            }
            // NOTE(review): if the glob matched no files the series is
            // empty and save_stats' unwraps may panic — TODO confirm.
            let output_file = format!("{}/latency_stats.csv", entry.path().display());
            save_stats(&aggregated_series, &output_file);
        }
    }
}
/// Computes summary statistics of `aggregated` and writes them as a
/// single-row CSV (columns: min, median, mean, std, max) to `outpath`.
fn save_stats(aggregated: &Series, outpath: &str) {
    // Extremes are taken as i64; the central moments are floating point.
    // std uses ddof = 1 (sample standard deviation).
    let columns = vec![
        Series::new("min", &[aggregated.min::<i64>().unwrap()]),
        Series::new("median", &[aggregated.median().unwrap()]),
        Series::new("mean", &[aggregated.mean().unwrap()]),
        Series::new("std", &[aggregated.std(1).unwrap()]),
        Series::new("max", &[aggregated.max::<i64>().unwrap()]),
    ];
    let mut stats = DataFrame::new(columns).unwrap();
    let mut out = File::create(outpath).unwrap();
    CsvWriter::new(&mut out).finish(&mut stats).unwrap();
    println!("Saved {}", outpath);
}
/// Entry point: expects exactly one CLI argument, the root directory to
/// scan; exits with status 1 and a usage message otherwise.
fn main() {
    let mut args = env::args();
    let program = args.next().unwrap_or_default();
    match args.next() {
        Some(path) => aggregate(&path),
        None => {
            eprintln!("Usage: {} <path>", program);
            std::process::exit(1);
        }
    }
}