This commit is contained in:
Youngjoon Lee 2025-02-09 16:02:06 +09:00
parent c9fe4994c7
commit deefac1629
No known key found for this signature in database
GPG Key ID: D94003D91DE12141
8 changed files with 133 additions and 19 deletions

View File

@ -29,3 +29,4 @@ tracing-appender = "0.2"
cached = "0.54.0"
polars = "0.46.0"
humantime-serde = "1.1.1"
csv = "1.3.1"

View File

@ -64,6 +64,7 @@ pub fn analyze_latency(log_file: PathBuf, step_duration: Duration) -> Result<(),
#[derive(Serialize, Deserialize, Debug)]
struct Output {
count: usize,
min: i64,
min_payload_id: PayloadId,
q1: f64,
@ -81,6 +82,7 @@ impl Output {
let max = series.max::<i64>().unwrap().unwrap();
let max_payload_id = latency_to_message.get(&max).unwrap().clone();
Self {
count: series.len(),
min,
min_payload_id,
q1: quantile(series, 0.25),

View File

@ -0,0 +1,106 @@
use std::{fs, path::PathBuf, time::Duration};
use clap::Parser;
use serde::Deserialize;
use crate::{load_json_from_file, settings::SimSettings, SimulationApp};
#[derive(Parser)]
pub struct BatchApp {
#[clap(long, short)]
paramset_file: PathBuf,
#[clap(long, short)]
default_config_file: PathBuf,
#[clap(long, short)]
outdir: PathBuf,
}
impl BatchApp {
pub fn run(&self) -> anyhow::Result<()> {
self.prepare_config_files()?
.into_iter()
.try_for_each(|(config_path, log_path)| self.run_simulation(config_path, log_path))?;
Ok(())
}
fn prepare_config_files(&self) -> anyhow::Result<Vec<(PathBuf, PathBuf)>> {
let default_settings: SimSettings = load_json_from_file(&self.default_config_file)?;
let mut reader = csv::ReaderBuilder::new()
.has_headers(true)
.delimiter(b',')
.from_path(&self.paramset_file)?;
fs::create_dir_all(self.outdir.clone())?;
let mut config_and_log_paths = Vec::new();
for (i, record) in reader.deserialize().enumerate() {
let paramset: ParamSetRecord = record?;
let mut settings = default_settings.clone();
paramset.apply_to(&mut settings);
let dir_path = self.outdir.join(format!("sim-{}", i));
fs::create_dir_all(dir_path.clone())?;
let config_path = dir_path.join("config.json");
serde_json::to_writer_pretty(fs::File::create(config_path.clone())?, &settings)?;
println!("Wrote {}", config_path.display());
config_and_log_paths.push((config_path, dir_path.join("out.log")));
}
Ok(config_and_log_paths)
}
fn run_simulation(&self, config_path: PathBuf, log_path: PathBuf) -> anyhow::Result<()> {
println!(
"Running simulation with config file {}",
config_path.display()
);
let app = SimulationApp {
input_settings: config_path,
stream_type: None,
log_format: crate::log::LogFormat::Plain,
log_to: crate::log::LogOutput::File(log_path.clone()),
no_netcap: false,
with_metrics: false,
};
let maybe_guard = crate::log::config_tracing(app.log_format, &app.log_to, app.with_metrics);
let result = app.run();
if result.is_ok() {
println!(
"Simulation finished successfully, logs written to {}",
log_path.display()
);
}
drop(maybe_guard);
result
}
}
#[derive(Debug, Deserialize)]
struct ParamSetRecord {
network_size: usize,
peering_degree: usize,
blend_hops: usize,
cover_slots_per_epoch: usize,
cover_slot_duration: usize,
max_temporal_delay: usize,
}
impl ParamSetRecord {
fn apply_to(&self, settings: &mut SimSettings) {
settings.simulation_settings.node_count = self.network_size;
settings.connected_peers_count = self.peering_degree;
settings.number_of_hops = self.blend_hops;
settings.number_of_blend_layers = self.blend_hops;
settings.epoch_duration = Duration::from_secs(
(self.cover_slots_per_epoch * self.cover_slot_duration)
.try_into()
.unwrap(),
);
settings.slots_per_epoch = self.cover_slots_per_epoch;
settings.slot_duration = Duration::from_secs(self.cover_slot_duration.try_into().unwrap());
settings.max_delay_seconds = self.max_temporal_delay.try_into().unwrap();
}
}

View File

@ -0,0 +1 @@
pub mod batch;

View File

@ -3,7 +3,7 @@ use nomos_tracing::{
metrics::otlp::{create_otlp_metrics_layer, OtlpMetricsConfig},
};
use std::{path::PathBuf, str::FromStr};
use tracing::{level_filters::LevelFilter, Level};
use tracing::{level_filters::LevelFilter, subscriber::DefaultGuard, Level};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@ -50,7 +50,7 @@ pub fn config_tracing(
_fmt: LogFormat,
log_to: &LogOutput,
with_metrics: bool,
) -> Option<WorkerGuard> {
) -> Option<(WorkerGuard, DefaultGuard)> {
let mut layers: Vec<Box<dyn tracing_subscriber::Layer<_> + Send + Sync>> = vec![];
let (log_layer, guard) = match log_to {
@ -75,10 +75,10 @@ pub fn config_tracing(
layers.push(Box::new(metrics_layer));
}
tracing_subscriber::registry()
let a = tracing_subscriber::registry()
.with(LevelFilter::from(Level::INFO))
.with(layers)
.init();
.set_default();
Some(guard)
Some((guard, a))
}

View File

@ -10,6 +10,7 @@ use crate::node::blend::{BlendnodeSettings, SimMessage};
use analysis::history::analyze_message_history;
use analysis::latency::analyze_latency;
use anyhow::Ok;
use batch::batch::BatchApp;
use clap::{Parser, Subcommand};
use crossbeam::channel;
use multiaddr::Multiaddr;
@ -38,6 +39,7 @@ use crate::settings::SimSettings;
use netrunner::{runner::SimulationRunner, settings::SimulationSettings};
pub mod analysis;
pub mod batch;
mod log;
mod node;
mod settings;
@ -52,6 +54,8 @@ struct Cli {
enum Commands {
/// Run the simulation
Run(SimulationApp),
/// Run batch simulations
Batch(BatchApp),
/// Analyze the simulation results
Analyze {
#[command(subcommand)]
@ -360,6 +364,14 @@ fn main() -> anyhow::Result<()> {
}
Ok(())
}
Commands::Batch(app) => {
if let Err(e) = app.run() {
println!("FUCK: {}", e);
tracing::error!("error: {}", e);
std::process::exit(1);
}
Ok(())
}
Commands::Analyze { command } => match command {
AnalyzeCommands::Latency(app) => {
if let Err(e) = analyze_latency(app.log_file, app.step_duration) {

View File

@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
#[macro_export]
macro_rules! log {
($topic:expr, $msg:expr) => {
println!(
tracing::info!(
"{}",
serde_json::to_string(&$crate::node::blend::log::TopicLog {
topic: $topic.to_string(),

View File

@ -1,20 +1,20 @@
use netrunner::settings::SimulationSettings;
use nomos_blend::persistent_transmission::PersistentTransmissionSettings;
use serde::{Deserialize, Deserializer};
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct SimSettings {
#[serde(flatten)]
pub simulation_settings: SimulationSettings,
pub connected_peers_count: usize,
#[serde(deserialize_with = "deserialize_duration_with_human_time")]
#[serde(with = "humantime_serde")]
pub data_message_lottery_interval: Duration,
pub stake_proportion: f64,
// For tier 3: cover traffic
#[serde(deserialize_with = "deserialize_duration_with_human_time")]
#[serde(with = "humantime_serde")]
pub epoch_duration: Duration,
#[serde(deserialize_with = "deserialize_duration_with_human_time")]
#[serde(with = "humantime_serde")]
pub slot_duration: Duration,
pub slots_per_epoch: usize,
pub number_of_hops: usize,
@ -24,11 +24,3 @@ pub struct SimSettings {
pub number_of_blend_layers: usize,
pub max_delay_seconds: u64,
}
fn deserialize_duration_with_human_time<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
humantime::parse_duration(&s).map_err(serde::de::Error::custom)
}