mirror of
https://github.com/logos-blockchain/logos-blockchain-simulations.git
synced 2026-01-10 00:53:11 +00:00
use tokio for multithreading
This commit is contained in:
parent
c7b83c813d
commit
c7ac0340a3
@ -10,5 +10,6 @@ csv = "1.3.0"
|
||||
rand = "0.8.5"
|
||||
strum = "0.26.3"
|
||||
strum_macros = "0.26.4"
|
||||
tokio = { version = "1.39.2", features = ["rt", "rt-multi-thread", "sync"] }
|
||||
tracing = "0.1.40"
|
||||
tracing-subscriber = "0.3.18"
|
||||
|
||||
@ -1,6 +1,11 @@
|
||||
use chrono::Utc;
|
||||
use clap::Parser;
|
||||
use std::{error::Error, path::Path, time::SystemTime};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
error::Error,
|
||||
path::Path,
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
use iteration::run_iteration;
|
||||
use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS};
|
||||
@ -23,6 +28,8 @@ struct Args {
|
||||
queue_type: QueueType,
|
||||
#[arg(short, long)]
|
||||
outdir: String,
|
||||
#[arg(short, long)]
|
||||
num_threads: usize,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
@ -35,8 +42,15 @@ fn main() {
|
||||
session_id,
|
||||
queue_type,
|
||||
outdir,
|
||||
num_threads,
|
||||
} = args;
|
||||
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(num_threads)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
// Create a directory and initialize a CSV file only with a header
|
||||
assert!(
|
||||
Path::new(&outdir).is_dir(),
|
||||
@ -55,28 +69,51 @@ fn main() {
|
||||
|
||||
let session_start_time = SystemTime::now();
|
||||
|
||||
for paramset in paramsets {
|
||||
let paramset_dir = format!("{outdir}/{subdir}/__WIP__paramset_{}", paramset.id);
|
||||
std::fs::create_dir_all(paramset_dir.as_str()).unwrap();
|
||||
save_paramset_info(¶mset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap();
|
||||
runtime.block_on(async {
|
||||
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<(u16, u16)>();
|
||||
|
||||
for i in 0..paramset.num_iterations {
|
||||
let out_csv_path = format!("{paramset_dir}/__WIP__iteration_{i}.csv");
|
||||
let topology_path = format!("{paramset_dir}/topology_{i}.csv");
|
||||
run_iteration(paramset, i as u64, &out_csv_path, &topology_path);
|
||||
let mut waiting_iterations: HashMap<u16, (u16, String)> = HashMap::new();
|
||||
for paramset in paramsets {
|
||||
let paramset_dir = format!("{outdir}/{subdir}/__WIP__paramset_{}", paramset.id);
|
||||
std::fs::create_dir_all(paramset_dir.as_str()).unwrap();
|
||||
save_paramset_info(¶mset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap();
|
||||
|
||||
let new_out_csv_path = out_csv_path.replace("__WIP__iteration_", "iteration_");
|
||||
std::fs::rename(&out_csv_path, &new_out_csv_path)
|
||||
.expect("Failed to rename: {out_csv_path} -> {new_out_csv_path}");
|
||||
for i in 0..paramset.num_iterations {
|
||||
let out_csv_path = format!("{paramset_dir}/__WIP__iteration_{i}.csv");
|
||||
let topology_path = format!("{paramset_dir}/topology_{i}.csv");
|
||||
let sender = sender.clone();
|
||||
tokio::task::spawn(async move {
|
||||
run_iteration(paramset, i as u64, &out_csv_path, &topology_path);
|
||||
|
||||
tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i);
|
||||
let new_out_csv_path = out_csv_path.replace("__WIP__iteration_", "iteration_");
|
||||
std::fs::rename(&out_csv_path, &new_out_csv_path)
|
||||
.expect("Failed to rename: {out_csv_path} -> {new_out_csv_path}");
|
||||
tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i);
|
||||
|
||||
sender.send((paramset.id, i)).unwrap();
|
||||
});
|
||||
}
|
||||
waiting_iterations.insert(paramset.id, (paramset.num_iterations, paramset_dir));
|
||||
}
|
||||
|
||||
let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_");
|
||||
std::fs::rename(¶mset_dir, &new_paramset_dir)
|
||||
.expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}");
|
||||
tracing::info!("ParamSet:{} completed.", paramset.id);
|
||||
}
|
||||
while let Some((paramset_id, _)) = receiver.recv().await {
|
||||
let (remaining_iterations, _) = waiting_iterations.get_mut(¶mset_id).unwrap();
|
||||
*remaining_iterations -= 1;
|
||||
|
||||
if *remaining_iterations == 0 {
|
||||
let paramset_dir = waiting_iterations.remove(¶mset_id).unwrap().1;
|
||||
let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_");
|
||||
std::fs::rename(¶mset_dir, &new_paramset_dir)
|
||||
.expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}: {e}");
|
||||
tracing::info!("ParamSet:{} completed.", paramset_id);
|
||||
}
|
||||
|
||||
// Exit loop if no more iterations are waiting
|
||||
if waiting_iterations.is_empty() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let session_duration = SystemTime::now()
|
||||
.duration_since(session_start_time)
|
||||
@ -85,7 +122,7 @@ fn main() {
|
||||
// Replace "__WIP__" and "__DUR__" in the subdir string
|
||||
let new_subdir = subdir
|
||||
.replace("__WIP__", "")
|
||||
.replace("__DUR__", &format!("{:?}", session_duration));
|
||||
.replace("__DUR__", &format_duration(session_duration));
|
||||
let old_path = format!("{}/{}", outdir, subdir);
|
||||
let new_path = format!("{}/{}", outdir, new_subdir);
|
||||
assert!(
|
||||
@ -94,6 +131,8 @@ fn main() {
|
||||
);
|
||||
std::fs::rename(&old_path, &new_path)
|
||||
.expect("Failed to rename the directory: {old_path} -> {new_path}");
|
||||
|
||||
tracing::info!("Session completed.");
|
||||
}
|
||||
|
||||
fn save_paramset_info(paramset: &ParamSet, path: &str) -> Result<(), Box<dyn Error>> {
|
||||
@ -110,3 +149,14 @@ fn save_paramset_info(paramset: &ParamSet, path: &str) -> Result<(), Box<dyn Err
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn format_duration(duration: Duration) -> String {
|
||||
let total_seconds = duration.as_secs();
|
||||
|
||||
let days = total_seconds / 86_400;
|
||||
let hours = (total_seconds % 86_400) / 3_600;
|
||||
let minutes = (total_seconds % 3_600) / 60;
|
||||
let seconds = total_seconds % 60;
|
||||
|
||||
format!("{}d{}h{}m{}s", days, hours, minutes, seconds)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user