diff --git a/simlib/blendnet-sims/Cargo.toml b/simlib/blendnet-sims/Cargo.toml index 6c3e8a8..8cb1b80 100644 --- a/simlib/blendnet-sims/Cargo.toml +++ b/simlib/blendnet-sims/Cargo.toml @@ -29,3 +29,4 @@ tracing-appender = "0.2" cached = "0.54.0" polars = "0.46.0" humantime-serde = "1.1.1" +csv = "1.3.1" diff --git a/simlib/blendnet-sims/src/analysis/latency.rs b/simlib/blendnet-sims/src/analysis/latency.rs index 3275b90..4540ef5 100644 --- a/simlib/blendnet-sims/src/analysis/latency.rs +++ b/simlib/blendnet-sims/src/analysis/latency.rs @@ -64,6 +64,7 @@ pub fn analyze_latency(log_file: PathBuf, step_duration: Duration) -> Result<(), #[derive(Serialize, Deserialize, Debug)] struct Output { + count: usize, min: i64, min_payload_id: PayloadId, q1: f64, @@ -81,6 +82,7 @@ impl Output { let max = series.max::().unwrap().unwrap(); let max_payload_id = latency_to_message.get(&max).unwrap().clone(); Self { + count: series.len(), min, min_payload_id, q1: quantile(series, 0.25), diff --git a/simlib/blendnet-sims/src/batch/batch.rs b/simlib/blendnet-sims/src/batch/batch.rs new file mode 100644 index 0000000..bab063a --- /dev/null +++ b/simlib/blendnet-sims/src/batch/batch.rs @@ -0,0 +1,106 @@ +use std::{fs, path::PathBuf, time::Duration}; + +use clap::Parser; +use serde::Deserialize; + +use crate::{load_json_from_file, settings::SimSettings, SimulationApp}; + +#[derive(Parser)] +pub struct BatchApp { + #[clap(long, short)] + paramset_file: PathBuf, + #[clap(long, short)] + default_config_file: PathBuf, + #[clap(long, short)] + outdir: PathBuf, +} + +impl BatchApp { + pub fn run(&self) -> anyhow::Result<()> { + self.prepare_config_files()? + .into_iter() + .try_for_each(|(config_path, log_path)| self.run_simulation(config_path, log_path))?; + + Ok(()) + } + + fn prepare_config_files(&self) -> anyhow::Result> { + let default_settings: SimSettings = load_json_from_file(&self.default_config_file)?; + let mut reader = csv::ReaderBuilder::new() + .has_headers(true) + .delimiter(b',') + .from_path(&self.paramset_file)?; + + fs::create_dir_all(self.outdir.clone())?; + + let mut config_and_log_paths = Vec::new(); + for (i, record) in reader.deserialize().enumerate() { + let paramset: ParamSetRecord = record?; + let mut settings = default_settings.clone(); + paramset.apply_to(&mut settings); + + let dir_path = self.outdir.join(format!("sim-{}", i)); + fs::create_dir_all(dir_path.clone())?; + + let config_path = dir_path.join("config.json"); + serde_json::to_writer_pretty(fs::File::create(config_path.clone())?, &settings)?; + println!("Wrote {}", config_path.display()); + + config_and_log_paths.push((config_path, dir_path.join("out.log"))); + } + Ok(config_and_log_paths) + } + + fn run_simulation(&self, config_path: PathBuf, log_path: PathBuf) -> anyhow::Result<()> { + println!( + "Running simulation with config file {}", + config_path.display() + ); + let app = SimulationApp { + input_settings: config_path, + stream_type: None, + log_format: crate::log::LogFormat::Plain, + log_to: crate::log::LogOutput::File(log_path.clone()), + no_netcap: false, + with_metrics: false, + }; + + let maybe_guard = crate::log::config_tracing(app.log_format, &app.log_to, app.with_metrics); + let result = app.run(); + if result.is_ok() { + println!( + "Simulation finished successfully, logs written to {}", + log_path.display() + ); + } + drop(maybe_guard); + result + } +} + +#[derive(Debug, Deserialize)] +struct ParamSetRecord { + network_size: usize, + peering_degree: usize, + blend_hops: usize, + cover_slots_per_epoch: usize, + cover_slot_duration: usize, + max_temporal_delay: usize, +} + +impl ParamSetRecord { + fn apply_to(&self, settings: &mut SimSettings) { + settings.simulation_settings.node_count = self.network_size; + settings.connected_peers_count = self.peering_degree; + settings.number_of_hops = self.blend_hops; + settings.number_of_blend_layers = self.blend_hops; + settings.epoch_duration = Duration::from_secs( + (self.cover_slots_per_epoch * self.cover_slot_duration) + .try_into() + .unwrap(), + ); + settings.slots_per_epoch = self.cover_slots_per_epoch; + settings.slot_duration = Duration::from_secs(self.cover_slot_duration.try_into().unwrap()); + settings.max_delay_seconds = self.max_temporal_delay.try_into().unwrap(); + } +} diff --git a/simlib/blendnet-sims/src/batch/mod.rs b/simlib/blendnet-sims/src/batch/mod.rs new file mode 100644 index 0000000..f02defe --- /dev/null +++ b/simlib/blendnet-sims/src/batch/mod.rs @@ -0,0 +1 @@ +pub mod batch; diff --git a/simlib/blendnet-sims/src/log.rs b/simlib/blendnet-sims/src/log.rs index 5111ab2..8a73b6b 100644 --- a/simlib/blendnet-sims/src/log.rs +++ b/simlib/blendnet-sims/src/log.rs @@ -3,7 +3,7 @@ use nomos_tracing::{ metrics::otlp::{create_otlp_metrics_layer, OtlpMetricsConfig}, }; use std::{path::PathBuf, str::FromStr}; -use tracing::{level_filters::LevelFilter, Level}; +use tracing::{level_filters::LevelFilter, subscriber::DefaultGuard, Level}; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -50,7 +50,7 @@ pub fn config_tracing( _fmt: LogFormat, log_to: &LogOutput, with_metrics: bool, -) -> Option { +) -> Option<(WorkerGuard, DefaultGuard)> { let mut layers: Vec + Send + Sync>> = vec![]; let (log_layer, guard) = match log_to { @@ -75,10 +75,10 @@ pub fn config_tracing( layers.push(Box::new(metrics_layer)); } - tracing_subscriber::registry() + let a = tracing_subscriber::registry() .with(LevelFilter::from(Level::INFO)) .with(layers) - .init(); + .set_default(); - Some(guard) + Some((guard, a)) } diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index af6102d..628afa9 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -10,6 +10,7 @@ use crate::node::blend::{BlendnodeSettings, SimMessage}; use analysis::history::analyze_message_history; use analysis::latency::analyze_latency; use anyhow::Ok; +use batch::batch::BatchApp; use clap::{Parser, Subcommand}; use crossbeam::channel; use multiaddr::Multiaddr; @@ -38,6 +39,7 @@ use crate::settings::SimSettings; use netrunner::{runner::SimulationRunner, settings::SimulationSettings}; pub mod analysis; +pub mod batch; mod log; mod node; mod settings; @@ -52,6 +54,8 @@ struct Cli { enum Commands { /// Run the simulation Run(SimulationApp), + /// Run batch simulations + Batch(BatchApp), /// Analyze the simulation results Analyze { #[command(subcommand)] @@ -360,6 +364,14 @@ fn main() -> anyhow::Result<()> { } Ok(()) } + Commands::Batch(app) => { + if let Err(e) = app.run() { + println!("FUCK: {}", e); + tracing::error!("error: {}", e); + std::process::exit(1); + } + Ok(()) + } Commands::Analyze { command } => match command { AnalyzeCommands::Latency(app) => { if let Err(e) = analyze_latency(app.log_file, app.step_duration) { diff --git a/simlib/blendnet-sims/src/node/blend/log.rs b/simlib/blendnet-sims/src/node/blend/log.rs index b1f5bed..607899d 100644 --- a/simlib/blendnet-sims/src/node/blend/log.rs +++ b/simlib/blendnet-sims/src/node/blend/log.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; #[macro_export] macro_rules! log { ($topic:expr, $msg:expr) => { - println!( + tracing::info!( "{}", serde_json::to_string(&$crate::node::blend::log::TopicLog { topic: $topic.to_string(), diff --git a/simlib/blendnet-sims/src/settings.rs b/simlib/blendnet-sims/src/settings.rs index ef63637..525bd1a 100644 --- a/simlib/blendnet-sims/src/settings.rs +++ b/simlib/blendnet-sims/src/settings.rs @@ -1,20 +1,20 @@ use netrunner::settings::SimulationSettings; use nomos_blend::persistent_transmission::PersistentTransmissionSettings; -use serde::{Deserialize, Deserializer}; +use serde::{Deserialize, Serialize}; use std::time::Duration; -#[derive(Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct SimSettings { #[serde(flatten)] pub simulation_settings: SimulationSettings, pub connected_peers_count: usize, - #[serde(deserialize_with = "deserialize_duration_with_human_time")] + #[serde(with = "humantime_serde")] pub data_message_lottery_interval: Duration, pub stake_proportion: f64, // For tier 3: cover traffic - #[serde(deserialize_with = "deserialize_duration_with_human_time")] + #[serde(with = "humantime_serde")] pub epoch_duration: Duration, - #[serde(deserialize_with = "deserialize_duration_with_human_time")] + #[serde(with = "humantime_serde")] pub slot_duration: Duration, pub slots_per_epoch: usize, pub number_of_hops: usize, @@ -24,11 +24,3 @@ pub struct SimSettings { pub number_of_blend_layers: usize, pub max_delay_seconds: u64, } - -fn deserialize_duration_with_human_time<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let s = String::deserialize(deserializer)?; - humantime::parse_duration(&s).map_err(serde::de::Error::custom) -}