Pipe output data (#111)

This commit is contained in:
Al Liu 2023-04-04 20:26:57 +08:00 committed by GitHub
parent d07da7ba5a
commit a19278480d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 101 additions and 54 deletions

View File

@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
anyhow = "1"
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
crc32fast = "1.3" crc32fast = "1.3"
crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] } crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] }

View File

@ -1,6 +1,5 @@
use std::collections::HashMap;
// std // std
use std::error::Error; use std::collections::HashMap;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::fs::File; use std::fs::File;
use std::io::Cursor; use std::io::Cursor;
@ -90,7 +89,7 @@ pub struct SimulationApp {
} }
impl SimulationApp { impl SimulationApp {
pub fn run(self) -> Result<(), Box<dyn Error>> { pub fn run(self) -> anyhow::Result<()> {
let Self { let Self {
input_settings, input_settings,
output_file, output_file,
@ -105,7 +104,7 @@ impl SimulationApp {
SimulationRunner::new(network, nodes, simulation_settings); SimulationRunner::new(network, nodes, simulation_settings);
// build up series vector // build up series vector
let mut out_data: Vec<OutData> = Vec::new(); let mut out_data: Vec<OutData> = Vec::new();
simulation_runner.simulate(Some(&mut out_data)); simulation_runner.simulate(Some(&mut out_data))?;
let mut dataframe: DataFrame = out_data_to_dataframe(out_data); let mut dataframe: DataFrame = out_data_to_dataframe(out_data);
dump_dataframe_to(output_format, &mut dataframe, &output_file)?; dump_dataframe_to(output_format, &mut dataframe, &output_file)?;
Ok(()) Ok(())
@ -125,44 +124,37 @@ fn out_data_to_dataframe(out_data: Vec<OutData>) -> DataFrame {
} }
/// Generically load a json file /// Generically load a json file
fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> Result<T, Box<dyn Error>> { fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<T> {
let f = File::open(path).map_err(Box::new)?; let f = File::open(path).map_err(Box::new)?;
serde_json::from_reader(f).map_err(|e| Box::new(e) as Box<dyn Error>) Ok(serde_json::from_reader(f)?)
} }
fn dump_dataframe_to_json(data: &mut DataFrame, out_path: &Path) -> Result<(), Box<dyn Error>> { fn dump_dataframe_to_json(data: &mut DataFrame, out_path: &Path) -> anyhow::Result<()> {
let out_path = out_path.with_extension("json"); let out_path = out_path.with_extension("json");
let f = File::create(out_path)?; let f = File::create(out_path)?;
let mut writer = polars::prelude::JsonWriter::new(f); let mut writer = polars::prelude::JsonWriter::new(f);
writer Ok(writer.finish(data)?)
.finish(data)
.map_err(|e| Box::new(e) as Box<dyn Error>)
} }
fn dump_dataframe_to_csv(data: &mut DataFrame, out_path: &Path) -> Result<(), Box<dyn Error>> { fn dump_dataframe_to_csv(data: &mut DataFrame, out_path: &Path) -> anyhow::Result<()> {
let out_path = out_path.with_extension("csv"); let out_path = out_path.with_extension("csv");
let f = File::create(out_path)?; let f = File::create(out_path)?;
let mut writer = polars::prelude::CsvWriter::new(f); let mut writer = polars::prelude::CsvWriter::new(f);
writer Ok(writer.finish(data)?)
.finish(data)
.map_err(|e| Box::new(e) as Box<dyn Error>)
} }
fn dump_dataframe_to_parquet(data: &mut DataFrame, out_path: &Path) -> Result<(), Box<dyn Error>> { fn dump_dataframe_to_parquet(data: &mut DataFrame, out_path: &Path) -> anyhow::Result<()> {
let out_path = out_path.with_extension("parquet"); let out_path = out_path.with_extension("parquet");
let f = File::create(out_path)?; let f = File::create(out_path)?;
let writer = polars::prelude::ParquetWriter::new(f); let writer = polars::prelude::ParquetWriter::new(f);
writer Ok(writer.finish(data).map(|_| ())?)
.finish(data)
.map(|_| ())
.map_err(|e| Box::new(e) as Box<dyn Error>)
} }
fn dump_dataframe_to( fn dump_dataframe_to(
output_format: OutputFormat, output_format: OutputFormat,
data: &mut DataFrame, data: &mut DataFrame,
out_path: &Path, out_path: &Path,
) -> Result<(), Box<dyn Error>> { ) -> anyhow::Result<()> {
match output_format { match output_format {
OutputFormat::Json => dump_dataframe_to_json(data, out_path), OutputFormat::Json => dump_dataframe_to_json(data, out_path),
OutputFormat::Csv => dump_dataframe_to_csv(data, out_path), OutputFormat::Csv => dump_dataframe_to_csv(data, out_path),
@ -170,7 +162,7 @@ fn dump_dataframe_to(
} }
} }
fn main() -> Result<(), Box<dyn Error>> { fn main() -> anyhow::Result<()> {
let app: SimulationApp = SimulationApp::parse(); let app: SimulationApp = SimulationApp::parse();
app.run()?; app.run()?;
Ok(()) Ok(())

View File

@ -1,10 +1,10 @@
// std // std
// crates // crates
use serde::Deserialize; use serde::{Deserialize, Serialize};
// internal // internal
use super::{Node, NodeId}; use super::{Node, NodeId};
#[derive(Default)] #[derive(Default, Serialize)]
pub struct CarnotState {} pub struct CarnotState {}
#[derive(Clone, Deserialize)] #[derive(Clone, Deserialize)]

View File

@ -1,7 +1,7 @@
// std // std
// crates // crates
use crossbeam::channel::{Receiver, Sender}; use crossbeam::channel::{Receiver, Sender};
use serde::Deserialize; use serde::{Deserialize, Serialize};
// internal // internal
use crate::{ use crate::{
network::{NetworkInterface, NetworkMessage}, network::{NetworkInterface, NetworkMessage},
@ -10,7 +10,7 @@ use crate::{
use super::{NetworkState, SharedState}; use super::{NetworkState, SharedState};
#[derive(Debug, Default)] #[derive(Debug, Default, Serialize)]
pub struct DummyState { pub struct DummyState {
pub current_view: usize, pub current_view: usize,
} }

View File

@ -3,7 +3,35 @@ use serde::Serialize;
pub type SerializedNodeState = serde_json::Value; pub type SerializedNodeState = serde_json::Value;
#[derive(Serialize)] #[derive(Serialize)]
pub struct OutData {} pub struct OutData(SerializedNodeState);
impl OutData {
#[inline]
pub const fn new(state: SerializedNodeState) -> Self {
Self(state)
}
}
impl<N> TryFrom<&crate::warding::SimulationState<N>> for OutData
where
N: crate::node::Node,
N::State: Serialize,
{
type Error = serde_json::Error;
fn try_from(state: &crate::warding::SimulationState<N>) -> Result<Self, Self::Error> {
serde_json::to_value(
state
.nodes
.read()
.expect("simulations: SimulationState panic when requiring a read lock")
.iter()
.map(N::state)
.collect::<Vec<_>>(),
)
.map(OutData::new)
}
}
pub trait NodeStateRecord { pub trait NodeStateRecord {
fn get_serialized_state_record(&self) -> SerializedNodeState { fn get_serialized_state_record(&self) -> SerializedNodeState {

View File

@ -5,6 +5,7 @@ use crate::runner::SimulationRunner;
use crate::warding::SimulationState; use crate::warding::SimulationState;
use rand::prelude::SliceRandom; use rand::prelude::SliceRandom;
use rayon::prelude::*; use rayon::prelude::*;
use serde::Serialize;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
@ -12,10 +13,12 @@ pub fn simulate<M, N: Node, O: Overlay>(
runner: &mut SimulationRunner<M, N, O>, runner: &mut SimulationRunner<M, N, O>,
chunk_size: usize, chunk_size: usize,
mut out_data: Option<&mut Vec<OutData>>, mut out_data: Option<&mut Vec<OutData>>,
) where ) -> anyhow::Result<()>
where
M: Clone, M: Clone,
N::Settings: Clone, N::Settings: Clone,
N: Send + Sync, N: Send + Sync,
N::State: Serialize,
O::Settings: Clone, O::Settings: Clone,
{ {
let simulation_state = SimulationState::<N> { let simulation_state = SimulationState::<N> {
@ -30,7 +33,7 @@ pub fn simulate<M, N: Node, O: Overlay>(
.map(N::id) .map(N::id)
.collect(); .collect();
runner.dump_state_to_out_data(&simulation_state, &mut out_data); runner.dump_state_to_out_data(&simulation_state, &mut out_data)?;
loop { loop {
node_ids.shuffle(&mut runner.rng); node_ids.shuffle(&mut runner.rng);
@ -44,11 +47,12 @@ pub fn simulate<M, N: Node, O: Overlay>(
.filter(|n| ids.contains(&n.id())) .filter(|n| ids.contains(&n.id()))
.for_each(N::step); .for_each(N::step);
runner.dump_state_to_out_data(&simulation_state, &mut out_data); runner.dump_state_to_out_data(&simulation_state, &mut out_data)?;
} }
// check if any condition makes the simulation stop // check if any condition makes the simulation stop
if runner.check_wards(&simulation_state) { if runner.check_wards(&simulation_state) {
break; break;
} }
} }
Ok(())
} }

View File

@ -4,6 +4,7 @@ use crate::overlay::Overlay;
use crate::runner::SimulationRunner; use crate::runner::SimulationRunner;
use crate::warding::SimulationState; use crate::warding::SimulationState;
use rand::prelude::IteratorRandom; use rand::prelude::IteratorRandom;
use serde::Serialize;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::sync::Arc; use std::sync::Arc;
@ -13,10 +14,12 @@ pub fn simulate<M, N: Node, O: Overlay>(
update_rate: usize, update_rate: usize,
maximum_iterations: usize, maximum_iterations: usize,
mut out_data: Option<&mut Vec<OutData>>, mut out_data: Option<&mut Vec<OutData>>,
) where ) -> anyhow::Result<()>
where
M: Clone, M: Clone,
N: Send + Sync, N: Send + Sync,
N::Settings: Clone, N::Settings: Clone,
N::State: Serialize,
O::Settings: Clone, O::Settings: Clone,
{ {
let simulation_state = SimulationState { let simulation_state = SimulationState {
@ -50,9 +53,13 @@ pub fn simulate<M, N: Node, O: Overlay>(
// check if any condition makes the simulation stop // check if any condition makes the simulation stop
if runner.check_wards(&simulation_state) { if runner.check_wards(&simulation_state) {
// we break the outer main loop, so we need to dump it before the breaking
runner.dump_state_to_out_data(&simulation_state, &mut out_data)?;
break 'main; break 'main;
} }
} }
runner.dump_state_to_out_data(&simulation_state, &mut out_data); // update_rate iterations reached, so dump state
runner.dump_state_to_out_data(&simulation_state, &mut out_data)?;
} }
Ok(())
} }

View File

@ -35,6 +35,7 @@ use std::sync::Arc;
use fixed_slice_deque::FixedSliceDeque; use fixed_slice_deque::FixedSliceDeque;
use rand::prelude::{IteratorRandom, SliceRandom}; use rand::prelude::{IteratorRandom, SliceRandom};
use rand::rngs::SmallRng; use rand::rngs::SmallRng;
use serde::Serialize;
// internal // internal
use crate::node::{Node, NodeId}; use crate::node::{Node, NodeId};
use crate::output_processors::OutData; use crate::output_processors::OutData;
@ -47,10 +48,12 @@ pub fn simulate<M, N: Node, O: Overlay>(
gap: usize, gap: usize,
distribution: Option<Vec<f32>>, distribution: Option<Vec<f32>>,
mut out_data: Option<&mut Vec<OutData>>, mut out_data: Option<&mut Vec<OutData>>,
) where ) -> anyhow::Result<()>
where
M: Clone, M: Clone,
N: Send + Sync, N: Send + Sync,
N::Settings: Clone, N::Settings: Clone,
N::State: Serialize,
O::Settings: Clone, O::Settings: Clone,
{ {
let distribution = let distribution =
@ -94,7 +97,7 @@ pub fn simulate<M, N: Node, O: Overlay>(
// compute the most advanced nodes again // compute the most advanced nodes again
if deque.first().unwrap().is_empty() { if deque.first().unwrap().is_empty() {
let _ = deque.push_back(BTreeSet::default()); let _ = deque.push_back(BTreeSet::default());
runner.dump_state_to_out_data(&simulation_state, &mut out_data); runner.dump_state_to_out_data(&simulation_state, &mut out_data)?;
} }
// if no more nodes to compute // if no more nodes to compute
@ -103,7 +106,8 @@ pub fn simulate<M, N: Node, O: Overlay>(
} }
} }
// write latest state // write latest state
runner.dump_state_to_out_data(&simulation_state, &mut out_data); runner.dump_state_to_out_data(&simulation_state, &mut out_data)?;
Ok(())
} }
fn choose_random_layer_and_node_id( fn choose_random_layer_and_node_id(

View File

@ -13,6 +13,7 @@ use crate::network::Network;
use rand::rngs::SmallRng; use rand::rngs::SmallRng;
use rand::{RngCore, SeedableRng}; use rand::{RngCore, SeedableRng};
use rayon::prelude::*; use rayon::prelude::*;
use serde::Serialize;
// internal // internal
use crate::node::Node; use crate::node::Node;
use crate::output_processors::OutData; use crate::output_processors::OutData;
@ -39,6 +40,7 @@ where
M: Clone, M: Clone,
N: Send + Sync, N: Send + Sync,
N::Settings: Clone, N::Settings: Clone,
N::State: Serialize,
O::Settings: Clone, O::Settings: Clone,
{ {
pub fn new( pub fn new(
@ -64,35 +66,30 @@ where
} }
} }
pub fn simulate(&mut self, out_data: Option<&mut Vec<OutData>>) { pub fn simulate(&mut self, out_data: Option<&mut Vec<OutData>>) -> anyhow::Result<()> {
match self.settings.runner_settings.clone() { match self.settings.runner_settings.clone() {
RunnerSettings::Sync => { RunnerSettings::Sync => sync_runner::simulate(self, out_data),
sync_runner::simulate(self, out_data); RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks, out_data),
}
RunnerSettings::Async { chunks } => {
async_runner::simulate(self, chunks, out_data);
}
RunnerSettings::Glauber { RunnerSettings::Glauber {
maximum_iterations, maximum_iterations,
update_rate, update_rate,
} => { } => glauber_runner::simulate(self, update_rate, maximum_iterations, out_data),
glauber_runner::simulate(self, update_rate, maximum_iterations, out_data);
}
RunnerSettings::Layered { RunnerSettings::Layered {
rounds_gap, rounds_gap,
distribution, distribution,
} => { } => layered_runner::simulate(self, rounds_gap, distribution, out_data),
layered_runner::simulate(self, rounds_gap, distribution, out_data);
}
} }
} }
fn dump_state_to_out_data( fn dump_state_to_out_data(
&self, &self,
_simulation_state: &SimulationState<N>, simulation_state: &SimulationState<N>,
_out_data: &mut Option<&mut Vec<OutData>>, out_data: &mut Option<&mut Vec<OutData>>,
) { ) -> anyhow::Result<()> {
todo!("What data do we want to expose?") if let Some(out_data) = out_data {
out_data.push(OutData::try_from(simulation_state)?);
}
Ok(())
} }
fn check_wards(&mut self, state: &SimulationState<N>) -> bool { fn check_wards(&mut self, state: &SimulationState<N>) -> bool {

View File

@ -1,3 +1,5 @@
use serde::Serialize;
use super::SimulationRunner; use super::SimulationRunner;
use crate::node::Node; use crate::node::Node;
use crate::output_processors::OutData; use crate::output_processors::OutData;
@ -9,26 +11,29 @@ use std::sync::Arc;
pub fn simulate<M, N: Node, O: Overlay>( pub fn simulate<M, N: Node, O: Overlay>(
runner: &mut SimulationRunner<M, N, O>, runner: &mut SimulationRunner<M, N, O>,
mut out_data: Option<&mut Vec<OutData>>, mut out_data: Option<&mut Vec<OutData>>,
) where ) -> anyhow::Result<()>
where
M: Clone, M: Clone,
N: Send + Sync, N: Send + Sync,
N::Settings: Clone, N::Settings: Clone,
N::State: Serialize,
O::Settings: Clone, O::Settings: Clone,
{ {
let state = SimulationState { let state = SimulationState {
nodes: Arc::clone(&runner.nodes), nodes: Arc::clone(&runner.nodes),
}; };
runner.dump_state_to_out_data(&state, &mut out_data); runner.dump_state_to_out_data(&state, &mut out_data)?;
for _ in 1.. { for _ in 1.. {
runner.step(); runner.step();
runner.dump_state_to_out_data(&state, &mut out_data); runner.dump_state_to_out_data(&state, &mut out_data)?;
// check if any condition makes the simulation stop // check if any condition makes the simulation stop
if runner.check_wards(&state) { if runner.check_wards(&state) {
break; break;
} }
} }
Ok(())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -13,6 +13,15 @@ pub struct SimulationState<N> {
pub nodes: Arc<RwLock<Vec<N>>>, pub nodes: Arc<RwLock<Vec<N>>>,
} }
impl<N> SimulationState<N> {
#[inline]
pub fn new(nodes: Vec<N>) -> Self {
Self {
nodes: Arc::new(RwLock::new(nodes)),
}
}
}
/// A ward is a computation over the `NetworkState`, it must return true if the state satisfies /// A ward is a computation over the `NetworkState`, it must return true if the state satisfies
/// the warding conditions. It is used to stop the consensus simulation if such condition is reached. /// the warding conditions. It is used to stop the consensus simulation if such condition is reached.
pub trait SimulationWard<N> { pub trait SimulationWard<N> {