From 92ef9e5a778f2a67729378287e31dbf06459c944 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Fri, 24 Mar 2023 21:21:10 +0100 Subject: [PATCH] Simulation app runners (#105) * Integrate new runners, fit types...make it compile * Added missing runners * Fix rebased changes * Make tests pass * Clippy happy * More clippy happy --- simulations/Cargo.toml | 18 +- simulations/src/bin/app.rs | 239 +++++++------ simulations/src/config.rs | 19 - simulations/src/lib.rs | 4 +- simulations/src/node/carnot.rs | 297 ---------------- simulations/src/node/carnot/mod.rs | 48 +++ simulations/src/node/mod.rs | 10 +- simulations/src/output_processors/mod.rs | 12 + simulations/src/overlay/flat.rs | 4 +- simulations/src/overlay/mod.rs | 5 +- simulations/src/overlay/tree.rs | 45 +-- simulations/src/runner.rs | 428 ----------------------- simulations/src/runner/async_runner.rs | 53 +++ simulations/src/runner/glauber_runner.rs | 57 +++ simulations/src/runner/layered_runner.rs | 155 ++++++++ simulations/src/runner/mod.rs | 125 +++++++ simulations/src/runner/sync_runner.rs | 31 ++ simulations/src/settings.rs | 36 ++ simulations/src/warding/mod.rs | 44 +++ simulations/src/warding/ttf.rs | 63 ++++ 20 files changed, 795 insertions(+), 898 deletions(-) delete mode 100644 simulations/src/config.rs delete mode 100644 simulations/src/node/carnot.rs create mode 100644 simulations/src/node/carnot/mod.rs create mode 100644 simulations/src/output_processors/mod.rs delete mode 100644 simulations/src/runner.rs create mode 100644 simulations/src/runner/async_runner.rs create mode 100644 simulations/src/runner/glauber_runner.rs create mode 100644 simulations/src/runner/layered_runner.rs create mode 100644 simulations/src/runner/mod.rs create mode 100644 simulations/src/runner/sync_runner.rs create mode 100644 simulations/src/settings.rs create mode 100644 simulations/src/warding/mod.rs create mode 100644 simulations/src/warding/ttf.rs diff --git a/simulations/Cargo.toml b/simulations/Cargo.toml index 39005617..f04c87df 100644 --- a/simulations/Cargo.toml +++ b/simulations/Cargo.toml @@ -3,12 +3,18 @@ name = "simulations" version = "0.1.0" edition = "2021" -[target.'cfg(target_arch = "wasm32")'.dependencies] -getrandom = { version = "0.2", features = ["js"] } +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -rand = { version = "0.8", features = ["small_rng"] } -serde = { version = "1.0", features = ["derive", "rc"] } -serde_with = "2" -serde_json = "1.0" clap = { version = "4", features = ["derive"] } +fixed-slice-deque = "0.1.0-beta2" +nomos-core = { path = "../nomos-core" } +polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"] } +rand = { version = "0.8", features = ["small_rng"] } +rayon = "1.7" +serde = { version = "1.0", features = ["derive", "rc"] } +serde_with = "2.3" +serde_json = "1.0" + +[target.'cfg(target_arch = "wasm32")'.dependencies] +getrandom = { version = "0.2", features = ["js"] } diff --git a/simulations/src/bin/app.rs b/simulations/src/bin/app.rs index bbcd19f8..b7ab6ca7 100644 --- a/simulations/src/bin/app.rs +++ b/simulations/src/bin/app.rs @@ -1,58 +1,23 @@ -use std::{path::PathBuf, str::FromStr}; - +// std +use std::error::Error; +use std::fmt::{Display, Formatter}; +use std::fs::File; +use std::io::Cursor; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +// crates use clap::Parser; -use rand::thread_rng; +use polars::io::SerWriter; +use polars::prelude::{DataFrame, JsonReader, SerReader}; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use simulations::overlay::tree::TreeOverlay; +// internal use simulations::{ - config::Config, - node::{ - carnot::{CarnotNode, CarnotStep, CarnotStepSolverType}, - Node, StepTime, - }, - overlay::{flat::FlatOverlay, Overlay}, - runner::ConsensusRunner, + node::carnot::CarnotNode, output_processors::OutData, runner::SimulationRunner, + settings::SimulationSettings, }; -/// Simple program to greet a person -#[derive(Parser, Debug)] -#[command(author, version, about, long_about = None)] -struct Args { - /// Path for a yaml-encoded network config file - config: std::path::PathBuf, - #[arg(long, default_value_t = OverlayType::Flat)] - overlay_type: OverlayType, - #[arg(long, default_value_t = NodeType::Carnot)] - node_type: NodeType, - #[arg(short, long, default_value_t = OutputType::StdOut)] - output: OutputType, -} - -#[derive(clap::ValueEnum, Debug, Copy, Clone, Serialize, Deserialize)] -enum OverlayType { - Flat, -} - -impl core::fmt::Display for OverlayType { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - Self::Flat => write!(f, "flat"), - } - } -} - -#[derive(clap::ValueEnum, Debug, Copy, Clone, Serialize, Deserialize)] -enum NodeType { - Carnot, -} - -impl core::fmt::Display for NodeType { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - Self::Carnot => write!(f, "carnot"), - } - } -} - #[derive(Debug, Clone, Serialize, Deserialize)] enum OutputType { File(PathBuf), @@ -70,65 +35,137 @@ impl core::fmt::Display for OutputType { } } -impl FromStr for OutputType { - type Err = String; +/// Output format selector enum +#[derive(Clone, Debug, Default)] +enum OutputFormat { + Json, + Csv, + #[default] + Parquet, +} + +impl Display for OutputFormat { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let tag = match self { + OutputFormat::Json => "json", + OutputFormat::Csv => "csv", + OutputFormat::Parquet => "parquet", + }; + write!(f, "{tag}") + } +} + +impl FromStr for OutputFormat { + type Err = std::io::Error; fn from_str(s: &str) -> Result { - match s.to_lowercase().as_str() { - "stdout" => Ok(Self::StdOut), - "stderr" => Ok(Self::StdErr), - path => Ok(Self::File(PathBuf::from(path))), + match s.to_ascii_lowercase().as_str() { + "json" => Ok(Self::Json), + "csv" => Ok(Self::Csv), + "parquet" => Ok(Self::Parquet), + tag => Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Invalid {tag} tag, only [json, csv, polars] are supported",), + )), } } } -pub fn main() -> Result<(), Box> { - let Args { - config, - overlay_type, - node_type, - output, - } = Args::parse(); +/// Main simulation wrapper +/// Pipes together the cli arguments with the execution +#[derive(Parser)] +pub struct SimulationApp { + /// Json file path, on `SimulationSettings` format + #[clap(long, short)] + input_settings: PathBuf, + /// Output file path + #[clap(long, short)] + output_file: PathBuf, + /// Output format selector + #[clap(long, short = 'f', default_value_t)] + output_format: OutputFormat, +} - let report = match (overlay_type, node_type) { - (OverlayType::Flat, NodeType::Carnot) => { - let cfg = serde_json::from_reader::< - _, - Config< - ::Settings, - >::Settings, - CarnotStep, - CarnotStepSolverType, - >, - >(std::fs::File::open(config)?)?; - #[allow(clippy::unit_arg)] - let overlay = FlatOverlay::new(cfg.overlay_settings); - let node_ids = (0..cfg.node_count).map(From::from).collect::>(); - let mut rng = thread_rng(); - let layout = overlay.layout(&node_ids, &mut rng); - let leaders = overlay.leaders(&node_ids, 1, &mut rng).collect(); +impl SimulationApp { + pub fn run(self) -> Result<(), Box> { + let Self { + input_settings, + output_file, + output_format, + } = self; + let simulation_settings: SimulationSettings<_, _> = load_json_from_file(&input_settings)?; - let mut runner: simulations::runner::ConsensusRunner = - ConsensusRunner::new(&mut rng, layout, leaders, cfg.node_settings); - runner.run(Box::new(|times: &[StepTime]| *times.iter().max().unwrap()) - as Box StepTime>) - } - }; - - let json = serde_json::to_string_pretty(&report)?; - match output { - OutputType::File(f) => { - use std::{fs::OpenOptions, io::Write}; - - let mut file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(f)?; - file.write_all(json.as_bytes())?; - } - OutputType::StdOut => println!("{json}"), - OutputType::StdErr => eprintln!("{json}"), + let mut simulation_runner: SimulationRunner = + SimulationRunner::new(simulation_settings); + // build up series vector + let mut out_data: Vec = Vec::new(); + simulation_runner.simulate(Some(&mut out_data)); + let mut dataframe: DataFrame = out_data_to_dataframe(out_data); + dump_dataframe_to(output_format, &mut dataframe, &output_file)?; + Ok(()) } +} + +fn out_data_to_dataframe(out_data: Vec) -> DataFrame { + let mut cursor = Cursor::new(Vec::new()); + serde_json::to_writer(&mut cursor, &out_data).expect("Dump data to json "); + let dataframe = JsonReader::new(cursor) + .finish() + .expect("Load dataframe from intermediary json"); + + dataframe + .unnest(["state"]) + .expect("Node state should be unnest") +} + +/// Generically load a json file +fn load_json_from_file(path: &Path) -> Result> { + let f = File::open(path).map_err(Box::new)?; + serde_json::from_reader(f).map_err(|e| Box::new(e) as Box) +} + +fn dump_dataframe_to_json(data: &mut DataFrame, out_path: &Path) -> Result<(), Box> { + let out_path = out_path.with_extension("json"); + let f = File::create(out_path)?; + let mut writer = polars::prelude::JsonWriter::new(f); + writer + .finish(data) + .map_err(|e| Box::new(e) as Box) +} + +fn dump_dataframe_to_csv(data: &mut DataFrame, out_path: &Path) -> Result<(), Box> { + let out_path = out_path.with_extension("csv"); + let f = File::create(out_path)?; + let mut writer = polars::prelude::CsvWriter::new(f); + writer + .finish(data) + .map_err(|e| Box::new(e) as Box) +} + +fn dump_dataframe_to_parquet(data: &mut DataFrame, out_path: &Path) -> Result<(), Box> { + let out_path = out_path.with_extension("parquet"); + let f = File::create(out_path)?; + let writer = polars::prelude::ParquetWriter::new(f); + writer + .finish(data) + .map(|_| ()) + .map_err(|e| Box::new(e) as Box) +} + +fn dump_dataframe_to( + output_format: OutputFormat, + data: &mut DataFrame, + out_path: &Path, +) -> Result<(), Box> { + match output_format { + OutputFormat::Json => dump_dataframe_to_json(data, out_path), + OutputFormat::Csv => dump_dataframe_to_csv(data, out_path), + OutputFormat::Parquet => dump_dataframe_to_parquet(data, out_path), + } +} + +fn main() -> Result<(), Box> { + let app: SimulationApp = SimulationApp::parse(); + app.run()?; Ok(()) } diff --git a/simulations/src/config.rs b/simulations/src/config.rs deleted file mode 100644 index 4d821243..00000000 --- a/simulations/src/config.rs +++ /dev/null @@ -1,19 +0,0 @@ -use crate::network::regions::Region; -use crate::node::StepTime; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; - -#[derive(Serialize, Deserialize)] -pub struct Config -where - S: core::str::FromStr, - C: core::str::FromStr, -{ - pub network_behaviors: HashMap<(Region, Region), StepTime>, - pub regions: Vec, - pub overlay_settings: O, - pub node_settings: N, - pub node_count: usize, - pub committee_size: usize, - pub step_costs: Vec<(S, C)>, -} diff --git a/simulations/src/lib.rs b/simulations/src/lib.rs index f538d575..78e22ee9 100644 --- a/simulations/src/lib.rs +++ b/simulations/src/lib.rs @@ -1,5 +1,7 @@ -pub mod config; pub mod network; pub mod node; +pub mod output_processors; pub mod overlay; pub mod runner; +pub mod settings; +pub mod warding; diff --git a/simulations/src/node/carnot.rs b/simulations/src/node/carnot.rs deleted file mode 100644 index 2c4fa393..00000000 --- a/simulations/src/node/carnot.rs +++ /dev/null @@ -1,297 +0,0 @@ -// std -use std::collections::HashMap; -use std::hash::Hash; -use std::rc::Rc; -use std::time::Duration; -// crates -use rand::prelude::SmallRng; -use rand::{Rng, SeedableRng}; -// internal -use crate::network::Network; -use crate::node::{Node, NodeId, StepTime}; -use crate::overlay::{Committee, Layout}; - -pub type RootCommitteeReceiverSolver = fn(&mut SmallRng, NodeId, &[NodeId], &Network) -> StepTime; - -pub type ParentCommitteeReceiverSolver = - fn(&mut SmallRng, NodeId, &Committee, &Network) -> StepTime; - -pub type ChildCommitteeReceiverSolver = - fn(&mut SmallRng, NodeId, &[&Committee], &Network) -> StepTime; - -fn leader_receive_proposal( - rng: &mut SmallRng, - node: NodeId, - leader_nodes: &[NodeId], - network: &Network, -) -> StepTime { - assert!(!leader_nodes.is_empty()); - leader_nodes - .iter() - .filter_map(|&sender| network.send_message_cost(rng, sender, node)) - .max() - .map(From::from) - .unwrap() -} - -fn receive_proposal( - rng: &mut SmallRng, - node: NodeId, - committee: &Committee, - network: &Network, -) -> StepTime { - assert!(!committee.is_empty()); - committee - .nodes - .iter() - .filter_map(|&sender| network.send_message_cost(rng, sender, node)) - .max() - .unwrap() - .into() -} - -fn receive_commit( - rng: &mut SmallRng, - node: NodeId, - committees: &[&Committee], - network: &Network, -) -> StepTime { - assert!(!committees.is_empty()); - committees - .iter() - .filter_map(|committee| { - committee - .nodes - .iter() - .filter_map(|&sender| network.send_message_cost(rng, sender, node)) - .max() - }) - .max() - .unwrap() - .into() -} - -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum CarnotStep { - RootReceiveProposal, - ReceiveProposal, - ValidateProposal, - LeaderReceiveVote, - ReceiveVote, - ValidateVote, - Ignore, -} - -impl core::str::FromStr for CarnotStep { - type Err = String; - - fn from_str(s: &str) -> Result { - match s.trim().to_lowercase().as_str() { - "receiveproposal" => Ok(Self::ReceiveProposal), - "validateproposal" => Ok(Self::ValidateProposal), - "receivevote" => Ok(Self::ReceiveVote), - "validatevote" => Ok(Self::ValidateVote), - _ => Err(format!("Unknown step: {s}")), - } - } -} - -#[derive(Clone)] -pub enum CarnotStepSolver { - Plain(StepTime), - ParentCommitteeReceiverSolver(ParentCommitteeReceiverSolver), - ChildCommitteeReceiverSolver(ChildCommitteeReceiverSolver), - LeaderToCommitteeReceiverSolver(ChildCommitteeReceiverSolver), - RootCommitteeReceiverSolver(RootCommitteeReceiverSolver), -} - -#[serde_with::serde_as] -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum CarnotStepSolverType { - Plain(#[serde_as(as = "serde_with::DurationMilliSeconds")] StepTime), - ParentCommitteeReceiverSolver, - ChildCommitteeReceiverSolver, - LeaderToCommitteeReceiverSolver, - RootCommitteeReceiverSolver, -} - -impl core::str::FromStr for CarnotStepSolverType { - type Err = String; - - fn from_str(s: &str) -> Result { - match s.trim().replace(['_', '-'], "").to_lowercase().as_str() { - "plain" => Ok(Self::Plain(StepTime::from_millis(1))), - "parentcommitteereceiversolver" => Ok(Self::ParentCommitteeReceiverSolver), - "childcommitteereceiversolver" => Ok(Self::ChildCommitteeReceiverSolver), - x => { - let millis = x - .parse::() - .map_err(|_| format!("Unknown step solver type: {s}"))?; - Ok(Self::Plain(StepTime::from_millis(millis))) - } - } - } -} - -impl CarnotStepSolverType { - pub fn to_solver(self) -> CarnotStepSolver { - match self { - Self::Plain(time) => CarnotStepSolver::Plain(time), - Self::ParentCommitteeReceiverSolver => { - CarnotStepSolver::ParentCommitteeReceiverSolver(receive_proposal) - } - Self::ChildCommitteeReceiverSolver => { - CarnotStepSolver::ChildCommitteeReceiverSolver(receive_commit) - } - Self::LeaderToCommitteeReceiverSolver => { - CarnotStepSolver::LeaderToCommitteeReceiverSolver(receive_commit) - } - Self::RootCommitteeReceiverSolver => { - CarnotStepSolver::RootCommitteeReceiverSolver(leader_receive_proposal) - } - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct CarnotNodeSettings { - pub steps_costs: HashMap, - pub network: Network, - pub layout: Layout, - pub leaders: Vec, -} - -#[derive(Clone)] -pub struct CarnotNode { - id: NodeId, - rng: SmallRng, - settings: Rc, -} - -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -pub enum CarnotRole { - Leader, - Root, - Intermediate, - Leaf, -} - -pub const CARNOT_STEPS_COSTS: &[(CarnotStep, CarnotStepSolverType)] = &[ - ( - CarnotStep::RootReceiveProposal, - CarnotStepSolverType::RootCommitteeReceiverSolver, - ), - ( - CarnotStep::ReceiveProposal, - CarnotStepSolverType::ParentCommitteeReceiverSolver, - ), - ( - CarnotStep::ValidateProposal, - CarnotStepSolverType::Plain(StepTime::from_secs(1)), - ), - ( - CarnotStep::LeaderReceiveVote, - CarnotStepSolverType::LeaderToCommitteeReceiverSolver, - ), - ( - CarnotStep::ReceiveVote, - CarnotStepSolverType::ChildCommitteeReceiverSolver, - ), - ( - CarnotStep::ValidateVote, - CarnotStepSolverType::Plain(StepTime::from_secs(1)), - ), - ( - CarnotStep::Ignore, - CarnotStepSolverType::Plain(StepTime::from_secs(0)), - ), -]; - -pub const CARNOT_LEADER_STEPS: &[CarnotStep] = - &[CarnotStep::LeaderReceiveVote, CarnotStep::ValidateVote]; - -pub const CARNOT_ROOT_STEPS: &[CarnotStep] = &[ - CarnotStep::RootReceiveProposal, - CarnotStep::ValidateProposal, - CarnotStep::ReceiveVote, - CarnotStep::ValidateVote, -]; - -pub const CARNOT_INTERMEDIATE_STEPS: &[CarnotStep] = &[ - CarnotStep::ReceiveProposal, - CarnotStep::ValidateProposal, - CarnotStep::ReceiveVote, - CarnotStep::ValidateVote, -]; - -pub const CARNOT_LEAF_STEPS: &[CarnotStep] = - &[CarnotStep::ReceiveProposal, CarnotStep::ValidateProposal]; - -pub const CARNOT_UNKNOWN_MESSAGE_RECEIVED_STEPS: &[CarnotStep] = &[CarnotStep::Ignore]; - -impl Node for CarnotNode { - type Settings = Rc; - - fn new(rng: &mut R, id: NodeId, settings: Self::Settings) -> Self { - let rng = SmallRng::from_rng(rng).unwrap(); - Self { id, rng, settings } - } - - fn id(&self) -> NodeId { - self.id - } - - fn run_steps(&mut self, steps: &[CarnotStep]) -> StepTime { - use CarnotStepSolver::*; - - let mut overall_steps_time = Duration::ZERO; - for step in steps.iter() { - let step_time = match self.settings.steps_costs.get(step).unwrap().to_solver() { - Plain(t) => t, - ParentCommitteeReceiverSolver(solver) => { - match self - .settings - .layout - .parent_nodes(self.settings.layout.committee(self.id)) - { - Some(parent) => { - solver(&mut self.rng, self.id, &parent, &self.settings.network) - } - None => Duration::ZERO.into(), - } - } - ChildCommitteeReceiverSolver(solver) => solver( - &mut self.rng, - self.id, - &self - .settings - .layout - .children_nodes(self.settings.layout.committee(self.id)), - &self.settings.network, - ), - RootCommitteeReceiverSolver(solver) => solver( - &mut self.rng, - self.id, - &self.settings.leaders, - &self.settings.network, - ), - LeaderToCommitteeReceiverSolver(solver) => { - let committees: Vec<&Committee> = - self.settings.layout.committees.values().collect(); - solver( - &mut self.rng, - self.id, - &committees[..], - &self.settings.network, - ) - } - }; - - overall_steps_time += step_time.0 - } - - overall_steps_time.into() - } -} diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs new file mode 100644 index 00000000..7c594f95 --- /dev/null +++ b/simulations/src/node/carnot/mod.rs @@ -0,0 +1,48 @@ +// std +// crates +use rand::Rng; +use serde::Deserialize; +// internal +use crate::node::{Node, NodeId}; + +#[derive(Default)] +pub struct CarnotState {} + +#[derive(Clone, Deserialize)] +pub struct CarnotSettings {} + +#[allow(dead_code)] // TODO: remove when handling settings +pub struct CarnotNode { + id: NodeId, + state: CarnotState, + settings: CarnotSettings, +} + +impl Node for CarnotNode { + type Settings = CarnotSettings; + type State = CarnotState; + + fn new(_rng: &mut R, id: NodeId, settings: Self::Settings) -> Self { + Self { + id, + state: Default::default(), + settings, + } + } + + fn id(&self) -> NodeId { + self.id + } + + fn current_view(&self) -> usize { + todo!() + } + + fn state(&self) -> &CarnotState { + &self.state + } + + fn step(&mut self) { + todo!() + } +} diff --git a/simulations/src/node/mod.rs b/simulations/src/node/mod.rs index 128a7aeb..d0d656a0 100644 --- a/simulations/src/node/mod.rs +++ b/simulations/src/node/mod.rs @@ -8,8 +8,6 @@ use std::{ // crates use rand::Rng; use serde::{Deserialize, Serialize}; - -use self::carnot::CarnotStep; // internal #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] @@ -116,9 +114,13 @@ impl core::iter::Sum for Duration { } } -pub trait Node: Clone { +pub trait Node { type Settings; + type State; fn new(rng: &mut R, id: NodeId, settings: Self::Settings) -> Self; fn id(&self) -> NodeId; - fn run_steps(&mut self, steps: &[CarnotStep]) -> StepTime; + // TODO: View must be view whenever we integrate consensus engine + fn current_view(&self) -> usize; + fn state(&self) -> &Self::State; + fn step(&mut self); } diff --git a/simulations/src/output_processors/mod.rs b/simulations/src/output_processors/mod.rs new file mode 100644 index 00000000..78df4112 --- /dev/null +++ b/simulations/src/output_processors/mod.rs @@ -0,0 +1,12 @@ +use serde::Serialize; + +pub type SerializedNodeState = serde_json::Value; + +#[derive(Serialize)] +pub struct OutData {} + +pub trait NodeStateRecord { + fn get_serialized_state_record(&self) -> SerializedNodeState { + SerializedNodeState::Null + } +} diff --git a/simulations/src/overlay/flat.rs b/simulations/src/overlay/flat.rs index a9948084..7cd2e27a 100644 --- a/simulations/src/overlay/flat.rs +++ b/simulations/src/overlay/flat.rs @@ -4,13 +4,12 @@ use rand::prelude::IteratorRandom; use rand::Rng; // internal use super::Overlay; -use crate::node::carnot::{CarnotNode, CarnotRole}; use crate::node::NodeId; use crate::overlay::{Committee, Layout}; pub struct FlatOverlay; -impl Overlay for FlatOverlay { +impl Overlay for FlatOverlay { type Settings = (); fn new(_settings: Self::Settings) -> Self { @@ -36,7 +35,6 @@ impl Overlay for FlatOverlay { 0.into(), Committee { nodes: nodes.iter().copied().collect(), - role: CarnotRole::Leader, }, )) .collect(); diff --git a/simulations/src/overlay/mod.rs b/simulations/src/overlay/mod.rs index 7d40418a..7170a435 100644 --- a/simulations/src/overlay/mod.rs +++ b/simulations/src/overlay/mod.rs @@ -6,12 +6,11 @@ use std::collections::{BTreeSet, HashMap}; // crates use rand::Rng; // internal -use crate::node::{carnot::CarnotRole, CommitteeId, Node, NodeId}; +use crate::node::{CommitteeId, NodeId}; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct Committee { pub nodes: BTreeSet, - pub role: CarnotRole, } impl Committee { @@ -90,7 +89,7 @@ impl Layout { } } -pub trait Overlay { +pub trait Overlay { type Settings; fn new(settings: Self::Settings) -> Self; diff --git a/simulations/src/overlay/tree.rs b/simulations/src/overlay/tree.rs index e660121a..da3e3ff5 100644 --- a/simulations/src/overlay/tree.rs +++ b/simulations/src/overlay/tree.rs @@ -2,17 +2,17 @@ use std::collections::HashMap; // crates use rand::seq::IteratorRandom; +use serde::Deserialize; // internal use super::{Committee, Layout, Overlay}; -use crate::node::{ - carnot::{CarnotNode, CarnotRole}, - CommitteeId, NodeId, -}; +use crate::node::{CommitteeId, NodeId}; +#[derive(Clone, Deserialize)] pub enum TreeType { FullBinaryTree, } +#[derive(Clone, Deserialize)] pub struct TreeSettings { pub tree_type: TreeType, pub committee_size: usize, @@ -51,7 +51,8 @@ impl TreeOverlay { .chunks(settings.committee_size) .enumerate() { - let mut has_children = false; + // TODO: Why do we have has_children here? + let mut _has_children = false; let left_child_id = 2 * committee_id + 1; let right_child_id = left_child_id + 1; @@ -61,7 +62,7 @@ impl TreeOverlay { committee_id.into(), vec![left_child_id.into(), right_child_id.into()], ); - has_children = true; + _has_children = true; } // Root node has no parent. @@ -70,15 +71,8 @@ impl TreeOverlay { parents.insert(committee_id.into(), parent_id.into()); } - let role = match (committee_id, has_children) { - (0, _) => CarnotRole::Root, - (_, true) => CarnotRole::Intermediate, - (_, false) => CarnotRole::Leaf, - }; - let committee = Committee { nodes: nodes.iter().copied().copied().collect(), - role, }; committees.insert(committee_id.into(), committee); @@ -93,7 +87,7 @@ impl TreeOverlay { } } -impl Overlay for TreeOverlay { +impl Overlay for TreeOverlay { type Settings = TreeSettings; fn new(settings: Self::Settings) -> Self { @@ -239,28 +233,7 @@ mod tests { committee_size: 1, }); let nodes = overlay.nodes(); - let layout = overlay.layout(&nodes, &mut rng); - - assert_eq!( - layout.committees[&CommitteeId::new(0)].role, - CarnotRole::Root - ); - assert_eq!( - layout.committees[&CommitteeId::new(1)].role, - CarnotRole::Intermediate - ); - assert_eq!( - layout.committees[&CommitteeId::new(2)].role, - CarnotRole::Intermediate - ); - assert_eq!( - layout.committees[&CommitteeId::new(3)].role, - CarnotRole::Leaf - ); - assert_eq!( - layout.committees[&CommitteeId::new(6)].role, - CarnotRole::Leaf - ); + let _layout = overlay.layout(&nodes, &mut rng); } #[test] diff --git a/simulations/src/runner.rs b/simulations/src/runner.rs deleted file mode 100644 index 7938539b..00000000 --- a/simulations/src/runner.rs +++ /dev/null @@ -1,428 +0,0 @@ -use crate::node::carnot::{ - CarnotRole, CARNOT_INTERMEDIATE_STEPS, CARNOT_LEADER_STEPS, CARNOT_LEAF_STEPS, - CARNOT_ROOT_STEPS, CARNOT_UNKNOWN_MESSAGE_RECEIVED_STEPS, -}; -use crate::node::{CommitteeId, Node, NodeId, StepTime}; -use crate::overlay::Layout; -use rand::Rng; -use std::collections::HashMap; -use std::time::Duration; - -pub struct ConsensusRunner { - nodes: HashMap, - leaders: Vec, - layout: Layout, -} - -#[allow(dead_code)] -#[derive(Debug, serde::Serialize)] -pub struct Report { - round_time: Duration, -} - -type Reducer = Box StepTime>; - -impl ConsensusRunner -where - N::Settings: Clone, -{ - pub fn new( - mut rng: R, - layout: Layout, - leaders: Vec, - node_settings: N::Settings, - ) -> Self { - let nodes = layout - .node_ids() - .map(|id| { - let node = N::new(&mut rng, id, node_settings.clone()); - (id, node) - }) - .collect(); - Self { - nodes, - layout, - leaders, - } - } - - pub fn run(&mut self, reducer: Reducer) -> Report { - let leaders = &self.leaders; - let layout = &self.layout; - - let mut leader_times = leaders - .iter() - .map(|leader_node| { - vec![self - .nodes - .get_mut(leader_node) - .unwrap() - .run_steps(CARNOT_LEADER_STEPS)] - }) - .collect(); - - let mut layer_times = Vec::new(); - for layer_nodes in layout - .layers - .values() - .map(|committees| get_layer_nodes(committees, layout)) - { - let times: Vec = layer_nodes - .iter() - .map(|(committee_id, node_id)| { - let steps = match layout.committees[committee_id].role { - CarnotRole::Root => CARNOT_ROOT_STEPS, - CarnotRole::Intermediate => CARNOT_INTERMEDIATE_STEPS, - CarnotRole::Leaf => CARNOT_LEAF_STEPS, - _ => { - // TODO: Should leader act as a leaf in a flat overlay? - CARNOT_UNKNOWN_MESSAGE_RECEIVED_STEPS - } - }; - self.nodes.get_mut(node_id).unwrap().run_steps(steps) - }) - .collect(); - - layer_times.push(times) - } - - layer_times.append(&mut leader_times); - let round_time = layer_times.iter().map(|d| reducer(d)).sum(); - - Report { round_time } - } -} - -fn get_layer_nodes( - layer_committees: &[CommitteeId], - layout: &Layout, -) -> Vec<(CommitteeId, NodeId)> { - layer_committees - .iter() - .flat_map(|committee_id| get_committee_nodes(committee_id, layout)) - .collect() -} - -fn get_committee_nodes(committee: &CommitteeId, layout: &Layout) -> Vec<(CommitteeId, NodeId)> { - layout.committees[committee] - .nodes - .clone() - .into_iter() - .map(|node_id| (*committee, node_id)) - .collect() -} - -#[cfg(test)] -mod test { - use crate::{ - network::{ - behaviour::NetworkBehaviour, - regions::{Region, RegionsData}, - Network, - }, - node::{ - carnot::{CarnotNode, CarnotNodeSettings, CARNOT_STEPS_COSTS}, - NodeId, StepTime, - }, - overlay::{ - flat::FlatOverlay, - tree::{TreeOverlay, TreeSettings, TreeType}, - Overlay, - }, - runner::{ConsensusRunner, Reducer}, - }; - use rand::{rngs::mock::StepRng, Rng}; - use std::{collections::HashMap, rc::Rc, time::Duration}; - - fn setup_runner>( - mut rng: &mut R, - overlay: &O, - ) -> ConsensusRunner { - let node_ids = overlay.nodes(); - let layout = overlay.layout(&node_ids, &mut rng); - let leaders: Vec = overlay.leaders(&node_ids, 1, &mut rng).collect(); - - let regions = std::iter::once((Region::Europe, node_ids.clone())).collect(); - let network_behaviour = std::iter::once(( - (Region::Europe, Region::Europe), - NetworkBehaviour::new(Duration::from_millis(100), 0.0), - )) - .collect(); - let node_settings: CarnotNodeSettings = CarnotNodeSettings { - steps_costs: CARNOT_STEPS_COSTS.iter().cloned().collect(), - network: Network::new(RegionsData::new(regions, network_behaviour)), - layout: overlay.layout(&node_ids, &mut rng), - leaders: leaders.clone(), - }; - - ConsensusRunner::new(&mut rng, layout, leaders, Rc::new(node_settings)) - } - - #[test] - fn run_flat_single_leader_steps() { - let mut rng = StepRng::new(1, 0); - let overlay = FlatOverlay::new(()); - - let mut runner = setup_runner(&mut rng, &overlay); - - assert_eq!( - Duration::from_millis(1100), - runner - .run(Box::new(|times: &[StepTime]| *times.iter().max().unwrap()) as Reducer) - .round_time - ); - } - - #[test] - fn run_tree_committee_1() { - let mut rng = StepRng::new(1, 0); - - let overlay = TreeOverlay::new(TreeSettings { - tree_type: TreeType::FullBinaryTree, - depth: 3, - committee_size: 1, - }); - - let mut runner: ConsensusRunner = setup_runner(&mut rng, &overlay); - - // # Leader (1 node): - // - // - 100ms - LeaderReceiveVote, - // - 1s - ValidateVote, - // - // Expected times [1.1s] - - // # Root (1 node): - // - // - 100ms - RootReceiveProposal, - // - 1s - ValidateProposal, - // - 100ms - ReceiveVote, - // - 1s - ValidateVote, - // - // Expected times [2.2s] - - // # Intermediary (2 nodes): - // - // - 100ms - ReceiveProposal, - // - 1s - ValidateProposal, - // - 100ms - ReceiveVote, - // - 1s - ValidateVote, - // - // Expected times [2.2s, 2.2s] - - // # Leaf (4 nodes): - // - // - 100ms - ReceiveProposal - // - 1s - ValidateProposal - // - // Expected times [1.1s, 1.1s, 1.1s, 1.1s] - - assert_eq!( - Duration::from_millis(6600), - runner - .run(Box::new(|times: &[StepTime]| *times.iter().max().unwrap()) as Reducer) - .round_time - ); - } - - #[test] - fn run_tree_committee_100() { - let mut rng = StepRng::new(1, 0); - - let overlay = TreeOverlay::new(TreeSettings { - tree_type: TreeType::FullBinaryTree, - depth: 3, - committee_size: 100, - }); - - let mut runner: ConsensusRunner = setup_runner(&mut rng, &overlay); - - assert_eq!( - Duration::from_millis(6600), - runner - .run(Box::new(|times: &[StepTime]| *times.iter().max().unwrap()) as Reducer) - .round_time - ); - } - - #[test] - fn run_tree_network_config_1() { - let mut rng = StepRng::new(1, 0); - - let overlay = TreeOverlay::new(TreeSettings { - tree_type: TreeType::FullBinaryTree, - depth: 3, - committee_size: 1, - }); - - let node_ids = overlay.nodes(); - let layout = overlay.layout(&node_ids, &mut rng); - // Normaly a leaders would be selected randomly, here, we're assuming it's NodeID 1. - // let leaders: Vec = overlay.leaders(&node_ids, 1, &mut rng).collect(); - let leaders = vec![1]; - - let first_five = &node_ids[..5]; - let rest = &node_ids[5..]; - - // 0 - // 1 2 - // 3 4 5 6 - // - // # Leader - NodeID 1 - // - // Sends vote to all committees. - // - // LeaderReceiveVote: - // - 100ms - Asia - Asia (1 to 0, 1, 2, 3, 4) - // - 500ms - Asia - Europe (1 to 5, 6) - // - // ValidateVote: 1s - // - // Expected times: [1.5s] - - // # Root - NodeID 0 - // - // Sends vote to child committees. - // - // RootReceiveProposal: - // - 100ms - Asia - Asia (0 to 1) - // - // ReceiveVote: - // - 100ms - Asia - Asia (0 to 1, 2) - // - // No network: - // - 1s - ValidateVote - // - 1s - ValidateProposal - // - // Expected times: [2.2s] - - // # Intermediary - NodeID 1, 2: - // - // ReceiveVote: - // - 100ms - Asia - Asia (1 to 3, 4) - // - 500ms - Asia - Europe (2 to 5, 6) - // - // ReceiveProposal: - // - 100ms - Asia - Asia (1 to 0, 2 to 0) - // - // No network: - // - 1s - ValidateVote - // - 1s - ValidateProposal - // - // Expected times [2.2s, 2.6s] - - // # Leaf - NodeID 3, 4, 5, 6 - // - // ReceiveProposal: - // - 100ms - Asia - Asia ( 3, 4 to 1) - // - 500ms - Asia - Europe ( 5, 6 to 2) - // - // No network: - // - 1s - ValidateProposal - // - // Expected times [1.1s, 1.1s, 1.5s, 1.5s] - - let regions = HashMap::from([ - (Region::Asia, first_five.to_vec()), - (Region::Europe, rest.to_vec()), - ]); - - let network_behaviour = HashMap::from([ - ( - (Region::Asia, Region::Asia), - NetworkBehaviour::new(Duration::from_millis(100), 0.0), - ), - ( - (Region::Asia, Region::Europe), - NetworkBehaviour::new(Duration::from_millis(500), 0.0), - ), - ( - (Region::Europe, Region::Europe), - NetworkBehaviour::new(Duration::from_millis(100), 0.0), - ), - ]); - - let node_settings: CarnotNodeSettings = CarnotNodeSettings { - steps_costs: CARNOT_STEPS_COSTS.iter().cloned().collect(), - network: Network::new(RegionsData::new(regions, network_behaviour)), - layout: overlay.layout(&node_ids, &mut rng), - leaders: leaders.clone().into_iter().map(From::from).collect(), - }; - - let mut runner = ConsensusRunner::::new( - &mut rng, - layout, - leaders.into_iter().map(From::from).collect(), - Rc::new(node_settings), - ); - - assert_eq!( - Duration::from_millis(7800), - runner - .run(Box::new(|times: &[StepTime]| *times.iter().max().unwrap()) as Reducer) - .round_time - ); - } - - #[test] - fn run_tree_network_config_100() { - let mut rng = StepRng::new(1, 0); - - let overlay = TreeOverlay::new(TreeSettings { - tree_type: TreeType::FullBinaryTree, - depth: 3, - committee_size: 100, - }); - - let node_ids = overlay.nodes(); - let layout = overlay.layout(&node_ids, &mut rng); - // Normaly a leaders would be selected randomly, here, we're assuming it's NodeID 1. - // let leaders: Vec = overlay.leaders(&node_ids, 1, &mut rng).collect(); - let leaders = vec![1]; - - let two_thirds = node_ids.len() as f32 * 0.66; - let rest = &node_ids[two_thirds as usize..]; - let two_thirds = &node_ids[..two_thirds as usize]; - - let regions = HashMap::from([ - (Region::Asia, two_thirds.to_vec()), - (Region::Europe, rest.to_vec()), - ]); - - let network_behaviour = HashMap::from([ - ( - (Region::Asia, Region::Asia), - NetworkBehaviour::new(Duration::from_millis(100), 0.0), - ), - ( - (Region::Asia, Region::Europe), - NetworkBehaviour::new(Duration::from_millis(500), 0.0), - ), - ( - (Region::Europe, Region::Europe), - NetworkBehaviour::new(Duration::from_millis(100), 0.0), - ), - ]); - - let node_settings: CarnotNodeSettings = CarnotNodeSettings { - steps_costs: CARNOT_STEPS_COSTS.iter().cloned().collect(), - network: Network::new(RegionsData::new(regions, network_behaviour)), - layout: overlay.layout(&node_ids, &mut rng), - leaders: leaders.clone().into_iter().map(From::from).collect(), - }; - - let mut runner = ConsensusRunner::::new( - &mut rng, - layout, - leaders.into_iter().map(From::from).collect(), - Rc::new(node_settings), - ); - - assert_eq!( - Duration::from_millis(7800), - runner - .run(Box::new(|times: &[StepTime]| *times.iter().max().unwrap()) as Reducer) - .round_time - ); - } -} diff --git a/simulations/src/runner/async_runner.rs b/simulations/src/runner/async_runner.rs new file mode 100644 index 00000000..f9660b7a --- /dev/null +++ b/simulations/src/runner/async_runner.rs @@ -0,0 +1,53 @@ +use crate::node::{Node, NodeId}; +use crate::output_processors::OutData; +use crate::overlay::Overlay; +use crate::runner::SimulationRunner; +use crate::warding::SimulationState; +use rand::prelude::SliceRandom; +use rayon::prelude::*; +use std::collections::HashSet; +use std::sync::Arc; + +pub fn simulate( + runner: &mut SimulationRunner, + chunk_size: usize, + mut out_data: Option<&mut Vec>, +) where + N::Settings: Clone, + N: Send + Sync, + O::Settings: Clone, +{ + let simulation_state = SimulationState:: { + nodes: Arc::clone(&runner.nodes), + }; + + let mut node_ids: Vec = runner + .nodes + .read() + .expect("Read access to nodes vector") + .iter() + .map(N::id) + .collect(); + + runner.dump_state_to_out_data(&simulation_state, &mut out_data); + + loop { + node_ids.shuffle(&mut runner.rng); + for ids_chunk in node_ids.chunks(chunk_size) { + let ids: HashSet = ids_chunk.iter().copied().collect(); + runner + .nodes + .write() + .expect("Write access to nodes vector") + .par_iter_mut() + .filter(|n| ids.contains(&n.id())) + .for_each(N::step); + + runner.dump_state_to_out_data(&simulation_state, &mut out_data); + } + // check if any condition makes the simulation stop + if runner.check_wards(&simulation_state) { + break; + } + } +} diff --git a/simulations/src/runner/glauber_runner.rs b/simulations/src/runner/glauber_runner.rs new file mode 100644 index 00000000..1cb5bb4b --- /dev/null +++ b/simulations/src/runner/glauber_runner.rs @@ -0,0 +1,57 @@ +use crate::node::{Node, NodeId}; +use crate::output_processors::OutData; +use crate::overlay::Overlay; +use crate::runner::SimulationRunner; +use crate::warding::SimulationState; +use rand::prelude::IteratorRandom; +use std::collections::BTreeSet; +use std::sync::Arc; + +/// [Glauber dynamics simulation](https://en.wikipedia.org/wiki/Glauber_dynamics) +pub fn simulate( + runner: &mut SimulationRunner, + update_rate: usize, + maximum_iterations: usize, + mut out_data: Option<&mut Vec>, +) where + N: Send + Sync, + N::Settings: Clone, + O::Settings: Clone, +{ + let simulation_state = SimulationState { + nodes: Arc::clone(&runner.nodes), + }; + let nodes_remaining: BTreeSet = (0..runner + .nodes + .read() + .expect("Read access to nodes vector") + .len()) + .map(From::from) + .collect(); + let iterations: Vec<_> = (0..maximum_iterations).collect(); + 'main: for chunk in iterations.chunks(update_rate) { + for _ in chunk { + if nodes_remaining.is_empty() { + break 'main; + } + + let node_id = *nodes_remaining.iter().choose(&mut runner.rng).expect( + "Some id to be selected as it should be impossible for the set to be empty here", + ); + + { + let mut shared_nodes = runner.nodes.write().expect("Write access to nodes vector"); + let node: &mut N = shared_nodes + .get_mut(node_id.inner()) + .expect("Node should be present"); + node.step(); + } + + // check if any condition makes the simulation stop + if runner.check_wards(&simulation_state) { + break 'main; + } + } + runner.dump_state_to_out_data(&simulation_state, &mut out_data); + } +} diff --git a/simulations/src/runner/layered_runner.rs b/simulations/src/runner/layered_runner.rs new file mode 100644 index 00000000..482bed7f --- /dev/null +++ b/simulations/src/runner/layered_runner.rs @@ -0,0 +1,155 @@ +//! # Layered simulation runner +//! +//! A revision of the [`glauber`](super::glauber_runner) simulation runner. +//! +//! **`glauber`** simulations have some drawbacks: +//! +//! * Completely random, difficult to control +//! * Not close to how real nodes would perform in reality +//! * Difficult to analise recorded data, as data it is updated by chunks of iterations +//! +//! To solve this we can use a concept of layered *glauber* executions. +//! The algorithm roughly works as follows: +//! +//! ```python +//! nodes <- [nodes] +//! layers <- [[nodes_ids], [], ...] +//! while nodes_to_compute(layers): +//! layer_index <- pick_rand_layer(layers) +//! node_index <- pop_rand_node(rand_layer) +//! step(nodes[node_index]) +//! if not node_decided(node): +//! push(layers[layer_index+1], node_index) +//! ``` +//! +//! From within this, controlling the *number of layers*, and *weighting* them (how often are they picked), +//! we can control the flow of the simulations. +//! Also we can consider that once the bottom layer is empty a fully step have been concluded and we can record +//! the data of that step simulation. + +// std +use std::collections::BTreeSet; +use std::ops::Not; +use std::sync::Arc; +// crates +use fixed_slice_deque::FixedSliceDeque; +use rand::prelude::{IteratorRandom, SliceRandom}; +use rand::rngs::SmallRng; +// internal +use crate::node::{Node, NodeId}; +use crate::output_processors::OutData; +use crate::overlay::Overlay; +use crate::runner::SimulationRunner; +use crate::warding::SimulationState; + +pub fn simulate( + runner: &mut SimulationRunner, + gap: usize, + distribution: Option>, + mut out_data: Option<&mut Vec>, +) where + N: Send + Sync, + N::Settings: Clone, + O::Settings: Clone, +{ + let distribution = + distribution.unwrap_or_else(|| std::iter::repeat(1.0f32).take(gap).collect()); + + let layers: Vec = (0..gap).collect(); + + let mut deque = build_node_ids_deque(gap, runner); + + let simulation_state = SimulationState { + nodes: Arc::clone(&runner.nodes), + }; + + loop { + let (group_index, node_id) = + choose_random_layer_and_node_id(&mut runner.rng, &distribution, &layers, &mut deque); + + // remove node_id from group + deque.get_mut(group_index).unwrap().remove(&node_id); + + { + let mut shared_nodes = runner.nodes.write().expect("Write access to nodes vector"); + let node: &mut N = shared_nodes + .get_mut(node_id.inner()) + .expect("Node should be present"); + let prev_view = node.current_view(); + node.step(); + let after_view = node.current_view(); + if after_view > prev_view { + // pass node to next step group + deque.get_mut(group_index + 1).unwrap().insert(node_id); + } + } + + // check if any condition makes the simulation stop + if runner.check_wards(&simulation_state) { + break; + } + + // if initial is empty then we finished a full round, append a new set to the end so we can + // compute the most advanced nodes again + if deque.first().unwrap().is_empty() { + let _ = deque.push_back(BTreeSet::default()); + runner.dump_state_to_out_data(&simulation_state, &mut out_data); + } + + // if no more nodes to compute + if deque.iter().all(BTreeSet::is_empty) { + break; + } + } + // write latest state + runner.dump_state_to_out_data(&simulation_state, &mut out_data); +} + +fn choose_random_layer_and_node_id( + rng: &mut SmallRng, + distribution: &[f32], + layers: &[usize], + deque: &mut FixedSliceDeque>, +) -> (usize, NodeId) { + let i = *layers + .iter() + // filter out empty round groups + .filter_map(|&i| { + let g = deque.get(i).unwrap(); + g.is_empty().not().then_some(i) + }) + // intermediate collect necessary for choose_weighted + .collect::>() + .choose_weighted(rng, |&i| distribution.get(i).unwrap()) + .expect("Distribution choose to work"); + + let group: &mut BTreeSet = deque.get_mut(i).unwrap(); + + let node_id = group.iter().choose(rng).unwrap(); + (i, *node_id) +} + +fn build_node_ids_deque( + gap: usize, + runner: &SimulationRunner, +) -> FixedSliceDeque> +where + N: Node, + O: Overlay, +{ + // add a +1 so we always have + let mut deque = FixedSliceDeque::new(gap + 1); + // push first layer + let node_ids: BTreeSet = runner + .nodes + .write() + .expect("Single access to runner nodes") + .iter() + .map(|node| node.id()) + .collect(); + + deque.push_back(node_ids); + // allocate default sets + while deque.try_push_back(BTreeSet::new()).is_ok() {} + deque +} diff --git a/simulations/src/runner/mod.rs b/simulations/src/runner/mod.rs new file mode 100644 index 00000000..7705afc0 --- /dev/null +++ b/simulations/src/runner/mod.rs @@ -0,0 +1,125 @@ +mod async_runner; +mod glauber_runner; +mod layered_runner; +mod sync_runner; + +use std::marker::PhantomData; +// std +use std::sync::{Arc, RwLock}; +// crates +use rand::rngs::SmallRng; +use rand::{RngCore, SeedableRng}; +use rayon::prelude::*; +// internal +use crate::node::Node; +use crate::output_processors::OutData; +use crate::overlay::Overlay; +use crate::settings::{RunnerSettings, SimulationSettings}; +use crate::warding::{SimulationState, SimulationWard}; + +/// Encapsulation solution for the simulations runner +/// Holds the network state, the simulating nodes and the simulation settings. +pub struct SimulationRunner +where + N: Node, + O: Overlay, +{ + nodes: Arc>>, + settings: SimulationSettings, + rng: SmallRng, + _overlay: PhantomData, +} + +impl SimulationRunner +where + N: Send + Sync, + N::Settings: Clone, + O::Settings: Clone, +{ + pub fn new(settings: SimulationSettings) -> Self { + let seed = settings + .seed + .unwrap_or_else(|| rand::thread_rng().next_u64()); + + println!("Seed: {seed}"); + + let mut rng = SmallRng::seed_from_u64(seed); + let overlay = O::new(settings.overlay_settings.clone()); + let nodes = Self::nodes_from_initial_settings(&settings, overlay, &mut rng); + + let nodes = Arc::new(RwLock::new(nodes)); + + Self { + nodes, + settings, + rng, + _overlay: Default::default(), + } + } + + /// Initialize nodes from settings and calculate initial network state. + fn nodes_from_initial_settings( + settings: &SimulationSettings, + _overlay: O, // TODO: attach overlay information to nodes + seed: &mut SmallRng, + ) -> Vec { + let SimulationSettings { + node_settings, + node_count, + .. + } = settings; + + (0..*node_count) + .map(|id| N::new(seed, id.into(), node_settings.clone())) + .collect() + } + + pub fn simulate(&mut self, out_data: Option<&mut Vec>) { + match self.settings.runner_settings.clone() { + RunnerSettings::Sync => { + sync_runner::simulate(self, out_data); + } + RunnerSettings::Async { chunks } => { + async_runner::simulate(self, chunks, out_data); + } + RunnerSettings::Glauber { + maximum_iterations, + update_rate, + } => { + glauber_runner::simulate(self, update_rate, maximum_iterations, out_data); + } + RunnerSettings::Layered { + rounds_gap, + distribution, + } => { + layered_runner::simulate(self, rounds_gap, distribution, out_data); + } + } + } + + fn dump_state_to_out_data( + &self, + _simulation_state: &SimulationState, + _out_ata: &mut Option<&mut Vec>, + ) { + todo!("What data do we want to expose?") + } + + fn check_wards(&mut self, state: &SimulationState) -> bool { + self.settings + .wards + .par_iter_mut() + .map(|ward| ward.analyze(state)) + .any(|x| x) + } + + fn step(&mut self) { + self.nodes + .write() + .expect("Single access to nodes vector") + .par_iter_mut() + .for_each(|node| { + node.step(); + }); + } +} diff --git a/simulations/src/runner/sync_runner.rs b/simulations/src/runner/sync_runner.rs new file mode 100644 index 00000000..d82de1e8 --- /dev/null +++ b/simulations/src/runner/sync_runner.rs @@ -0,0 +1,31 @@ +use super::SimulationRunner; +use crate::node::Node; +use crate::output_processors::OutData; +use crate::overlay::Overlay; +use crate::warding::SimulationState; +use std::sync::Arc; + +/// Simulate with option of dumping the network state as a `::polars::Series` +pub fn simulate( + runner: &mut SimulationRunner, + mut out_data: Option<&mut Vec>, +) where + N: Send + Sync, + N::Settings: Clone, + O::Settings: Clone, +{ + let state = SimulationState { + nodes: Arc::clone(&runner.nodes), + }; + + runner.dump_state_to_out_data(&state, &mut out_data); + + for _ in 1.. { + runner.step(); + runner.dump_state_to_out_data(&state, &mut out_data); + // check if any condition makes the simulation stop + if runner.check_wards(&state) { + break; + } + } +} diff --git a/simulations/src/settings.rs b/simulations/src/settings.rs new file mode 100644 index 00000000..09f247bd --- /dev/null +++ b/simulations/src/settings.rs @@ -0,0 +1,36 @@ +use crate::network::regions::Region; +use crate::node::StepTime; +use crate::warding::Ward; +use serde::Deserialize; +use std::collections::HashMap; + +#[derive(Clone, Debug, Deserialize, Default)] +pub enum RunnerSettings { + #[default] + Sync, + Async { + chunks: usize, + }, + Glauber { + maximum_iterations: usize, + update_rate: usize, + }, + Layered { + rounds_gap: usize, + distribution: Option>, + }, +} + +#[derive(Deserialize)] +pub struct SimulationSettings { + pub network_behaviors: HashMap<(Region, Region), StepTime>, + pub regions: Vec, + #[serde(default)] + pub wards: Vec, + pub overlay_settings: O, + pub node_settings: N, + pub runner_settings: RunnerSettings, + pub node_count: usize, + pub committee_size: usize, + pub seed: Option, +} diff --git a/simulations/src/warding/mod.rs b/simulations/src/warding/mod.rs new file mode 100644 index 00000000..9887f98c --- /dev/null +++ b/simulations/src/warding/mod.rs @@ -0,0 +1,44 @@ +// std +use std::sync::{Arc, RwLock}; +// crates +use serde::Deserialize; +// internal +use crate::node::Node; + +mod ttf; + +pub struct SimulationState { + pub nodes: Arc>>, +} + +/// A ward is a computation over the `NetworkState`, it must return true if the state satisfies +/// the warding conditions. It is used to stop the consensus simulation if such condition is reached. +pub trait SimulationWard { + type SimulationState; + fn analyze(&mut self, state: &Self::SimulationState) -> bool; +} + +/// Ward dispatcher +/// Enum to avoid Boxing (Box) wards. +#[derive(Debug, Deserialize)] +pub enum Ward { + #[serde(rename = "time_to_finality")] + MaxView(ttf::MaxViewWard), +} + +impl Ward { + pub fn simulation_ward_mut( + &mut self, + ) -> &mut dyn SimulationWard> { + match self { + Ward::MaxView(ward) => ward, + } + } +} + +impl SimulationWard for Ward { + type SimulationState = SimulationState; + fn analyze(&mut self, state: &Self::SimulationState) -> bool { + self.simulation_ward_mut().analyze(state) + } +} diff --git a/simulations/src/warding/ttf.rs b/simulations/src/warding/ttf.rs new file mode 100644 index 00000000..512cbed1 --- /dev/null +++ b/simulations/src/warding/ttf.rs @@ -0,0 +1,63 @@ +use crate::node::Node; +use crate::warding::{SimulationState, SimulationWard}; +use serde::Deserialize; + +/// Time to finality ward. It monitors the amount of rounds of the simulations, triggers when surpassing +/// the set threshold. +#[derive(Debug, Deserialize, Copy, Clone)] +pub struct MaxViewWard { + _max_view: usize, +} + +impl SimulationWard for MaxViewWard { + type SimulationState = SimulationState; + fn analyze(&mut self, _state: &Self::SimulationState) -> bool { + true // TODO: implement using simulation state + } +} + +#[cfg(test)] +mod test { + use crate::node::{Node, NodeId}; + use crate::warding::ttf::MaxViewWard; + use crate::warding::{SimulationState, SimulationWard}; + use rand::Rng; + use std::ops::AddAssign; + use std::sync::{Arc, RwLock}; + + #[test] + fn rebase_threshold() { + impl Node for usize { + type Settings = (); + type State = Self; + + fn new(rng: &mut R, id: NodeId, settings: Self::Settings) -> Self { + id.inner() + } + + fn id(&self) -> NodeId { + (*self).into() + } + + fn current_view(&self) -> usize { + *self + } + + fn state(&self) -> &Self::State { + self + } + + fn step(&mut self) { + self.add_assign(1); + } + } + let mut ttf = MaxViewWard { _max_view: 10 }; + let mut cond = false; + + let node = 11; + let mut state = SimulationState { + nodes: Arc::new(RwLock::new(vec![node])), + }; + assert!(ttf.analyze(&state)); + } +}