From a19278480d054400d5f3a774d6f61fc0846253ad Mon Sep 17 00:00:00 2001 From: Al Liu Date: Tue, 4 Apr 2023 20:26:57 +0800 Subject: [PATCH] Pipe output data (#111) --- simulations/Cargo.toml | 1 + simulations/src/bin/app.rs | 34 +++++++++--------------- simulations/src/node/carnot/mod.rs | 4 +-- simulations/src/node/dummy.rs | 4 +-- simulations/src/output_processors/mod.rs | 30 ++++++++++++++++++++- simulations/src/runner/async_runner.rs | 10 ++++--- simulations/src/runner/glauber_runner.rs | 11 ++++++-- simulations/src/runner/layered_runner.rs | 10 ++++--- simulations/src/runner/mod.rs | 31 ++++++++++----------- simulations/src/runner/sync_runner.rs | 11 +++++--- simulations/src/warding/mod.rs | 9 +++++++ 11 files changed, 101 insertions(+), 54 deletions(-) diff --git a/simulations/Cargo.toml b/simulations/Cargo.toml index 3a010183..3104c597 100644 --- a/simulations/Cargo.toml +++ b/simulations/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = "1" clap = { version = "4", features = ["derive"] } crc32fast = "1.3" crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] } diff --git a/simulations/src/bin/app.rs b/simulations/src/bin/app.rs index 7f98ff81..246d03f1 100644 --- a/simulations/src/bin/app.rs +++ b/simulations/src/bin/app.rs @@ -1,6 +1,5 @@ -use std::collections::HashMap; // std -use std::error::Error; +use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::fs::File; use std::io::Cursor; @@ -90,7 +89,7 @@ pub struct SimulationApp { } impl SimulationApp { - pub fn run(self) -> Result<(), Box> { + pub fn run(self) -> anyhow::Result<()> { let Self { input_settings, output_file, @@ -105,7 +104,7 @@ impl SimulationApp { SimulationRunner::new(network, nodes, simulation_settings); // build up series vector let mut out_data: Vec = Vec::new(); - simulation_runner.simulate(Some(&mut out_data)); + simulation_runner.simulate(Some(&mut out_data))?; let mut dataframe: DataFrame = out_data_to_dataframe(out_data); dump_dataframe_to(output_format, &mut dataframe, &output_file)?; Ok(()) @@ -125,44 +124,37 @@ fn out_data_to_dataframe(out_data: Vec) -> DataFrame { } /// Generically load a json file -fn load_json_from_file(path: &Path) -> Result> { +fn load_json_from_file(path: &Path) -> anyhow::Result { let f = File::open(path).map_err(Box::new)?; - serde_json::from_reader(f).map_err(|e| Box::new(e) as Box) + Ok(serde_json::from_reader(f)?) } -fn dump_dataframe_to_json(data: &mut DataFrame, out_path: &Path) -> Result<(), Box> { +fn dump_dataframe_to_json(data: &mut DataFrame, out_path: &Path) -> anyhow::Result<()> { let out_path = out_path.with_extension("json"); let f = File::create(out_path)?; let mut writer = polars::prelude::JsonWriter::new(f); - writer - .finish(data) - .map_err(|e| Box::new(e) as Box) + Ok(writer.finish(data)?) } -fn dump_dataframe_to_csv(data: &mut DataFrame, out_path: &Path) -> Result<(), Box> { +fn dump_dataframe_to_csv(data: &mut DataFrame, out_path: &Path) -> anyhow::Result<()> { let out_path = out_path.with_extension("csv"); let f = File::create(out_path)?; let mut writer = polars::prelude::CsvWriter::new(f); - writer - .finish(data) - .map_err(|e| Box::new(e) as Box) + Ok(writer.finish(data)?) } -fn dump_dataframe_to_parquet(data: &mut DataFrame, out_path: &Path) -> Result<(), Box> { +fn dump_dataframe_to_parquet(data: &mut DataFrame, out_path: &Path) -> anyhow::Result<()> { let out_path = out_path.with_extension("parquet"); let f = File::create(out_path)?; let writer = polars::prelude::ParquetWriter::new(f); - writer - .finish(data) - .map(|_| ()) - .map_err(|e| Box::new(e) as Box) + Ok(writer.finish(data).map(|_| ())?) } fn dump_dataframe_to( output_format: OutputFormat, data: &mut DataFrame, out_path: &Path, -) -> Result<(), Box> { +) -> anyhow::Result<()> { match output_format { OutputFormat::Json => dump_dataframe_to_json(data, out_path), OutputFormat::Csv => dump_dataframe_to_csv(data, out_path), @@ -170,7 +162,7 @@ fn dump_dataframe_to( } } -fn main() -> Result<(), Box> { +fn main() -> anyhow::Result<()> { let app: SimulationApp = SimulationApp::parse(); app.run()?; Ok(()) diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index df1d8b4f..c44e4f95 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -1,10 +1,10 @@ // std // crates -use serde::Deserialize; +use serde::{Deserialize, Serialize}; // internal use super::{Node, NodeId}; -#[derive(Default)] +#[derive(Default, Serialize)] pub struct CarnotState {} #[derive(Clone, Deserialize)] diff --git a/simulations/src/node/dummy.rs b/simulations/src/node/dummy.rs index 0c74ede9..93a6b23e 100644 --- a/simulations/src/node/dummy.rs +++ b/simulations/src/node/dummy.rs @@ -1,7 +1,7 @@ // std // crates use crossbeam::channel::{Receiver, Sender}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; // internal use crate::{ network::{NetworkInterface, NetworkMessage}, @@ -10,7 +10,7 @@ use crate::{ use super::{NetworkState, SharedState}; -#[derive(Debug, Default)] +#[derive(Debug, Default, Serialize)] pub struct DummyState { pub current_view: usize, } diff --git a/simulations/src/output_processors/mod.rs b/simulations/src/output_processors/mod.rs index 78df4112..b823c108 100644 --- a/simulations/src/output_processors/mod.rs +++ b/simulations/src/output_processors/mod.rs @@ -3,7 +3,35 @@ use serde::Serialize; pub type SerializedNodeState = serde_json::Value; #[derive(Serialize)] -pub struct OutData {} +pub struct OutData(SerializedNodeState); + +impl OutData { + #[inline] + pub const fn new(state: SerializedNodeState) -> Self { + Self(state) + } +} + +impl TryFrom<&crate::warding::SimulationState> for OutData +where + N: crate::node::Node, + N::State: Serialize, +{ + type Error = serde_json::Error; + + fn try_from(state: &crate::warding::SimulationState) -> Result { + serde_json::to_value( + state + .nodes + .read() + .expect("simulations: SimulationState panic when requiring a read lock") + .iter() + .map(N::state) + .collect::>(), + ) + .map(OutData::new) + } +} pub trait NodeStateRecord { fn get_serialized_state_record(&self) -> SerializedNodeState { diff --git a/simulations/src/runner/async_runner.rs b/simulations/src/runner/async_runner.rs index e48cd074..184dc363 100644 --- a/simulations/src/runner/async_runner.rs +++ b/simulations/src/runner/async_runner.rs @@ -5,6 +5,7 @@ use crate::runner::SimulationRunner; use crate::warding::SimulationState; use rand::prelude::SliceRandom; use rayon::prelude::*; +use serde::Serialize; use std::collections::HashSet; use std::sync::Arc; @@ -12,10 +13,12 @@ pub fn simulate( runner: &mut SimulationRunner, chunk_size: usize, mut out_data: Option<&mut Vec>, -) where +) -> anyhow::Result<()> +where M: Clone, N::Settings: Clone, N: Send + Sync, + N::State: Serialize, O::Settings: Clone, { let simulation_state = SimulationState:: { @@ -30,7 +33,7 @@ pub fn simulate( .map(N::id) .collect(); - runner.dump_state_to_out_data(&simulation_state, &mut out_data); + runner.dump_state_to_out_data(&simulation_state, &mut out_data)?; loop { node_ids.shuffle(&mut runner.rng); @@ -44,11 +47,12 @@ pub fn simulate( .filter(|n| ids.contains(&n.id())) .for_each(N::step); - runner.dump_state_to_out_data(&simulation_state, &mut out_data); + runner.dump_state_to_out_data(&simulation_state, &mut out_data)?; } // check if any condition makes the simulation stop if runner.check_wards(&simulation_state) { break; } } + Ok(()) } diff --git a/simulations/src/runner/glauber_runner.rs b/simulations/src/runner/glauber_runner.rs index 863d0e01..2433a88d 100644 --- a/simulations/src/runner/glauber_runner.rs +++ b/simulations/src/runner/glauber_runner.rs @@ -4,6 +4,7 @@ use crate::overlay::Overlay; use crate::runner::SimulationRunner; use crate::warding::SimulationState; use rand::prelude::IteratorRandom; +use serde::Serialize; use std::collections::BTreeSet; use std::sync::Arc; @@ -13,10 +14,12 @@ pub fn simulate( update_rate: usize, maximum_iterations: usize, mut out_data: Option<&mut Vec>, -) where +) -> anyhow::Result<()> +where M: Clone, N: Send + Sync, N::Settings: Clone, + N::State: Serialize, O::Settings: Clone, { let simulation_state = SimulationState { @@ -50,9 +53,13 @@ pub fn simulate( // check if any condition makes the simulation stop if runner.check_wards(&simulation_state) { + // we break the outer main loop, so we need to dump it before the breaking + runner.dump_state_to_out_data(&simulation_state, &mut out_data)?; break 'main; } } - runner.dump_state_to_out_data(&simulation_state, &mut out_data); + // update_rate iterations reached, so dump state + runner.dump_state_to_out_data(&simulation_state, &mut out_data)?; } + Ok(()) } diff --git a/simulations/src/runner/layered_runner.rs b/simulations/src/runner/layered_runner.rs index 99d98e1a..5c2f8d84 100644 --- a/simulations/src/runner/layered_runner.rs +++ b/simulations/src/runner/layered_runner.rs @@ -35,6 +35,7 @@ use std::sync::Arc; use fixed_slice_deque::FixedSliceDeque; use rand::prelude::{IteratorRandom, SliceRandom}; use rand::rngs::SmallRng; +use serde::Serialize; // internal use crate::node::{Node, NodeId}; use crate::output_processors::OutData; @@ -47,10 +48,12 @@ pub fn simulate( gap: usize, distribution: Option>, mut out_data: Option<&mut Vec>, -) where +) -> anyhow::Result<()> +where M: Clone, N: Send + Sync, N::Settings: Clone, + N::State: Serialize, O::Settings: Clone, { let distribution = @@ -94,7 +97,7 @@ pub fn simulate( // compute the most advanced nodes again if deque.first().unwrap().is_empty() { let _ = deque.push_back(BTreeSet::default()); - runner.dump_state_to_out_data(&simulation_state, &mut out_data); + runner.dump_state_to_out_data(&simulation_state, &mut out_data)?; } // if no more nodes to compute @@ -103,7 +106,8 @@ pub fn simulate( } } // write latest state - runner.dump_state_to_out_data(&simulation_state, &mut out_data); + runner.dump_state_to_out_data(&simulation_state, &mut out_data)?; + Ok(()) } fn choose_random_layer_and_node_id( diff --git a/simulations/src/runner/mod.rs b/simulations/src/runner/mod.rs index a386d0a2..b427c59b 100644 --- a/simulations/src/runner/mod.rs +++ b/simulations/src/runner/mod.rs @@ -13,6 +13,7 @@ use crate::network::Network; use rand::rngs::SmallRng; use rand::{RngCore, SeedableRng}; use rayon::prelude::*; +use serde::Serialize; // internal use crate::node::Node; use crate::output_processors::OutData; @@ -39,6 +40,7 @@ where M: Clone, N: Send + Sync, N::Settings: Clone, + N::State: Serialize, O::Settings: Clone, { pub fn new( @@ -64,35 +66,30 @@ where } } - pub fn simulate(&mut self, out_data: Option<&mut Vec>) { + pub fn simulate(&mut self, out_data: Option<&mut Vec>) -> anyhow::Result<()> { match self.settings.runner_settings.clone() { - RunnerSettings::Sync => { - sync_runner::simulate(self, out_data); - } - RunnerSettings::Async { chunks } => { - async_runner::simulate(self, chunks, out_data); - } + RunnerSettings::Sync => sync_runner::simulate(self, out_data), + RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks, out_data), RunnerSettings::Glauber { maximum_iterations, update_rate, - } => { - glauber_runner::simulate(self, update_rate, maximum_iterations, out_data); - } + } => glauber_runner::simulate(self, update_rate, maximum_iterations, out_data), RunnerSettings::Layered { rounds_gap, distribution, - } => { - layered_runner::simulate(self, rounds_gap, distribution, out_data); - } + } => layered_runner::simulate(self, rounds_gap, distribution, out_data), } } fn dump_state_to_out_data( &self, - _simulation_state: &SimulationState, - _out_data: &mut Option<&mut Vec>, - ) { - todo!("What data do we want to expose?") + simulation_state: &SimulationState, + out_data: &mut Option<&mut Vec>, + ) -> anyhow::Result<()> { + if let Some(out_data) = out_data { + out_data.push(OutData::try_from(simulation_state)?); + } + Ok(()) } fn check_wards(&mut self, state: &SimulationState) -> bool { diff --git a/simulations/src/runner/sync_runner.rs b/simulations/src/runner/sync_runner.rs index 9103f452..f683fcb5 100644 --- a/simulations/src/runner/sync_runner.rs +++ b/simulations/src/runner/sync_runner.rs @@ -1,3 +1,5 @@ +use serde::Serialize; + use super::SimulationRunner; use crate::node::Node; use crate::output_processors::OutData; @@ -9,26 +11,29 @@ use std::sync::Arc; pub fn simulate( runner: &mut SimulationRunner, mut out_data: Option<&mut Vec>, -) where +) -> anyhow::Result<()> +where M: Clone, N: Send + Sync, N::Settings: Clone, + N::State: Serialize, O::Settings: Clone, { let state = SimulationState { nodes: Arc::clone(&runner.nodes), }; - runner.dump_state_to_out_data(&state, &mut out_data); + runner.dump_state_to_out_data(&state, &mut out_data)?; for _ in 1.. { runner.step(); - runner.dump_state_to_out_data(&state, &mut out_data); + runner.dump_state_to_out_data(&state, &mut out_data)?; // check if any condition makes the simulation stop if runner.check_wards(&state) { break; } } + Ok(()) } #[cfg(test)] diff --git a/simulations/src/warding/mod.rs b/simulations/src/warding/mod.rs index 675c88ed..a1a9a4d2 100644 --- a/simulations/src/warding/mod.rs +++ b/simulations/src/warding/mod.rs @@ -13,6 +13,15 @@ pub struct SimulationState { pub nodes: Arc>>, } +impl SimulationState { + #[inline] + pub fn new(nodes: Vec) -> Self { + Self { + nodes: Arc::new(RwLock::new(nodes)), + } + } +} + /// A ward is a computation over the `NetworkState`, it must return true if the state satisfies /// the warding conditions. It is used to stop the consensus simulation if such condition is reached. pub trait SimulationWard {