Simulation app runners (#105)

* Integrate new runners, fit types...make it compile

* Added missing runners

* Fix rebased changes

* Make tests pass

* Clippy happy

* More clippy happy
This commit is contained in:
Daniel Sanchez 2023-03-24 21:21:10 +01:00 committed by GitHub
parent 4a90ba6926
commit 92ef9e5a77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 795 additions and 898 deletions

View File

@ -3,12 +3,18 @@ name = "simulations"
version = "0.1.0"
edition = "2021"
[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1.0", features = ["derive", "rc"] }
serde_with = "2"
serde_json = "1.0"
clap = { version = "4", features = ["derive"] }
fixed-slice-deque = "0.1.0-beta2"
nomos-core = { path = "../nomos-core" }
polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"] }
rand = { version = "0.8", features = ["small_rng"] }
rayon = "1.7"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_with = "2.3"
serde_json = "1.0"
[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }

View File

@ -1,58 +1,23 @@
use std::{path::PathBuf, str::FromStr};
// std
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::fs::File;
use std::io::Cursor;
use std::path::{Path, PathBuf};
use std::str::FromStr;
// crates
use clap::Parser;
use rand::thread_rng;
use polars::io::SerWriter;
use polars::prelude::{DataFrame, JsonReader, SerReader};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use simulations::overlay::tree::TreeOverlay;
// internal
use simulations::{
config::Config,
node::{
carnot::{CarnotNode, CarnotStep, CarnotStepSolverType},
Node, StepTime,
},
overlay::{flat::FlatOverlay, Overlay},
runner::ConsensusRunner,
node::carnot::CarnotNode, output_processors::OutData, runner::SimulationRunner,
settings::SimulationSettings,
};
/// Simple program to greet a person
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Path for a yaml-encoded network config file
config: std::path::PathBuf,
#[arg(long, default_value_t = OverlayType::Flat)]
overlay_type: OverlayType,
#[arg(long, default_value_t = NodeType::Carnot)]
node_type: NodeType,
#[arg(short, long, default_value_t = OutputType::StdOut)]
output: OutputType,
}
#[derive(clap::ValueEnum, Debug, Copy, Clone, Serialize, Deserialize)]
enum OverlayType {
Flat,
}
impl core::fmt::Display for OverlayType {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::Flat => write!(f, "flat"),
}
}
}
#[derive(clap::ValueEnum, Debug, Copy, Clone, Serialize, Deserialize)]
enum NodeType {
Carnot,
}
impl core::fmt::Display for NodeType {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::Carnot => write!(f, "carnot"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
enum OutputType {
File(PathBuf),
@ -70,65 +35,137 @@ impl core::fmt::Display for OutputType {
}
}
impl FromStr for OutputType {
type Err = String;
/// Output format selector enum
#[derive(Clone, Debug, Default)]
enum OutputFormat {
Json,
Csv,
#[default]
Parquet,
}
impl Display for OutputFormat {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let tag = match self {
OutputFormat::Json => "json",
OutputFormat::Csv => "csv",
OutputFormat::Parquet => "parquet",
};
write!(f, "{tag}")
}
}
impl FromStr for OutputFormat {
type Err = std::io::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"stdout" => Ok(Self::StdOut),
"stderr" => Ok(Self::StdErr),
path => Ok(Self::File(PathBuf::from(path))),
match s.to_ascii_lowercase().as_str() {
"json" => Ok(Self::Json),
"csv" => Ok(Self::Csv),
"parquet" => Ok(Self::Parquet),
tag => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Invalid {tag} tag, only [json, csv, polars] are supported",),
)),
}
}
}
pub fn main() -> Result<(), Box<dyn std::error::Error>> {
let Args {
config,
overlay_type,
node_type,
output,
} = Args::parse();
/// Main simulation wrapper
/// Pipes together the cli arguments with the execution
#[derive(Parser)]
pub struct SimulationApp {
/// Json file path, on `SimulationSettings` format
#[clap(long, short)]
input_settings: PathBuf,
/// Output file path
#[clap(long, short)]
output_file: PathBuf,
/// Output format selector
#[clap(long, short = 'f', default_value_t)]
output_format: OutputFormat,
}
let report = match (overlay_type, node_type) {
(OverlayType::Flat, NodeType::Carnot) => {
let cfg = serde_json::from_reader::<
_,
Config<
<CarnotNode as Node>::Settings,
<FlatOverlay as Overlay<CarnotNode>>::Settings,
CarnotStep,
CarnotStepSolverType,
>,
>(std::fs::File::open(config)?)?;
#[allow(clippy::unit_arg)]
let overlay = FlatOverlay::new(cfg.overlay_settings);
let node_ids = (0..cfg.node_count).map(From::from).collect::<Vec<_>>();
let mut rng = thread_rng();
let layout = overlay.layout(&node_ids, &mut rng);
let leaders = overlay.leaders(&node_ids, 1, &mut rng).collect();
impl SimulationApp {
pub fn run(self) -> Result<(), Box<dyn Error>> {
let Self {
input_settings,
output_file,
output_format,
} = self;
let simulation_settings: SimulationSettings<_, _> = load_json_from_file(&input_settings)?;
let mut runner: simulations::runner::ConsensusRunner<CarnotNode> =
ConsensusRunner::new(&mut rng, layout, leaders, cfg.node_settings);
runner.run(Box::new(|times: &[StepTime]| *times.iter().max().unwrap())
as Box<dyn Fn(&[StepTime]) -> StepTime>)
let mut simulation_runner: SimulationRunner<CarnotNode, TreeOverlay> =
SimulationRunner::new(simulation_settings);
// build up series vector
let mut out_data: Vec<OutData> = Vec::new();
simulation_runner.simulate(Some(&mut out_data));
let mut dataframe: DataFrame = out_data_to_dataframe(out_data);
dump_dataframe_to(output_format, &mut dataframe, &output_file)?;
Ok(())
}
};
}
let json = serde_json::to_string_pretty(&report)?;
match output {
OutputType::File(f) => {
use std::{fs::OpenOptions, io::Write};
fn out_data_to_dataframe(out_data: Vec<OutData>) -> DataFrame {
let mut cursor = Cursor::new(Vec::new());
serde_json::to_writer(&mut cursor, &out_data).expect("Dump data to json ");
let dataframe = JsonReader::new(cursor)
.finish()
.expect("Load dataframe from intermediary json");
let mut file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(f)?;
file.write_all(json.as_bytes())?;
}
OutputType::StdOut => println!("{json}"),
OutputType::StdErr => eprintln!("{json}"),
dataframe
.unnest(["state"])
.expect("Node state should be unnest")
}
/// Generically load a json file
fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> Result<T, Box<dyn Error>> {
let f = File::open(path).map_err(Box::new)?;
serde_json::from_reader(f).map_err(|e| Box::new(e) as Box<dyn Error>)
}
fn dump_dataframe_to_json(data: &mut DataFrame, out_path: &Path) -> Result<(), Box<dyn Error>> {
let out_path = out_path.with_extension("json");
let f = File::create(out_path)?;
let mut writer = polars::prelude::JsonWriter::new(f);
writer
.finish(data)
.map_err(|e| Box::new(e) as Box<dyn Error>)
}
fn dump_dataframe_to_csv(data: &mut DataFrame, out_path: &Path) -> Result<(), Box<dyn Error>> {
let out_path = out_path.with_extension("csv");
let f = File::create(out_path)?;
let mut writer = polars::prelude::CsvWriter::new(f);
writer
.finish(data)
.map_err(|e| Box::new(e) as Box<dyn Error>)
}
fn dump_dataframe_to_parquet(data: &mut DataFrame, out_path: &Path) -> Result<(), Box<dyn Error>> {
let out_path = out_path.with_extension("parquet");
let f = File::create(out_path)?;
let writer = polars::prelude::ParquetWriter::new(f);
writer
.finish(data)
.map(|_| ())
.map_err(|e| Box::new(e) as Box<dyn Error>)
}
fn dump_dataframe_to(
output_format: OutputFormat,
data: &mut DataFrame,
out_path: &Path,
) -> Result<(), Box<dyn Error>> {
match output_format {
OutputFormat::Json => dump_dataframe_to_json(data, out_path),
OutputFormat::Csv => dump_dataframe_to_csv(data, out_path),
OutputFormat::Parquet => dump_dataframe_to_parquet(data, out_path),
}
}
fn main() -> Result<(), Box<dyn Error>> {
let app: SimulationApp = SimulationApp::parse();
app.run()?;
Ok(())
}

View File

@ -1,19 +0,0 @@
use crate::network::regions::Region;
use crate::node::StepTime;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Serialize, Deserialize)]
pub struct Config<N, O, S, C>
where
S: core::str::FromStr,
C: core::str::FromStr,
{
pub network_behaviors: HashMap<(Region, Region), StepTime>,
pub regions: Vec<Region>,
pub overlay_settings: O,
pub node_settings: N,
pub node_count: usize,
pub committee_size: usize,
pub step_costs: Vec<(S, C)>,
}

View File

@ -1,5 +1,7 @@
pub mod config;
pub mod network;
pub mod node;
pub mod output_processors;
pub mod overlay;
pub mod runner;
pub mod settings;
pub mod warding;

View File

@ -1,297 +0,0 @@
// std
use std::collections::HashMap;
use std::hash::Hash;
use std::rc::Rc;
use std::time::Duration;
// crates
use rand::prelude::SmallRng;
use rand::{Rng, SeedableRng};
// internal
use crate::network::Network;
use crate::node::{Node, NodeId, StepTime};
use crate::overlay::{Committee, Layout};
pub type RootCommitteeReceiverSolver = fn(&mut SmallRng, NodeId, &[NodeId], &Network) -> StepTime;
pub type ParentCommitteeReceiverSolver =
fn(&mut SmallRng, NodeId, &Committee, &Network) -> StepTime;
pub type ChildCommitteeReceiverSolver =
fn(&mut SmallRng, NodeId, &[&Committee], &Network) -> StepTime;
fn leader_receive_proposal(
rng: &mut SmallRng,
node: NodeId,
leader_nodes: &[NodeId],
network: &Network,
) -> StepTime {
assert!(!leader_nodes.is_empty());
leader_nodes
.iter()
.filter_map(|&sender| network.send_message_cost(rng, sender, node))
.max()
.map(From::from)
.unwrap()
}
fn receive_proposal(
rng: &mut SmallRng,
node: NodeId,
committee: &Committee,
network: &Network,
) -> StepTime {
assert!(!committee.is_empty());
committee
.nodes
.iter()
.filter_map(|&sender| network.send_message_cost(rng, sender, node))
.max()
.unwrap()
.into()
}
fn receive_commit(
rng: &mut SmallRng,
node: NodeId,
committees: &[&Committee],
network: &Network,
) -> StepTime {
assert!(!committees.is_empty());
committees
.iter()
.filter_map(|committee| {
committee
.nodes
.iter()
.filter_map(|&sender| network.send_message_cost(rng, sender, node))
.max()
})
.max()
.unwrap()
.into()
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CarnotStep {
RootReceiveProposal,
ReceiveProposal,
ValidateProposal,
LeaderReceiveVote,
ReceiveVote,
ValidateVote,
Ignore,
}
impl core::str::FromStr for CarnotStep {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_lowercase().as_str() {
"receiveproposal" => Ok(Self::ReceiveProposal),
"validateproposal" => Ok(Self::ValidateProposal),
"receivevote" => Ok(Self::ReceiveVote),
"validatevote" => Ok(Self::ValidateVote),
_ => Err(format!("Unknown step: {s}")),
}
}
}
#[derive(Clone)]
pub enum CarnotStepSolver {
Plain(StepTime),
ParentCommitteeReceiverSolver(ParentCommitteeReceiverSolver),
ChildCommitteeReceiverSolver(ChildCommitteeReceiverSolver),
LeaderToCommitteeReceiverSolver(ChildCommitteeReceiverSolver),
RootCommitteeReceiverSolver(RootCommitteeReceiverSolver),
}
#[serde_with::serde_as]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CarnotStepSolverType {
Plain(#[serde_as(as = "serde_with::DurationMilliSeconds")] StepTime),
ParentCommitteeReceiverSolver,
ChildCommitteeReceiverSolver,
LeaderToCommitteeReceiverSolver,
RootCommitteeReceiverSolver,
}
impl core::str::FromStr for CarnotStepSolverType {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().replace(['_', '-'], "").to_lowercase().as_str() {
"plain" => Ok(Self::Plain(StepTime::from_millis(1))),
"parentcommitteereceiversolver" => Ok(Self::ParentCommitteeReceiverSolver),
"childcommitteereceiversolver" => Ok(Self::ChildCommitteeReceiverSolver),
x => {
let millis = x
.parse::<u64>()
.map_err(|_| format!("Unknown step solver type: {s}"))?;
Ok(Self::Plain(StepTime::from_millis(millis)))
}
}
}
}
impl CarnotStepSolverType {
pub fn to_solver(self) -> CarnotStepSolver {
match self {
Self::Plain(time) => CarnotStepSolver::Plain(time),
Self::ParentCommitteeReceiverSolver => {
CarnotStepSolver::ParentCommitteeReceiverSolver(receive_proposal)
}
Self::ChildCommitteeReceiverSolver => {
CarnotStepSolver::ChildCommitteeReceiverSolver(receive_commit)
}
Self::LeaderToCommitteeReceiverSolver => {
CarnotStepSolver::LeaderToCommitteeReceiverSolver(receive_commit)
}
Self::RootCommitteeReceiverSolver => {
CarnotStepSolver::RootCommitteeReceiverSolver(leader_receive_proposal)
}
}
}
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct CarnotNodeSettings {
pub steps_costs: HashMap<CarnotStep, CarnotStepSolverType>,
pub network: Network,
pub layout: Layout,
pub leaders: Vec<NodeId>,
}
#[derive(Clone)]
pub struct CarnotNode {
id: NodeId,
rng: SmallRng,
settings: Rc<CarnotNodeSettings>,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
pub enum CarnotRole {
Leader,
Root,
Intermediate,
Leaf,
}
pub const CARNOT_STEPS_COSTS: &[(CarnotStep, CarnotStepSolverType)] = &[
(
CarnotStep::RootReceiveProposal,
CarnotStepSolverType::RootCommitteeReceiverSolver,
),
(
CarnotStep::ReceiveProposal,
CarnotStepSolverType::ParentCommitteeReceiverSolver,
),
(
CarnotStep::ValidateProposal,
CarnotStepSolverType::Plain(StepTime::from_secs(1)),
),
(
CarnotStep::LeaderReceiveVote,
CarnotStepSolverType::LeaderToCommitteeReceiverSolver,
),
(
CarnotStep::ReceiveVote,
CarnotStepSolverType::ChildCommitteeReceiverSolver,
),
(
CarnotStep::ValidateVote,
CarnotStepSolverType::Plain(StepTime::from_secs(1)),
),
(
CarnotStep::Ignore,
CarnotStepSolverType::Plain(StepTime::from_secs(0)),
),
];
pub const CARNOT_LEADER_STEPS: &[CarnotStep] =
&[CarnotStep::LeaderReceiveVote, CarnotStep::ValidateVote];
pub const CARNOT_ROOT_STEPS: &[CarnotStep] = &[
CarnotStep::RootReceiveProposal,
CarnotStep::ValidateProposal,
CarnotStep::ReceiveVote,
CarnotStep::ValidateVote,
];
pub const CARNOT_INTERMEDIATE_STEPS: &[CarnotStep] = &[
CarnotStep::ReceiveProposal,
CarnotStep::ValidateProposal,
CarnotStep::ReceiveVote,
CarnotStep::ValidateVote,
];
pub const CARNOT_LEAF_STEPS: &[CarnotStep] =
&[CarnotStep::ReceiveProposal, CarnotStep::ValidateProposal];
pub const CARNOT_UNKNOWN_MESSAGE_RECEIVED_STEPS: &[CarnotStep] = &[CarnotStep::Ignore];
impl Node for CarnotNode {
type Settings = Rc<CarnotNodeSettings>;
fn new<R: Rng>(rng: &mut R, id: NodeId, settings: Self::Settings) -> Self {
let rng = SmallRng::from_rng(rng).unwrap();
Self { id, rng, settings }
}
fn id(&self) -> NodeId {
self.id
}
fn run_steps(&mut self, steps: &[CarnotStep]) -> StepTime {
use CarnotStepSolver::*;
let mut overall_steps_time = Duration::ZERO;
for step in steps.iter() {
let step_time = match self.settings.steps_costs.get(step).unwrap().to_solver() {
Plain(t) => t,
ParentCommitteeReceiverSolver(solver) => {
match self
.settings
.layout
.parent_nodes(self.settings.layout.committee(self.id))
{
Some(parent) => {
solver(&mut self.rng, self.id, &parent, &self.settings.network)
}
None => Duration::ZERO.into(),
}
}
ChildCommitteeReceiverSolver(solver) => solver(
&mut self.rng,
self.id,
&self
.settings
.layout
.children_nodes(self.settings.layout.committee(self.id)),
&self.settings.network,
),
RootCommitteeReceiverSolver(solver) => solver(
&mut self.rng,
self.id,
&self.settings.leaders,
&self.settings.network,
),
LeaderToCommitteeReceiverSolver(solver) => {
let committees: Vec<&Committee> =
self.settings.layout.committees.values().collect();
solver(
&mut self.rng,
self.id,
&committees[..],
&self.settings.network,
)
}
};
overall_steps_time += step_time.0
}
overall_steps_time.into()
}
}

View File

@ -0,0 +1,48 @@
// std
// crates
use rand::Rng;
use serde::Deserialize;
// internal
use crate::node::{Node, NodeId};
#[derive(Default)]
pub struct CarnotState {}
#[derive(Clone, Deserialize)]
pub struct CarnotSettings {}
#[allow(dead_code)] // TODO: remove when handling settings
pub struct CarnotNode {
id: NodeId,
state: CarnotState,
settings: CarnotSettings,
}
impl Node for CarnotNode {
type Settings = CarnotSettings;
type State = CarnotState;
fn new<R: Rng>(_rng: &mut R, id: NodeId, settings: Self::Settings) -> Self {
Self {
id,
state: Default::default(),
settings,
}
}
fn id(&self) -> NodeId {
self.id
}
fn current_view(&self) -> usize {
todo!()
}
fn state(&self) -> &CarnotState {
&self.state
}
fn step(&mut self) {
todo!()
}
}

View File

@ -8,8 +8,6 @@ use std::{
// crates
use rand::Rng;
use serde::{Deserialize, Serialize};
use self::carnot::CarnotStep;
// internal
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
@ -116,9 +114,13 @@ impl core::iter::Sum<StepTime> for Duration {
}
}
pub trait Node: Clone {
pub trait Node {
type Settings;
type State;
fn new<R: Rng>(rng: &mut R, id: NodeId, settings: Self::Settings) -> Self;
fn id(&self) -> NodeId;
fn run_steps(&mut self, steps: &[CarnotStep]) -> StepTime;
// TODO: View must be view whenever we integrate consensus engine
fn current_view(&self) -> usize;
fn state(&self) -> &Self::State;
fn step(&mut self);
}

View File

@ -0,0 +1,12 @@
use serde::Serialize;
pub type SerializedNodeState = serde_json::Value;
#[derive(Serialize)]
pub struct OutData {}
pub trait NodeStateRecord {
fn get_serialized_state_record(&self) -> SerializedNodeState {
SerializedNodeState::Null
}
}

View File

@ -4,13 +4,12 @@ use rand::prelude::IteratorRandom;
use rand::Rng;
// internal
use super::Overlay;
use crate::node::carnot::{CarnotNode, CarnotRole};
use crate::node::NodeId;
use crate::overlay::{Committee, Layout};
pub struct FlatOverlay;
impl Overlay<CarnotNode> for FlatOverlay {
impl Overlay for FlatOverlay {
type Settings = ();
fn new(_settings: Self::Settings) -> Self {
@ -36,7 +35,6 @@ impl Overlay<CarnotNode> for FlatOverlay {
0.into(),
Committee {
nodes: nodes.iter().copied().collect(),
role: CarnotRole::Leader,
},
))
.collect();

View File

@ -6,12 +6,11 @@ use std::collections::{BTreeSet, HashMap};
// crates
use rand::Rng;
// internal
use crate::node::{carnot::CarnotRole, CommitteeId, Node, NodeId};
use crate::node::{CommitteeId, NodeId};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Committee {
pub nodes: BTreeSet<NodeId>,
pub role: CarnotRole,
}
impl Committee {
@ -90,7 +89,7 @@ impl Layout {
}
}
pub trait Overlay<N: Node> {
pub trait Overlay {
type Settings;
fn new(settings: Self::Settings) -> Self;

View File

@ -2,17 +2,17 @@
use std::collections::HashMap;
// crates
use rand::seq::IteratorRandom;
use serde::Deserialize;
// internal
use super::{Committee, Layout, Overlay};
use crate::node::{
carnot::{CarnotNode, CarnotRole},
CommitteeId, NodeId,
};
use crate::node::{CommitteeId, NodeId};
#[derive(Clone, Deserialize)]
pub enum TreeType {
FullBinaryTree,
}
#[derive(Clone, Deserialize)]
pub struct TreeSettings {
pub tree_type: TreeType,
pub committee_size: usize,
@ -51,7 +51,8 @@ impl TreeOverlay {
.chunks(settings.committee_size)
.enumerate()
{
let mut has_children = false;
// TODO: Why do we have has_children here?
let mut _has_children = false;
let left_child_id = 2 * committee_id + 1;
let right_child_id = left_child_id + 1;
@ -61,7 +62,7 @@ impl TreeOverlay {
committee_id.into(),
vec![left_child_id.into(), right_child_id.into()],
);
has_children = true;
_has_children = true;
}
// Root node has no parent.
@ -70,15 +71,8 @@ impl TreeOverlay {
parents.insert(committee_id.into(), parent_id.into());
}
let role = match (committee_id, has_children) {
(0, _) => CarnotRole::Root,
(_, true) => CarnotRole::Intermediate,
(_, false) => CarnotRole::Leaf,
};
let committee = Committee {
nodes: nodes.iter().copied().copied().collect(),
role,
};
committees.insert(committee_id.into(), committee);
@ -93,7 +87,7 @@ impl TreeOverlay {
}
}
impl Overlay<CarnotNode> for TreeOverlay {
impl Overlay for TreeOverlay {
type Settings = TreeSettings;
fn new(settings: Self::Settings) -> Self {
@ -239,28 +233,7 @@ mod tests {
committee_size: 1,
});
let nodes = overlay.nodes();
let layout = overlay.layout(&nodes, &mut rng);
assert_eq!(
layout.committees[&CommitteeId::new(0)].role,
CarnotRole::Root
);
assert_eq!(
layout.committees[&CommitteeId::new(1)].role,
CarnotRole::Intermediate
);
assert_eq!(
layout.committees[&CommitteeId::new(2)].role,
CarnotRole::Intermediate
);
assert_eq!(
layout.committees[&CommitteeId::new(3)].role,
CarnotRole::Leaf
);
assert_eq!(
layout.committees[&CommitteeId::new(6)].role,
CarnotRole::Leaf
);
let _layout = overlay.layout(&nodes, &mut rng);
}
#[test]

View File

@ -1,428 +0,0 @@
use crate::node::carnot::{
CarnotRole, CARNOT_INTERMEDIATE_STEPS, CARNOT_LEADER_STEPS, CARNOT_LEAF_STEPS,
CARNOT_ROOT_STEPS, CARNOT_UNKNOWN_MESSAGE_RECEIVED_STEPS,
};
use crate::node::{CommitteeId, Node, NodeId, StepTime};
use crate::overlay::Layout;
use rand::Rng;
use std::collections::HashMap;
use std::time::Duration;
pub struct ConsensusRunner<N: Node> {
nodes: HashMap<NodeId, N>,
leaders: Vec<NodeId>,
layout: Layout,
}
#[allow(dead_code)]
#[derive(Debug, serde::Serialize)]
pub struct Report {
round_time: Duration,
}
type Reducer = Box<dyn Fn(&[StepTime]) -> StepTime>;
impl<N: Node> ConsensusRunner<N>
where
N::Settings: Clone,
{
pub fn new<R: Rng>(
mut rng: R,
layout: Layout,
leaders: Vec<NodeId>,
node_settings: N::Settings,
) -> Self {
let nodes = layout
.node_ids()
.map(|id| {
let node = N::new(&mut rng, id, node_settings.clone());
(id, node)
})
.collect();
Self {
nodes,
layout,
leaders,
}
}
pub fn run(&mut self, reducer: Reducer) -> Report {
let leaders = &self.leaders;
let layout = &self.layout;
let mut leader_times = leaders
.iter()
.map(|leader_node| {
vec![self
.nodes
.get_mut(leader_node)
.unwrap()
.run_steps(CARNOT_LEADER_STEPS)]
})
.collect();
let mut layer_times = Vec::new();
for layer_nodes in layout
.layers
.values()
.map(|committees| get_layer_nodes(committees, layout))
{
let times: Vec<StepTime> = layer_nodes
.iter()
.map(|(committee_id, node_id)| {
let steps = match layout.committees[committee_id].role {
CarnotRole::Root => CARNOT_ROOT_STEPS,
CarnotRole::Intermediate => CARNOT_INTERMEDIATE_STEPS,
CarnotRole::Leaf => CARNOT_LEAF_STEPS,
_ => {
// TODO: Should leader act as a leaf in a flat overlay?
CARNOT_UNKNOWN_MESSAGE_RECEIVED_STEPS
}
};
self.nodes.get_mut(node_id).unwrap().run_steps(steps)
})
.collect();
layer_times.push(times)
}
layer_times.append(&mut leader_times);
let round_time = layer_times.iter().map(|d| reducer(d)).sum();
Report { round_time }
}
}
fn get_layer_nodes(
layer_committees: &[CommitteeId],
layout: &Layout,
) -> Vec<(CommitteeId, NodeId)> {
layer_committees
.iter()
.flat_map(|committee_id| get_committee_nodes(committee_id, layout))
.collect()
}
fn get_committee_nodes(committee: &CommitteeId, layout: &Layout) -> Vec<(CommitteeId, NodeId)> {
layout.committees[committee]
.nodes
.clone()
.into_iter()
.map(|node_id| (*committee, node_id))
.collect()
}
#[cfg(test)]
mod test {
use crate::{
network::{
behaviour::NetworkBehaviour,
regions::{Region, RegionsData},
Network,
},
node::{
carnot::{CarnotNode, CarnotNodeSettings, CARNOT_STEPS_COSTS},
NodeId, StepTime,
},
overlay::{
flat::FlatOverlay,
tree::{TreeOverlay, TreeSettings, TreeType},
Overlay,
},
runner::{ConsensusRunner, Reducer},
};
use rand::{rngs::mock::StepRng, Rng};
use std::{collections::HashMap, rc::Rc, time::Duration};
fn setup_runner<R: Rng, O: Overlay<CarnotNode>>(
mut rng: &mut R,
overlay: &O,
) -> ConsensusRunner<CarnotNode> {
let node_ids = overlay.nodes();
let layout = overlay.layout(&node_ids, &mut rng);
let leaders: Vec<NodeId> = overlay.leaders(&node_ids, 1, &mut rng).collect();
let regions = std::iter::once((Region::Europe, node_ids.clone())).collect();
let network_behaviour = std::iter::once((
(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
))
.collect();
let node_settings: CarnotNodeSettings = CarnotNodeSettings {
steps_costs: CARNOT_STEPS_COSTS.iter().cloned().collect(),
network: Network::new(RegionsData::new(regions, network_behaviour)),
layout: overlay.layout(&node_ids, &mut rng),
leaders: leaders.clone(),
};
ConsensusRunner::new(&mut rng, layout, leaders, Rc::new(node_settings))
}
#[test]
fn run_flat_single_leader_steps() {
let mut rng = StepRng::new(1, 0);
let overlay = FlatOverlay::new(());
let mut runner = setup_runner(&mut rng, &overlay);
assert_eq!(
Duration::from_millis(1100),
runner
.run(Box::new(|times: &[StepTime]| *times.iter().max().unwrap()) as Reducer)
.round_time
);
}
#[test]
fn run_tree_committee_1() {
let mut rng = StepRng::new(1, 0);
let overlay = TreeOverlay::new(TreeSettings {
tree_type: TreeType::FullBinaryTree,
depth: 3,
committee_size: 1,
});
let mut runner: ConsensusRunner<CarnotNode> = setup_runner(&mut rng, &overlay);
// # Leader (1 node):
//
// - 100ms - LeaderReceiveVote,
// - 1s - ValidateVote,
//
// Expected times [1.1s]
// # Root (1 node):
//
// - 100ms - RootReceiveProposal,
// - 1s - ValidateProposal,
// - 100ms - ReceiveVote,
// - 1s - ValidateVote,
//
// Expected times [2.2s]
// # Intermediary (2 nodes):
//
// - 100ms - ReceiveProposal,
// - 1s - ValidateProposal,
// - 100ms - ReceiveVote,
// - 1s - ValidateVote,
//
// Expected times [2.2s, 2.2s]
// # Leaf (4 nodes):
//
// - 100ms - ReceiveProposal
// - 1s - ValidateProposal
//
// Expected times [1.1s, 1.1s, 1.1s, 1.1s]
assert_eq!(
Duration::from_millis(6600),
runner
.run(Box::new(|times: &[StepTime]| *times.iter().max().unwrap()) as Reducer)
.round_time
);
}
#[test]
fn run_tree_committee_100() {
let mut rng = StepRng::new(1, 0);
let overlay = TreeOverlay::new(TreeSettings {
tree_type: TreeType::FullBinaryTree,
depth: 3,
committee_size: 100,
});
let mut runner: ConsensusRunner<CarnotNode> = setup_runner(&mut rng, &overlay);
assert_eq!(
Duration::from_millis(6600),
runner
.run(Box::new(|times: &[StepTime]| *times.iter().max().unwrap()) as Reducer)
.round_time
);
}
#[test]
fn run_tree_network_config_1() {
let mut rng = StepRng::new(1, 0);
let overlay = TreeOverlay::new(TreeSettings {
tree_type: TreeType::FullBinaryTree,
depth: 3,
committee_size: 1,
});
let node_ids = overlay.nodes();
let layout = overlay.layout(&node_ids, &mut rng);
// Normaly a leaders would be selected randomly, here, we're assuming it's NodeID 1.
// let leaders: Vec<NodeId> = overlay.leaders(&node_ids, 1, &mut rng).collect();
let leaders = vec![1];
let first_five = &node_ids[..5];
let rest = &node_ids[5..];
// 0
// 1 2
// 3 4 5 6
//
// # Leader - NodeID 1
//
// Sends vote to all committees.
//
// LeaderReceiveVote:
// - 100ms - Asia - Asia (1 to 0, 1, 2, 3, 4)
// - 500ms - Asia - Europe (1 to 5, 6)
//
// ValidateVote: 1s
//
// Expected times: [1.5s]
// # Root - NodeID 0
//
// Sends vote to child committees.
//
// RootReceiveProposal:
// - 100ms - Asia - Asia (0 to 1)
//
// ReceiveVote:
// - 100ms - Asia - Asia (0 to 1, 2)
//
// No network:
// - 1s - ValidateVote
// - 1s - ValidateProposal
//
// Expected times: [2.2s]
// # Intermediary - NodeID 1, 2:
//
// ReceiveVote:
// - 100ms - Asia - Asia (1 to 3, 4)
// - 500ms - Asia - Europe (2 to 5, 6)
//
// ReceiveProposal:
// - 100ms - Asia - Asia (1 to 0, 2 to 0)
//
// No network:
// - 1s - ValidateVote
// - 1s - ValidateProposal
//
// Expected times [2.2s, 2.6s]
// # Leaf - NodeID 3, 4, 5, 6
//
// ReceiveProposal:
// - 100ms - Asia - Asia ( 3, 4 to 1)
// - 500ms - Asia - Europe ( 5, 6 to 2)
//
// No network:
// - 1s - ValidateProposal
//
// Expected times [1.1s, 1.1s, 1.5s, 1.5s]
let regions = HashMap::from([
(Region::Asia, first_five.to_vec()),
(Region::Europe, rest.to_vec()),
]);
let network_behaviour = HashMap::from([
(
(Region::Asia, Region::Asia),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
),
(
(Region::Asia, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(500), 0.0),
),
(
(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
),
]);
let node_settings: CarnotNodeSettings = CarnotNodeSettings {
steps_costs: CARNOT_STEPS_COSTS.iter().cloned().collect(),
network: Network::new(RegionsData::new(regions, network_behaviour)),
layout: overlay.layout(&node_ids, &mut rng),
leaders: leaders.clone().into_iter().map(From::from).collect(),
};
let mut runner = ConsensusRunner::<CarnotNode>::new(
&mut rng,
layout,
leaders.into_iter().map(From::from).collect(),
Rc::new(node_settings),
);
assert_eq!(
Duration::from_millis(7800),
runner
.run(Box::new(|times: &[StepTime]| *times.iter().max().unwrap()) as Reducer)
.round_time
);
}
#[test]
fn run_tree_network_config_100() {
let mut rng = StepRng::new(1, 0);
let overlay = TreeOverlay::new(TreeSettings {
tree_type: TreeType::FullBinaryTree,
depth: 3,
committee_size: 100,
});
let node_ids = overlay.nodes();
let layout = overlay.layout(&node_ids, &mut rng);
// Normaly a leaders would be selected randomly, here, we're assuming it's NodeID 1.
// let leaders: Vec<NodeId> = overlay.leaders(&node_ids, 1, &mut rng).collect();
let leaders = vec![1];
let two_thirds = node_ids.len() as f32 * 0.66;
let rest = &node_ids[two_thirds as usize..];
let two_thirds = &node_ids[..two_thirds as usize];
let regions = HashMap::from([
(Region::Asia, two_thirds.to_vec()),
(Region::Europe, rest.to_vec()),
]);
let network_behaviour = HashMap::from([
(
(Region::Asia, Region::Asia),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
),
(
(Region::Asia, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(500), 0.0),
),
(
(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
),
]);
let node_settings: CarnotNodeSettings = CarnotNodeSettings {
steps_costs: CARNOT_STEPS_COSTS.iter().cloned().collect(),
network: Network::new(RegionsData::new(regions, network_behaviour)),
layout: overlay.layout(&node_ids, &mut rng),
leaders: leaders.clone().into_iter().map(From::from).collect(),
};
let mut runner = ConsensusRunner::<CarnotNode>::new(
&mut rng,
layout,
leaders.into_iter().map(From::from).collect(),
Rc::new(node_settings),
);
assert_eq!(
Duration::from_millis(7800),
runner
.run(Box::new(|times: &[StepTime]| *times.iter().max().unwrap()) as Reducer)
.round_time
);
}
}

View File

@ -0,0 +1,53 @@
use crate::node::{Node, NodeId};
use crate::output_processors::OutData;
use crate::overlay::Overlay;
use crate::runner::SimulationRunner;
use crate::warding::SimulationState;
use rand::prelude::SliceRandom;
use rayon::prelude::*;
use std::collections::HashSet;
use std::sync::Arc;
pub fn simulate<N: Node, O: Overlay>(
runner: &mut SimulationRunner<N, O>,
chunk_size: usize,
mut out_data: Option<&mut Vec<OutData>>,
) where
N::Settings: Clone,
N: Send + Sync,
O::Settings: Clone,
{
let simulation_state = SimulationState::<N> {
nodes: Arc::clone(&runner.nodes),
};
let mut node_ids: Vec<NodeId> = runner
.nodes
.read()
.expect("Read access to nodes vector")
.iter()
.map(N::id)
.collect();
runner.dump_state_to_out_data(&simulation_state, &mut out_data);
loop {
node_ids.shuffle(&mut runner.rng);
for ids_chunk in node_ids.chunks(chunk_size) {
let ids: HashSet<NodeId> = ids_chunk.iter().copied().collect();
runner
.nodes
.write()
.expect("Write access to nodes vector")
.par_iter_mut()
.filter(|n| ids.contains(&n.id()))
.for_each(N::step);
runner.dump_state_to_out_data(&simulation_state, &mut out_data);
}
// check if any condition makes the simulation stop
if runner.check_wards(&simulation_state) {
break;
}
}
}

View File

@ -0,0 +1,57 @@
use crate::node::{Node, NodeId};
use crate::output_processors::OutData;
use crate::overlay::Overlay;
use crate::runner::SimulationRunner;
use crate::warding::SimulationState;
use rand::prelude::IteratorRandom;
use std::collections::BTreeSet;
use std::sync::Arc;
/// [Glauber dynamics simulation](https://en.wikipedia.org/wiki/Glauber_dynamics)
pub fn simulate<N: Node, O: Overlay>(
runner: &mut SimulationRunner<N, O>,
update_rate: usize,
maximum_iterations: usize,
mut out_data: Option<&mut Vec<OutData>>,
) where
N: Send + Sync,
N::Settings: Clone,
O::Settings: Clone,
{
let simulation_state = SimulationState {
nodes: Arc::clone(&runner.nodes),
};
let nodes_remaining: BTreeSet<NodeId> = (0..runner
.nodes
.read()
.expect("Read access to nodes vector")
.len())
.map(From::from)
.collect();
let iterations: Vec<_> = (0..maximum_iterations).collect();
'main: for chunk in iterations.chunks(update_rate) {
for _ in chunk {
if nodes_remaining.is_empty() {
break 'main;
}
let node_id = *nodes_remaining.iter().choose(&mut runner.rng).expect(
"Some id to be selected as it should be impossible for the set to be empty here",
);
{
let mut shared_nodes = runner.nodes.write().expect("Write access to nodes vector");
let node: &mut N = shared_nodes
.get_mut(node_id.inner())
.expect("Node should be present");
node.step();
}
// check if any condition makes the simulation stop
if runner.check_wards(&simulation_state) {
break 'main;
}
}
runner.dump_state_to_out_data(&simulation_state, &mut out_data);
}
}

View File

@ -0,0 +1,155 @@
//! # Layered simulation runner
//!
//! A revision of the [`glauber`](super::glauber_runner) simulation runner.
//!
//! **`glauber`** simulations have some drawbacks:
//!
//! * Completely random, difficult to control
//! * Not close to how real nodes would perform in reality
//! * Difficult to analise recorded data, as data it is updated by chunks of iterations
//!
//! To solve this we can use a concept of layered *glauber* executions.
//! The algorithm roughly works as follows:
//!
//! ```python
//! nodes <- [nodes]
//! layers <- [[nodes_ids], [], ...]
//! while nodes_to_compute(layers):
//! layer_index <- pick_rand_layer(layers)
//! node_index <- pop_rand_node(rand_layer)
//! step(nodes[node_index])
//! if not node_decided(node):
//! push(layers[layer_index+1], node_index)
//! ```
//!
//! From within this, controlling the *number of layers*, and *weighting* them (how often are they picked),
//! we can control the flow of the simulations.
//! Also we can consider that once the bottom layer is empty a fully step have been concluded and we can record
//! the data of that step simulation.
// std
use std::collections::BTreeSet;
use std::ops::Not;
use std::sync::Arc;
// crates
use fixed_slice_deque::FixedSliceDeque;
use rand::prelude::{IteratorRandom, SliceRandom};
use rand::rngs::SmallRng;
// internal
use crate::node::{Node, NodeId};
use crate::output_processors::OutData;
use crate::overlay::Overlay;
use crate::runner::SimulationRunner;
use crate::warding::SimulationState;
pub fn simulate<N: Node, O: Overlay>(
runner: &mut SimulationRunner<N, O>,
gap: usize,
distribution: Option<Vec<f32>>,
mut out_data: Option<&mut Vec<OutData>>,
) where
N: Send + Sync,
N::Settings: Clone,
O::Settings: Clone,
{
let distribution =
distribution.unwrap_or_else(|| std::iter::repeat(1.0f32).take(gap).collect());
let layers: Vec<usize> = (0..gap).collect();
let mut deque = build_node_ids_deque(gap, runner);
let simulation_state = SimulationState {
nodes: Arc::clone(&runner.nodes),
};
loop {
let (group_index, node_id) =
choose_random_layer_and_node_id(&mut runner.rng, &distribution, &layers, &mut deque);
// remove node_id from group
deque.get_mut(group_index).unwrap().remove(&node_id);
{
let mut shared_nodes = runner.nodes.write().expect("Write access to nodes vector");
let node: &mut N = shared_nodes
.get_mut(node_id.inner())
.expect("Node should be present");
let prev_view = node.current_view();
node.step();
let after_view = node.current_view();
if after_view > prev_view {
// pass node to next step group
deque.get_mut(group_index + 1).unwrap().insert(node_id);
}
}
// check if any condition makes the simulation stop
if runner.check_wards(&simulation_state) {
break;
}
// if initial is empty then we finished a full round, append a new set to the end so we can
// compute the most advanced nodes again
if deque.first().unwrap().is_empty() {
let _ = deque.push_back(BTreeSet::default());
runner.dump_state_to_out_data(&simulation_state, &mut out_data);
}
// if no more nodes to compute
if deque.iter().all(BTreeSet::is_empty) {
break;
}
}
// write latest state
runner.dump_state_to_out_data(&simulation_state, &mut out_data);
}
fn choose_random_layer_and_node_id(
rng: &mut SmallRng,
distribution: &[f32],
layers: &[usize],
deque: &mut FixedSliceDeque<BTreeSet<NodeId>>,
) -> (usize, NodeId) {
let i = *layers
.iter()
// filter out empty round groups
.filter_map(|&i| {
let g = deque.get(i).unwrap();
g.is_empty().not().then_some(i)
})
// intermediate collect necessary for choose_weighted
.collect::<Vec<_>>()
.choose_weighted(rng, |&i| distribution.get(i).unwrap())
.expect("Distribution choose to work");
let group: &mut BTreeSet<NodeId> = deque.get_mut(i).unwrap();
let node_id = group.iter().choose(rng).unwrap();
(i, *node_id)
}
fn build_node_ids_deque<N, O>(
gap: usize,
runner: &SimulationRunner<N, O>,
) -> FixedSliceDeque<BTreeSet<NodeId>>
where
N: Node,
O: Overlay,
{
// add a +1 so we always have
let mut deque = FixedSliceDeque::new(gap + 1);
// push first layer
let node_ids: BTreeSet<NodeId> = runner
.nodes
.write()
.expect("Single access to runner nodes")
.iter()
.map(|node| node.id())
.collect();
deque.push_back(node_ids);
// allocate default sets
while deque.try_push_back(BTreeSet::new()).is_ok() {}
deque
}

View File

@ -0,0 +1,125 @@
mod async_runner;
mod glauber_runner;
mod layered_runner;
mod sync_runner;
use std::marker::PhantomData;
// std
use std::sync::{Arc, RwLock};
// crates
use rand::rngs::SmallRng;
use rand::{RngCore, SeedableRng};
use rayon::prelude::*;
// internal
use crate::node::Node;
use crate::output_processors::OutData;
use crate::overlay::Overlay;
use crate::settings::{RunnerSettings, SimulationSettings};
use crate::warding::{SimulationState, SimulationWard};
/// Encapsulation solution for the simulations runner
/// Holds the network state, the simulating nodes and the simulation settings.
pub struct SimulationRunner<N, O>
where
N: Node,
O: Overlay,
{
nodes: Arc<RwLock<Vec<N>>>,
settings: SimulationSettings<N::Settings, O::Settings>,
rng: SmallRng,
_overlay: PhantomData<O>,
}
impl<N: Node, O: Overlay> SimulationRunner<N, O>
where
N: Send + Sync,
N::Settings: Clone,
O::Settings: Clone,
{
pub fn new(settings: SimulationSettings<N::Settings, O::Settings>) -> Self {
let seed = settings
.seed
.unwrap_or_else(|| rand::thread_rng().next_u64());
println!("Seed: {seed}");
let mut rng = SmallRng::seed_from_u64(seed);
let overlay = O::new(settings.overlay_settings.clone());
let nodes = Self::nodes_from_initial_settings(&settings, overlay, &mut rng);
let nodes = Arc::new(RwLock::new(nodes));
Self {
nodes,
settings,
rng,
_overlay: Default::default(),
}
}
/// Initialize nodes from settings and calculate initial network state.
fn nodes_from_initial_settings(
settings: &SimulationSettings<N::Settings, O::Settings>,
_overlay: O, // TODO: attach overlay information to nodes
seed: &mut SmallRng,
) -> Vec<N> {
let SimulationSettings {
node_settings,
node_count,
..
} = settings;
(0..*node_count)
.map(|id| N::new(seed, id.into(), node_settings.clone()))
.collect()
}
pub fn simulate(&mut self, out_data: Option<&mut Vec<OutData>>) {
match self.settings.runner_settings.clone() {
RunnerSettings::Sync => {
sync_runner::simulate(self, out_data);
}
RunnerSettings::Async { chunks } => {
async_runner::simulate(self, chunks, out_data);
}
RunnerSettings::Glauber {
maximum_iterations,
update_rate,
} => {
glauber_runner::simulate(self, update_rate, maximum_iterations, out_data);
}
RunnerSettings::Layered {
rounds_gap,
distribution,
} => {
layered_runner::simulate(self, rounds_gap, distribution, out_data);
}
}
}
fn dump_state_to_out_data(
&self,
_simulation_state: &SimulationState<N>,
_out_ata: &mut Option<&mut Vec<OutData>>,
) {
todo!("What data do we want to expose?")
}
fn check_wards(&mut self, state: &SimulationState<N>) -> bool {
self.settings
.wards
.par_iter_mut()
.map(|ward| ward.analyze(state))
.any(|x| x)
}
fn step(&mut self) {
self.nodes
.write()
.expect("Single access to nodes vector")
.par_iter_mut()
.for_each(|node| {
node.step();
});
}
}

View File

@ -0,0 +1,31 @@
use super::SimulationRunner;
use crate::node::Node;
use crate::output_processors::OutData;
use crate::overlay::Overlay;
use crate::warding::SimulationState;
use std::sync::Arc;
/// Simulate with option of dumping the network state as a `::polars::Series`
pub fn simulate<N: Node, O: Overlay>(
runner: &mut SimulationRunner<N, O>,
mut out_data: Option<&mut Vec<OutData>>,
) where
N: Send + Sync,
N::Settings: Clone,
O::Settings: Clone,
{
let state = SimulationState {
nodes: Arc::clone(&runner.nodes),
};
runner.dump_state_to_out_data(&state, &mut out_data);
for _ in 1.. {
runner.step();
runner.dump_state_to_out_data(&state, &mut out_data);
// check if any condition makes the simulation stop
if runner.check_wards(&state) {
break;
}
}
}

View File

@ -0,0 +1,36 @@
use crate::network::regions::Region;
use crate::node::StepTime;
use crate::warding::Ward;
use serde::Deserialize;
use std::collections::HashMap;
#[derive(Clone, Debug, Deserialize, Default)]
pub enum RunnerSettings {
#[default]
Sync,
Async {
chunks: usize,
},
Glauber {
maximum_iterations: usize,
update_rate: usize,
},
Layered {
rounds_gap: usize,
distribution: Option<Vec<f32>>,
},
}
#[derive(Deserialize)]
pub struct SimulationSettings<N, O> {
pub network_behaviors: HashMap<(Region, Region), StepTime>,
pub regions: Vec<Region>,
#[serde(default)]
pub wards: Vec<Ward>,
pub overlay_settings: O,
pub node_settings: N,
pub runner_settings: RunnerSettings,
pub node_count: usize,
pub committee_size: usize,
pub seed: Option<u64>,
}

View File

@ -0,0 +1,44 @@
// std
use std::sync::{Arc, RwLock};
// crates
use serde::Deserialize;
// internal
use crate::node::Node;
mod ttf;
pub struct SimulationState<N> {
pub nodes: Arc<RwLock<Vec<N>>>,
}
/// A ward is a computation over the `NetworkState`, it must return true if the state satisfies
/// the warding conditions. It is used to stop the consensus simulation if such condition is reached.
pub trait SimulationWard<N> {
type SimulationState;
fn analyze(&mut self, state: &Self::SimulationState) -> bool;
}
/// Ward dispatcher
/// Enum to avoid Boxing (Box<dyn SimulationWard>) wards.
#[derive(Debug, Deserialize)]
pub enum Ward {
#[serde(rename = "time_to_finality")]
MaxView(ttf::MaxViewWard),
}
impl Ward {
pub fn simulation_ward_mut<N: Node>(
&mut self,
) -> &mut dyn SimulationWard<N, SimulationState = SimulationState<N>> {
match self {
Ward::MaxView(ward) => ward,
}
}
}
impl<N: Node> SimulationWard<N> for Ward {
type SimulationState = SimulationState<N>;
fn analyze(&mut self, state: &Self::SimulationState) -> bool {
self.simulation_ward_mut().analyze(state)
}
}

View File

@ -0,0 +1,63 @@
use crate::node::Node;
use crate::warding::{SimulationState, SimulationWard};
use serde::Deserialize;
/// Time to finality ward. It monitors the amount of rounds of the simulations, triggers when surpassing
/// the set threshold.
#[derive(Debug, Deserialize, Copy, Clone)]
pub struct MaxViewWard {
_max_view: usize,
}
impl<N: Node> SimulationWard<N> for MaxViewWard {
type SimulationState = SimulationState<N>;
fn analyze(&mut self, _state: &Self::SimulationState) -> bool {
true // TODO: implement using simulation state
}
}
#[cfg(test)]
mod test {
use crate::node::{Node, NodeId};
use crate::warding::ttf::MaxViewWard;
use crate::warding::{SimulationState, SimulationWard};
use rand::Rng;
use std::ops::AddAssign;
use std::sync::{Arc, RwLock};
#[test]
fn rebase_threshold() {
impl Node for usize {
type Settings = ();
type State = Self;
fn new<R: Rng>(rng: &mut R, id: NodeId, settings: Self::Settings) -> Self {
id.inner()
}
fn id(&self) -> NodeId {
(*self).into()
}
fn current_view(&self) -> usize {
*self
}
fn state(&self) -> &Self::State {
self
}
fn step(&mut self) {
self.add_assign(1);
}
}
let mut ttf = MaxViewWard { _max_view: 10 };
let mut cond = false;
let node = 11;
let mut state = SimulationState {
nodes: Arc::new(RwLock::new(vec![node])),
};
assert!(ttf.analyze(&state));
}
}