Extract claro simulations into new repository

This commit is contained in:
Daniel Sanchez Quiros 2022-10-19 15:17:22 +02:00
commit 6ea3006af9
36 changed files with 5114 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/target
.idea

2052
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

11
Cargo.toml Normal file
View File

@ -0,0 +1,11 @@
[workspace]
members = [
"consensus/claro",
"consensus/snowball",
"simulations/snow-family"
]
[profile.release-opt]
inherits = "release"
lto = true

22
README.md Normal file
View File

@ -0,0 +1,22 @@
# Consensus Research
## Project Structure
* `consensus`: Consensus implementation libraries
* `snowball`: Snowball implementation
* `claro`: Claro implementation
* `prototypes`: Simulations and experiments related libraries and binaries
* `consensus-simulations`: Consensus simulations app
## Build & Test
Minimum supported Rust version: `1.63`
When in development, please, use `cargo clippy` to build the project. Any warning is promoted to an error in our CI.
* Use `cargo test` for executing tests, and `cargo test -- --nocapture` for seeing test outputs.
* Use `cargo run --example {example_name}` to run an example.
### Build Documentation
Simply run `cargo doc --open --no-deps` to build and access a copy of the generated documentation.

View File

@ -0,0 +1,23 @@
[package]
name = "claro"
version = "0.1.0"
edition = "2021"
authors = [
"Daniel Sanchez Quiros <danielsq@status.im>"
]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
rand = "0.8"
tracing = "0.1"
tracing-core = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "json", "std"] }
tokio = { version = "1.17", features = ["sync"] }
[dev-dependencies]
tokio = { version = "1.17", features = ["rt-multi-thread", "macros"] }
[features]
default = []
testing = []

View File

@ -0,0 +1,361 @@
// std
use std::fmt::{Debug, Display, Formatter};
use std::marker::PhantomData;
use tracing::debug;
// crates
// internal
use crate::query::NodeQuery;
/// A vote emitted by a node for a transaction `Tx`.
/// Unlike [`Opinion`], a vote is always a definite yes/no.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Vote<Tx> {
    Yes(Tx),
    No(Tx),
}

/// A node's current stance on a transaction `Tx`; `None` means no stance yet.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Opinion<Tx> {
    None(Tx),
    Yes(Tx),
    No(Tx),
}
impl<Tx> Opinion<Tx> {
pub fn flip(self) -> Self {
match self {
Opinion::Yes(tx) => Opinion::No(tx),
Opinion::No(tx) => Opinion::Yes(tx),
none => none,
}
}
}
/// Solver output: an [`Opinion`] that is either final (`Decided`) or still
/// subject to change in later rounds (`Undecided`).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Decision<Tx> {
    Decided(Opinion<Tx>),
    Undecided(Opinion<Tx>),
}
impl<Tx> Display for Opinion<Tx> {
    /// Renders only the variant tag (`yes`/`no`/`none`); the `Tx` payload is omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Opinion::None(_) => "none",
            Opinion::Yes(_) => "yes",
            Opinion::No(_) => "no",
        })
    }
}
impl<Tx> Display for Decision<Tx> {
    /// Renders only the variant tag (`decided`/`undecided`); the inner opinion is omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Decision::Undecided(_) => "undecided",
            Decision::Decided(_) => "decided",
        })
    }
}
impl<Tx> From<Opinion<Tx>> for Option<Vote<Tx>> {
    /// A definite opinion maps onto the matching vote; no opinion yields `None`.
    fn from(opinion: Opinion<Tx>) -> Self {
        match opinion {
            Opinion::None(_) => None,
            Opinion::Yes(tx) => Some(Vote::Yes(tx)),
            Opinion::No(tx) => Some(Vote::No(tx)),
        }
    }
}
impl<Tx> From<Vote<Tx>> for Opinion<Tx> {
    /// Every vote converts directly into the equivalent definite opinion.
    fn from(vote: Vote<Tx>) -> Self {
        match vote {
            Vote::No(tx) => Self::No(tx),
            Vote::Yes(tx) => Self::Yes(tx),
        }
    }
}
impl<Tx> From<Decision<Tx>> for Option<Vote<Tx>> {
    /// Extracts the inner opinion — decided or not — and converts it to a vote.
    fn from(decision: Decision<Tx>) -> Self {
        match decision {
            Decision::Undecided(inner) | Decision::Decided(inner) => inner.into(),
        }
    }
}
#[allow(dead_code)]
/// Claro round computed evidence, confidence and alpha
/// (intermediate per-round figures produced by `ClaroSolver::round_state`).
pub struct ClaroRoundCalculation {
    // Normalized confidence: confidence / (confidence + look_ahead)
    confidence: f32,
    // Evidence from the current round only: yes votes / total votes
    e1: f32,
    // Accumulated evidence: total yes votes seen / total votes seen
    e2: f32,
    // Blend of e1 and e2, weighted by confidence
    e: f32,
    // Decision threshold blended from evidence_alpha and evidence_alpha_2
    alpha: f32,
}

/// Claro internal state
#[derive(Default, Debug)]
pub struct ClaroState {
    /// Positive votes seen
    evidence: usize,
    /// Total votes seen, positive and negative
    evidence_accumulated: usize,
    /// Votes ratio
    confidence: usize,
}
impl ClaroState {
    /// Fold the round's total vote count into the confidence counter (saturating).
    pub fn update_confidence<Tx>(&mut self, votes: &[Vote<Tx>]) {
        self.confidence = self.confidence.saturating_add(votes.len());
    }

    /// Fold the round's votes into the evidence counters: positive votes into
    /// `evidence`, all votes into `evidence_accumulated` (both saturating).
    pub fn update_evidence<Tx>(&mut self, votes: &[Vote<Tx>]) {
        let yes_count = votes.iter().filter(|v| matches!(v, Vote::Yes(_))).count();
        self.evidence = self.evidence.saturating_add(yes_count);
        self.evidence_accumulated = self.evidence_accumulated.saturating_add(votes.len());
    }

    /// Total number of votes accumulated over all rounds.
    pub fn confidence(&self) -> usize {
        self.confidence
    }

    /// Number of positive votes seen so far.
    pub fn evidence(&self) -> usize {
        self.evidence
    }

    /// Number of votes seen so far, positive and negative.
    pub fn evidence_accumulated(&self) -> usize {
        self.evidence_accumulated
    }
}
/// Node query configuration
/// Controls how many peers are queried per round and how the query widens
/// when a round is inconclusive (see `QueryConfiguration::grow`).
#[derive(Debug, Clone, Copy)]
pub struct QueryConfiguration {
    /// How many nodes to query
    pub query_size: usize,
    /// Initial query
    pub initial_query_size: usize,
    /// Growth increment per claro round
    pub query_multiplier: usize,
    /// Max value for [`QueryConfiguration::query_multiplier`]
    pub max_multiplier: usize,
}
impl QueryConfiguration {
    #[allow(dead_code)]
    /// Build a configuration whose current and initial query sizes both start
    /// at `query_size`, with the default growth parameters.
    pub fn new(query_size: usize) -> Self {
        Self {
            query_size,
            initial_query_size: query_size,
            // TODO: Should this be configurable? Runtime vs Compiled
            query_multiplier: 2,
            max_multiplier: 4,
        }
    }

    /// Increment query based upon configuration
    /// query_size = min(query_size * growth_constant, initial_query_size * growth_max)
    fn grow(&mut self) {
        let grown = self.query_size * self.query_multiplier;
        let cap = self.initial_query_size * self.max_multiplier;
        self.query_size = grown.min(cap);
    }
}
/// Claro algorithm configuration
#[derive(Debug, Clone, Copy)]
pub struct ClaroConfiguration {
    /// `alpha`: evidence threshold weight while confidence is low
    pub evidence_alpha: f32,
    /// `alpha2`: evidence threshold weight as confidence grows
    pub evidence_alpha_2: f32,
    /// `beta`: confidence level above which the solver finalizes its decision
    pub confidence_beta: f32,
    /// `l`: dampens confidence growth in early rounds (see `round_state`)
    pub look_ahead: usize,
    /// Node query sizing parameters
    pub query: QueryConfiguration,
}

/// Claro computation object
pub struct ClaroSolver<Tx> {
    // Marks the solver as logically owning a Tx without storing one
    _phantom: PhantomData<Tx>,
    /// Internal state
    state: ClaroState,
    /// Configuration, including node query configuration
    configuration: ClaroConfiguration,
    /// Current tx decision
    decision: Decision<Tx>,
    /// Node query setup for current node
    node_query: NodeQuery,
}
// TODO: can we remove clone here?
impl<Tx: Clone + Debug> ClaroSolver<Tx> {
    /// Build a solver that starts undecided with a `Yes` opinion on `tx`.
    pub fn new(tx: Tx, configuration: ClaroConfiguration, node_query: NodeQuery) -> Self {
        Self {
            _phantom: Default::default(),
            state: Default::default(),
            decision: Decision::Undecided(Opinion::Yes(tx)),
            configuration,
            node_query,
        }
    }

    /// Build a solver that starts undecided from an arbitrary initial opinion.
    pub fn with_initial_opinion(
        configuration: ClaroConfiguration,
        node_query: NodeQuery,
        opinion: Opinion<Tx>,
    ) -> Self {
        Self {
            _phantom: Default::default(),
            state: Default::default(),
            decision: Decision::Undecided(opinion),
            configuration,
            node_query,
        }
    }

    /// Compute a single round state from already queried nodes votes
    fn round_state(&self, votes: &[Vote<Tx>]) -> ClaroRoundCalculation {
        let total_votes = votes.len();
        let yes_votes = votes.iter().filter(|&v| matches!(v, Vote::Yes(_))).count();
        // Normalized confidence in [0, 1): grows towards 1 as votes accumulate,
        // with look_ahead damping early rounds
        let confidence = self.state.confidence() as f32
            / (self.state.confidence() as f32 + self.configuration.look_ahead as f32);
        // e1: evidence from this round only; e2: accumulated historical evidence
        let e1 = yes_votes as f32 / total_votes as f32;
        let e2 = self.state.evidence() as f32 / self.state.evidence_accumulated() as f32;
        // Blend round vs historical evidence (and the two alphas) by confidence
        let e = e1 * (1f32 - confidence) + e2 * confidence;
        let alpha = self.configuration.evidence_alpha * (1f32 - confidence)
            + self.configuration.evidence_alpha_2 * confidence;
        ClaroRoundCalculation {
            confidence,
            e1,
            e2,
            e,
            alpha,
        }
    }

    /// Compute a single round
    /// mutates the decision parameter upon this round data
    ///
    /// # Panics
    /// Panics if called after the solver has already reached a `Decided` state.
    pub fn step(&mut self, tx: Tx, votes: &[Vote<Tx>]) {
        assert!(matches!(self.decision, Decision::Undecided(_)));
        debug!(votes = ?votes);
        // With no opinion yet, adopt the first received vote as the initial opinion
        if let Decision::Undecided(Opinion::None(_)) = self.decision() {
            if let Some(vote) = votes.first().cloned() {
                self.decision = Decision::Undecided(vote.into());
            }
        }
        if !votes.is_empty() {
            self.state.update_evidence(votes);
            self.state.update_confidence(votes);
            let ClaroRoundCalculation {
                e,
                alpha,
                confidence,
                ..
            } = self.round_state(votes);
            debug!(e = e, alpha = alpha);
            if e > alpha {
                self.decision = Decision::Undecided(Opinion::Yes(tx));
            } else if e < 1f32 - alpha {
                self.decision = Decision::Undecided(Opinion::No(tx));
            } else {
                // Inconclusive evidence either way: widen the query for the next round
                self.configuration.query.grow();
            }
            // Enough accumulated confidence finalizes the current opinion
            if confidence > self.configuration.confidence_beta {
                self.decision = Decision::Decided(self.opinion());
            }
        }
    }

    /// Derive vote from its current decision
    pub fn vote(&self) -> Option<Vote<Tx>> {
        self.decision.clone().into()
    }

    /// Current decision (cloned).
    pub fn decision(&self) -> Decision<Tx> {
        self.decision.clone()
    }

    /// Current opinion, whether or not it has been finalized yet.
    pub fn opinion(&self) -> Opinion<Tx> {
        match &self.decision {
            Decision::Decided(o) | Decision::Undecided(o) => o.clone(),
        }
    }

    /// Read access to the accumulated solver state.
    pub fn state(&self) -> &ClaroState {
        &self.state
    }

    /// Node query setup used by this solver.
    pub fn node_query(&self) -> &NodeQuery {
        &self.node_query
    }
}
#[cfg(test)]
mod test {
    use crate::claro::{Decision, ClaroConfiguration, ClaroSolver, QueryConfiguration, Vote};
    use crate::query::NodeQuery;
    use crate::testing::query::*;
    use crate::{Opinion, VoteQuery};
    use std::fmt::Debug;

    // Zero-sized placeholder transaction for the async test below
    #[derive(Clone, Eq, PartialEq, Debug)]
    struct EmptyTx;

    /// Drive a fresh solver through a single step with `votes` and compare the
    /// resulting decision against `expected`.
    fn test_all_votes<Tx: Clone + PartialEq + Debug + Send + Sync + 'static>(
        tx: Tx,
        votes: &[Vote<Tx>],
        expected: Decision<Tx>,
    ) {
        // Low alpha/beta thresholds so one unanimous round is enough to decide
        let config = ClaroConfiguration {
            evidence_alpha: 0.01,
            evidence_alpha_2: 0.01,
            confidence_beta: 0.01,
            look_ahead: 1,
            query: QueryConfiguration::new(10),
        };
        let node_query = NodeQuery::new(config.query.query_size, "node_1".into());
        let mut solver = ClaroSolver::new(tx.clone(), config, node_query);
        // A freshly built solver always starts undecided with a yes opinion
        assert_eq!(
            solver.decision,
            Decision::Undecided(Opinion::Yes(tx.clone()))
        );
        solver.step(tx, votes);
        assert_eq!(solver.decision, expected);
    }

    // Unanimous yes votes must produce a decided yes
    #[test]
    fn all_approved() {
        let votes: Vec<_> = (0..10).map(|_| Vote::<bool>::Yes(true)).collect();
        test_all_votes::<bool>(true, &votes, Decision::Decided(Opinion::Yes(true)));
    }

    // Unanimous no votes must produce a decided no
    #[test]
    fn all_rejected() {
        let votes: Vec<_> = (0..10).map(|_| Vote::<bool>::No(true)).collect();
        test_all_votes::<bool>(true, &votes, Decision::Decided(Opinion::No(true)));
    }

    // End-to-end: query votes through a FixedQuery stub, then step the solver
    #[tokio::test]
    async fn loop_all_approved() {
        let vote = Vote::Yes(EmptyTx);
        let mut fixed_query = FixedQuery::new(vote.clone());
        let config = ClaroConfiguration {
            evidence_alpha: 0.01,
            evidence_alpha_2: 0.01,
            confidence_beta: 0.01,
            look_ahead: 1,
            query: QueryConfiguration::new(10),
        };
        let node_query = NodeQuery::new(config.query.query_size, "node_1".into());
        let mut solver = ClaroSolver::new(EmptyTx, config, node_query);
        let query = fixed_query.query(&solver.node_query, EmptyTx).await;
        solver.step(EmptyTx, &query);
        assert_eq!(solver.vote(), Some(vote))
    }
}

View File

@ -0,0 +1,12 @@
mod claro;
mod query;
mod tracing;
#[cfg(feature = "testing")]
pub mod testing;
pub use self::claro::{
Decision, ClaroConfiguration, ClaroSolver, ClaroState, Opinion, QueryConfiguration, Vote,
};
pub use self::query::{NodeId, NodeQuery, NodeWeight, NodesSample, VoteQuery};
pub use self::tracing::{claro_tracing_layer_with_writer, CLARO_TARGET_TAG};

View File

@ -0,0 +1,107 @@
use crate::claro::Vote;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::collections::HashMap;
use tracing::debug;
// TODO: Check on proper types
/// Node ids type
pub type NodeId = String;
/// Node weight alias
/// Refers to amount of staking a node holds
pub type NodeWeight = f64;

/// Node ids <=> weights sampling information trait
pub trait NodesSample {
    /// All known node ids.
    fn nodes(&self) -> Vec<NodeId>;
    /// Sampling weight for each node id.
    /// NOTE(review): implementors should return an entry for every id in
    /// `nodes()` — `NodeQuery::sample` unwraps the lookup.
    fn weights(&self) -> HashMap<&NodeId, NodeWeight>;
}
/// Selector of nodes, random sample for some size `K`
#[derive(Debug, Clone)]
pub struct NodeQuery {
    // How many peers to draw per query
    node_size: usize,
    // Id of the querying node itself; excluded from its own samples
    node_id: NodeId,
}
impl NodeQuery {
    /// Build a query descriptor for node `node_id` sampling `node_size` peers.
    pub fn new(node_size: usize, node_id: NodeId) -> Self {
        Self { node_size, node_id }
    }

    /// Number of nodes requested per query.
    pub fn query_size(&self) -> usize {
        self.node_size
    }

    /// Id of the querying node itself.
    pub fn node_id(&self) -> &NodeId {
        &self.node_id
    }

    /// Draw a weighted random sample of up to `node_size` node ids,
    /// never including this node's own id.
    pub fn sample<Sample: NodesSample>(&self, node_sample: &Sample) -> Vec<NodeId> {
        let node_ids = node_sample.nodes();
        let weights = node_sample.weights();
        // TODO: do we need to be reproducible?
        let mut rng = thread_rng();
        // Oversample by one so that if our own id is drawn we can filter it
        // out and still end up with `node_size` entries.
        // NOTE(review): `weights.get(e).unwrap()` panics if `weights()` lacks
        // an id returned by `nodes()` — confirm implementors keep both in sync.
        let node_ids = node_ids
            .as_slice()
            .choose_multiple_weighted(&mut rng, self.node_size + 1, |e| *weights.get(e).unwrap())
            .unwrap()
            .cloned()
            .filter(|node_id| node_id != &self.node_id)
            .take(self.node_size)
            .collect();
        debug!(query_node_ids = ?node_ids);
        node_ids
    }
}
/// Communication layer abstraction trait
/// Used by the claro algorithm runner to query for the votes of other nodes
#[async_trait::async_trait]
pub trait VoteQuery: Send + Sync {
    /// Transaction type the votes refer to.
    type Tx;
    /// Ask the nodes selected by `node_query` for their votes on `tx`.
    async fn query(&mut self, node_query: &NodeQuery, tx: Self::Tx) -> Vec<Vote<Self::Tx>>;
}
#[cfg(test)]
mod test {
    use crate::query::{NodeId, NodeQuery, NodeWeight, NodesSample};
    use std::collections::{HashMap, HashSet};

    // Fixed in-memory id/weight table implementing NodesSample
    struct TestSample {
        node_ids: Vec<NodeId>,
        node_weights: Vec<NodeWeight>,
    }

    impl TestSample {
        // Number of nodes; asserts ids and weights stay in sync
        fn len(&self) -> usize {
            assert_eq!(self.node_weights.len(), self.node_ids.len());
            self.node_ids.len()
        }
    }

    impl NodesSample for TestSample {
        fn nodes(&self) -> Vec<NodeId> {
            self.node_ids.clone()
        }

        fn weights(&self) -> HashMap<&NodeId, NodeWeight> {
            self.node_ids
                .iter()
                .zip(self.node_weights.iter().copied())
                .collect()
        }
    }

    // The querying node's id ("") is not among the sampled ids, so all 10
    // distinct nodes must come back with no duplicates.
    #[test]
    fn unique_sample_set() {
        let query: NodeQuery = NodeQuery::new(10, "".into());
        let sample = TestSample {
            node_ids: (0..10).map(|i| i.to_string()).collect(),
            node_weights: (1..11usize).map(|i| i as f64).collect(),
        };
        let ids: HashSet<_> = query.sample(&sample).into_iter().collect();
        assert_eq!(ids.len(), sample.len());
    }
}

View File

@ -0,0 +1 @@
pub mod query;

View File

@ -0,0 +1,38 @@
use crate::{NodeQuery, Vote, VoteQuery};
use std::marker::PhantomData;
/// Test-only [`VoteQuery`] that never returns any votes.
pub struct NoQuery<Tx>(PhantomData<Tx>);

impl<Tx> Default for NoQuery<Tx> {
    fn default() -> Self {
        Self(Default::default())
    }
}

#[async_trait::async_trait]
impl<Tx: Send + Sync> VoteQuery for NoQuery<Tx> {
    type Tx = Tx;

    /// Always yields an empty vote set, regardless of the query size.
    async fn query(&mut self, _node_query: &NodeQuery, _tx: Self::Tx) -> Vec<Vote<Self::Tx>> {
        vec![]
    }
}
/// Test-only [`VoteQuery`] that answers every query with copies of one fixed vote.
pub struct FixedQuery<Tx: Clone + Send + Sync>(Vote<Tx>);

impl<Tx: Clone + Send + Sync> FixedQuery<Tx> {
    /// Wrap the vote every future query will return.
    pub fn new(vote: Vote<Tx>) -> Self {
        Self(vote)
    }
}

#[async_trait::async_trait]
impl<Tx: Clone + Send + Sync> VoteQuery for FixedQuery<Tx> {
    type Tx = Tx;

    /// Returns `node_query.query_size()` clones of the configured vote.
    async fn query(&mut self, node_query: &NodeQuery, _tx: Self::Tx) -> Vec<Vote<Self::Tx>> {
        (0..node_query.query_size())
            .map(|_| self.0.clone())
            .collect()
    }
}

View File

@ -0,0 +1,17 @@
use tracing_core::Subscriber;
use tracing_subscriber::filter::filter_fn;
use tracing_subscriber::fmt::MakeWriter;
use tracing_subscriber::Layer;
/// Tracing target tag used to filter claro-specific log records.
pub const CLARO_TARGET_TAG: &str = "CLARO_TARGET";

/// Build a JSON-formatted tracing layer writing to `writer` that only keeps
/// events whose target equals `filter_tag` (e.g. [`CLARO_TARGET_TAG`]).
pub fn claro_tracing_layer_with_writer<W, S>(writer: W, filter_tag: &'static str) -> impl Layer<S>
where
    W: for<'writer> MakeWriter<'writer> + 'static,
    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    tracing_subscriber::fmt::layer()
        .with_writer(writer)
        .json()
        .with_filter(filter_fn(move |metadata| metadata.target() == filter_tag))
}

View File

@ -0,0 +1,12 @@
[package]
name = "snowball"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
claro = { path = "../claro" }
[dev-dependencies]
claro = { path = "../claro", features = ["testing"]}

View File

@ -0,0 +1,7 @@
#[allow(dead_code)]
mod snowball;
pub use crate::snowball::{SnowballConfiguration, SnowballSolver};
/// Snowball logging filtering tag
pub const SNOWBALL_TARGET_TAG: &str = "SNOWBALL_TARGET";

View File

@ -0,0 +1,172 @@
use claro::{Decision, NodeQuery, Opinion, Vote};
use std::fmt::Debug;
/// Snowball algorithm configuration
#[derive(Debug, Clone, Copy)]
pub struct SnowballConfiguration {
    /// `alpha`: number of agreeing votes in a round that counts as a quorum
    pub quorum_size: usize,
    /// `K`: how many nodes to sample each round
    pub sample_size: usize,
    /// `beta`: consecutive successful rounds required before deciding
    pub decision_threshold: usize,
}

/// Snowball computation object
pub struct SnowballSolver<Tx> {
    // Algorithm parameters
    configuration: SnowballConfiguration,
    // Current (possibly finalized) decision
    decision: Decision<Tx>,
    // Streak of consecutive rounds in which the held preference reached quorum
    consecutive_success: u64,
    // Node query setup for current node
    node_query: NodeQuery,
}
impl<Tx: Clone + Debug> SnowballSolver<Tx> {
    /// Build a solver that starts undecided with no opinion on `tx`.
    pub fn new(tx: Tx, configuration: SnowballConfiguration, node_query: NodeQuery) -> Self {
        Self {
            configuration,
            decision: Decision::Undecided(Opinion::None(tx)),
            consecutive_success: 0,
            node_query,
        }
    }

    /// Build a solver that starts undecided from an arbitrary initial opinion.
    pub fn with_initial_opinion(
        configuration: SnowballConfiguration,
        node_query: NodeQuery,
        opinion: Opinion<Tx>,
    ) -> Self {
        Self {
            configuration,
            decision: Decision::Undecided(opinion),
            consecutive_success: 0,
            node_query,
        }
    }

    /// Count how many of `votes` agree with this solver's current vote.
    /// With no current preference (`Opinion::None` => `vote()` is `None`),
    /// nothing matches and the count is 0.
    fn count_opinion_votes(&self, votes: &[Vote<Tx>]) -> usize {
        votes
            .iter()
            .filter(|v| {
                matches!(
                    (v, self.vote()),
                    (Vote::Yes(_), Some(Vote::Yes(_))) | (Vote::No(_), Some(Vote::No(_)))
                )
            })
            .count()
    }

    /// Run a single snowball round over the sampled `votes`.
    ///
    /// # Panics
    /// Panics if called after the solver has already reached a `Decided` state.
    pub fn step(&mut self, votes: &[Vote<Tx>]) {
        assert!(matches!(self.decision, Decision::Undecided(_)));
        let preference_count = self.count_opinion_votes(votes);
        let not_preference_count = votes.len() - preference_count;
        if preference_count >= self.configuration.quorum_size {
            // Our preference reached quorum: extend the success streak
            self.consecutive_success += 1;
        } else if not_preference_count >= self.configuration.quorum_size {
            // The opposite side reached quorum: flip and restart the streak at 1
            self.decision = Decision::Undecided(self.opinion().flip());
            self.consecutive_success = 1;
        } else {
            // No quorum either way: the streak is broken
            self.consecutive_success = 0
        }
        // Enough consecutive successes finalize the current opinion
        if self.consecutive_success > self.configuration.decision_threshold as u64 {
            self.decision = Decision::Decided(self.opinion())
        }
    }

    /// Current streak of successful rounds for the held preference.
    pub fn consecutive_success(&self) -> u64 {
        self.consecutive_success
    }

    /// Current decision (cloned).
    pub fn decision(&self) -> Decision<Tx> {
        self.decision.clone()
    }

    /// Current opinion, whether or not it has been finalized yet.
    pub fn opinion(&self) -> Opinion<Tx> {
        match &self.decision {
            Decision::Decided(o) | Decision::Undecided(o) => o.clone(),
        }
    }

    /// Derive vote from its current decision
    pub fn vote(&self) -> Option<Vote<Tx>> {
        self.decision().into()
    }

    /// Node query setup used by this solver.
    pub fn node_query(&self) -> &NodeQuery {
        &self.node_query
    }
}
#[cfg(test)]
mod test {
    use super::{SnowballConfiguration, SnowballSolver};
    use claro::{Decision, NodeQuery, Opinion, Vote};

    // A unanimous opposite quorum must flip the preference and restart the
    // success streak at 1 without deciding yet.
    #[test]
    fn test_change_opinion() {
        let configuration = SnowballConfiguration {
            quorum_size: 1,
            sample_size: 10,
            decision_threshold: 10,
        };
        let mut solver = SnowballSolver::with_initial_opinion(
            configuration,
            NodeQuery::new(0, "0".to_string()),
            Opinion::Yes(true),
        );
        let votes = vec![Vote::No(true); 10];
        solver.step(&votes);
        assert!(matches!(solver.decision(), Decision::Undecided(_)));
        assert_eq!(solver.consecutive_success, 1);
        assert_eq!(solver.opinion(), Opinion::No(true));
    }

    // After decision_threshold + 1 successful rounds the solver must decide.
    #[test]
    fn test_makes_decision() {
        let configuration = SnowballConfiguration {
            quorum_size: 1,
            sample_size: 10,
            decision_threshold: 10,
        };
        let beta = configuration.decision_threshold;
        let mut solver = SnowballSolver::with_initial_opinion(
            configuration,
            NodeQuery::new(0, "0".to_string()),
            Opinion::Yes(true),
        );
        let votes = vec![Vote::No(true); 10];
        for _ in 0..beta + 1 {
            solver.step(&votes);
        }
        assert_eq!(solver.decision(), Decision::Decided(Opinion::No(true)));
        assert_eq!(solver.consecutive_success, beta as u64 + 1);
        assert_eq!(solver.opinion(), Opinion::No(true));
    }

    // A split sample below quorum resets the streak without flipping opinion.
    #[test]
    fn test_reset_consecutive_counter() {
        let configuration = SnowballConfiguration {
            quorum_size: 2,
            sample_size: 10,
            decision_threshold: 10,
        };
        let mut solver = SnowballSolver::with_initial_opinion(
            configuration,
            NodeQuery::new(0, "0".to_string()),
            Opinion::Yes(true),
        );
        let votes = vec![Vote::No(true), Vote::Yes(true)];
        solver.step(&votes);
        assert_eq!(solver.consecutive_success, 0);
        assert_eq!(solver.opinion(), Opinion::Yes(true));
        assert!(matches!(solver.decision(), Decision::Undecided(_)));
    }
}

View File

@ -0,0 +1,18 @@
[package]
name = "consensus-simulations"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap = { version = "3.2", features = ["derive"] }
claro = { path = "../../consensus/claro", features = ["testing"] }
once_cell = "1.13"
polars = { version = "0.23", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"] }
rand = { version = "0.8", features = ["small_rng"] }
rayon = "1.5"
fixed-slice-deque = "0.1.0-beta1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snowball = { path = "../../consensus/snowball" }

View File

@ -0,0 +1,323 @@
# Consensus simulations
## Usage
### Build
Run cargo build command under the general project folder (consensus-prototypes):
```shell
cargo build --profile release-opt --bin snow-family
```
Built binary is usually placed at `target/release-opt/snow-family`,
or if built for a specific architecture (overridden) at `target/{ARCH}/release-opt/snow-family`.
### Execute
Move binary at some place of your choice, or after build run (from the main project folder):
```shell
./target/release-opt/snow-family --help
```
```
consensus-simulations
Main simulation wrapper Pipes together the cli arguments with the execution
USAGE:
snow-family.exe [OPTIONS] --input-settings <INPUT_SETTINGS> --output-file <OUTPUT_FILE>
OPTIONS:
-f, --output-format <OUTPUT_FORMAT> Output format selector [default: parquet]
-h, --help Print help information
-i, --input-settings <INPUT_SETTINGS> Json file path, on `SimulationSettings` format
-o, --output-file <OUTPUT_FILE> Output file path
```
## SimulationSettings
Simulations are configured with a `json` settings description file like in the example:
```json
{
"consensus_settings": {
"snow_ball": {
"quorum_size": 14,
"sample_size": 20,
"decision_threshold": 20
}
},
"distribution": {
"yes": 0.6,
"no": 0.4,
"none": 0.0
},
"byzantine_settings": {
"total_size": 10000,
"distribution": {
"honest": 1.0,
"infantile": 0.0,
"random": 0.0,
"omniscient": 0.0
}
},
"wards": [
{
"time_to_finality": {
"ttf_threshold" : 1
}
}
],
"network_modifiers": [
{
"random_drop": {
"drop_rate": 0.01
}
}
],
"seed" : 18042022
}
```
### consensus_settings
`consensus_settings` is the consensus backend configuration, the following consensus are supported:
* [`snow_ball`](#Snowball)
* [`claro`](#Claro)
#### Snowball
Attributes:
* `quorum_size`: `usize`, `alpha` as per the snowball algorithm
* `sample_size`: `usize`, `K` as per the snowball algorithm
* `decision_threshold`: `usize`, `beta` as per the snowball algorithm
Example:
```json
{
"quorum_size": 14,
"sample_size": 20,
"decision_threshold": 20
}
```
#### Claro
Attributes:
* `evidence_alpha`: `f32`, `alpha` as per the claro algorithm
* `evidence_alpha_2`: `f32`, `alpha2` as per the claro algorithm
* `confidence_beta`: `f32`, `beta` as per the claro algorithm (AKA decision threshold)
* `look_ahead`: `usize`, `l` as per the claro algorithm
* `query`: `QueryConfiguration`:
* `query_size`: `usize`, step node query size
* `initial_query_size`: `usize`, base query size (usually same as `query_size`)
* `query_multiplier`: `usize`, query size calculation in case no quorum found in step query
* `max_multiplier`: `usize`, max query multiplier to apply
Example:
```json
{
"evidence_alpha": 0.8,
"evidence_alpha_2": 0.5,
"confidence_beta": 0.8,
"look_ahead": 20,
"query": {
"query_size": 30,
"initial_query_size": 30,
"query_multiplier": 2,
"max_multiplier": 4
}
}
```
### distribution
Initial **honest nodes** opinion distribution (**normalized**, must sum up to `1.0`)
* `yes`, initial yes distribution
* `no`, initial no distribution
* `none`, initial none opinion distribution
Example:
```json
{
"yes": 0.6,
"no": 0.4,
"none": 0.0
}
```
### byzantine_settings
Byzantine nodes configuration
* `total_size`: `usize`, total number of nodes to be spawned
* `distribution`: **normalized** distribution of honest/byzantine nodes
* `honest`: `f32`, **normalized** amount of honest nodes
* `infantile`: `f32`, **normalized** amount of infantile nodes
* `random`: `f32`, **normalized** amount of random nodes
* `omniscient`: `f32`, **normalized** amount of omniscient nodes
Example:
```json
{
"total_size": 10000,
"distribution": {
"honest": 1.0,
"infantile": 0.0,
"random": 0.0,
"omniscient": 0.0
}
}
```
### Simulation style
Simulation can be done in different fashions:
* *Sync*, (**default**) nodes run per step at the same time, updating on the previous states.
* *Async*, nodes run per batches (*chunks*) of predefined sizes
* *Glauber*, use the [Glauber simulations solver](https://en.wikipedia.org/wiki/Glauber_dynamics)
* `update_rate`, record network state every `update_rate` processed chunks.
* `maximum_iterations`, threshold limit of simulation iterations
Example:
```json
{
...,
"simulation_style": "Sync"
}
```
```json
{
...,
"simulation_style": {
"Async" : {
"chunks": 20
}
}
}
```
```json
{
...,
"simulation_style": {
"Glauber" : {
"update_rate": 1000,
"maximum_iterations":1000000
}
}
}
```
### wards
List of configurable experiment stop conditions based on the network state.
* `time_to_finality`, break when reaching a threshold number of consensus rounds
* `ttf_threshold`: `usize`, threshold to be rebased
```json
[
{
"time_to_finality": {
"ttf_threshold" : 1
}
}
]
```
* `stabilised`, break when for `n` rounds the network state keeps the same
* `buffer`: `usize`, consecutive number of rounds or iterations to check
* `check`: selector of what the ward should be aware of for checking states:
* `rounds`: check by consecutive rounds
* `iterations`: check every `n` iterations
```json
[
{
"stabilised": {
"buffer" : 3,
"check" : { "type": "rounds" }
}
}
]
```
or
```json
[
{
"stabilised": {
"buffer" : 3,
"check" : { "type": "iterations", "chunk": 100 }
}
}
]
```
* `converged`, break when a specified ratio of decided nodes is reached
* `ratio`, `[0.0-1.0]` range of decided nodes threshold
### network_modifiers
List of modifiers that handle the network state in each step iteration
* `random_drop`, drop a percentage of the votes (setting them up as `None`)
* `drop_rate`: `f32`, normalize rate of dropped messages
Example:
```json
[
{
"random_drop": {
"drop_rate": 0.01
}
}
]
```
### seed
The simulations can be run with a customized seed (otherwise one is provided by the app itself) in order to make runs
reproducible. A `u64` integer must be provided
```json
{
...
"seed" : 18042022
}
```
## Output format
Output is a [`Polars::Dataframe`](https://docs.rs/polars/latest/polars/frame/struct.DataFrame.html) [python version](https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.html)
Columns are vote states for each round (from `0`, initial state, to experiment end round).
Three modes are supported, `["json", "csv", "parquet"]`, all of them standard dumps of `polars`.
### Votes
Votes are encoded as:
* `None` => `0`
* `Yes` => `1`
* `No` => `2`

View File

@ -0,0 +1,150 @@
// std
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::fs::File;
use std::io::Cursor;
use std::path::{Path, PathBuf};
use std::str::FromStr;
// crates
use crate::output_processors::OutData;
use clap::Parser;
use polars::io::SerWriter;
use polars::prelude::{DataFrame, JsonReader, SerReader};
use serde::de::DeserializeOwned;
// internal
use crate::runner::SimulationRunner;
use crate::settings::SimulationSettings;
/// Output format selector enum
/// Parsed from the `--output-format` CLI flag (see [`FromStr`] below).
#[derive(Debug, Default)]
enum OutputFormat {
    Json,
    Csv,
    // Default when no format is given on the command line
    #[default]
    Parquet,
}
impl Display for OutputFormat {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let tag = match self {
OutputFormat::Json => "json",
OutputFormat::Csv => "csv",
OutputFormat::Parquet => "parquet",
};
write!(f, "{}", tag)
}
}
impl FromStr for OutputFormat {
    type Err = std::io::Error;

    /// Case-insensitive parse of an output-format tag.
    ///
    /// # Errors
    /// Returns an `InvalidData` error for anything other than
    /// `json`, `csv` or `parquet`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "json" => Ok(Self::Json),
            "csv" => Ok(Self::Csv),
            "parquet" => Ok(Self::Parquet),
            tag => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                // Fix: the supported list previously advertised "polars",
                // which is not an accepted tag — the accepted tag is "parquet".
                format!(
                    "Invalid {} tag, only [json, csv, parquet] are supported",
                    tag
                ),
            )),
        }
    }
}
/// Main simulation wrapper
/// Pipes together the cli arguments with the execution
// NOTE: the field doc comments below double as clap `--help` text.
#[derive(Parser)]
pub struct SimulationApp {
    /// Json file path, on `SimulationSettings` format
    #[clap(long, short)]
    input_settings: PathBuf,
    /// Output file path
    #[clap(long, short)]
    output_file: PathBuf,
    /// Output format selector
    #[clap(long, short = 'f', default_value_t)]
    output_format: OutputFormat,
}
impl SimulationApp {
    /// Load settings from `input_settings`, validate the distributions,
    /// run the simulation and dump the resulting dataframe to `output_file`.
    pub fn run(self) -> Result<(), Box<dyn Error>> {
        let Self {
            input_settings,
            output_file,
            output_format,
        } = self;
        let simulation_settings: SimulationSettings = load_json_from_file(&input_settings)?;
        // Both the opinion and the byzantine distributions must be normalized
        simulation_settings.distribution.check_distribution()?;
        simulation_settings
            .byzantine_settings
            .distribution
            .check_distribution()?;
        let mut simulation_runner = SimulationRunner::new(simulation_settings);
        // build up series vector
        let mut out_data: Vec<OutData> = Vec::new();
        simulation_runner.simulate(Some(&mut out_data));
        let mut dataframe: DataFrame = out_data_to_dataframe(out_data);
        dump_dataframe_to(output_format, &mut dataframe, &output_file)?;
        Ok(())
    }
}
/// Build a polars [`DataFrame`] from the collected simulation records using an
/// in-memory JSON round-trip as the intermediary representation.
///
/// # Panics
/// Panics if serialization, dataframe loading or unnesting fails.
fn out_data_to_dataframe(out_data: Vec<OutData>) -> DataFrame {
    let mut cursor = Cursor::new(Vec::new());
    serde_json::to_writer(&mut cursor, &out_data).expect("Dump data to json ");
    let dataframe = JsonReader::new(cursor)
        .finish()
        .expect("Load dataframe from intermediary json");
    // Flatten the nested "state" struct column into top-level columns
    dataframe
        .unnest(["state"])
        .expect("Node state should be unnest")
}
/// Generically load a json file
///
/// # Errors
/// Returns an error if the file cannot be opened or does not deserialize to `T`.
fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> Result<T, Box<dyn Error>> {
    let f = File::open(path).map_err(Box::new)?;
    serde_json::from_reader(f).map_err(|e| Box::new(e) as Box<dyn Error>)
}
/// Write `data` as JSON next to `out_path`, forcing a `.json` extension.
fn dump_dataframe_to_json(data: &mut DataFrame, out_path: &Path) -> Result<(), Box<dyn Error>> {
    let out_path = out_path.with_extension("json");
    let f = File::create(out_path)?;
    let mut writer = polars::prelude::JsonWriter::new(f);
    writer
        .finish(data)
        .map_err(|e| Box::new(e) as Box<dyn Error>)
}

/// Write `data` as CSV next to `out_path`, forcing a `.csv` extension.
fn dump_dataframe_to_csv(data: &mut DataFrame, out_path: &Path) -> Result<(), Box<dyn Error>> {
    let out_path = out_path.with_extension("csv");
    let f = File::create(out_path)?;
    let mut writer = polars::prelude::CsvWriter::new(f);
    writer
        .finish(data)
        .map_err(|e| Box::new(e) as Box<dyn Error>)
}

/// Write `data` as Parquet next to `out_path`, forcing a `.parquet` extension.
fn dump_dataframe_to_parquet(data: &mut DataFrame, out_path: &Path) -> Result<(), Box<dyn Error>> {
    let out_path = out_path.with_extension("parquet");
    let f = File::create(out_path)?;
    let writer = polars::prelude::ParquetWriter::new(f);
    writer
        .finish(data)
        .map_err(|e| Box::new(e) as Box<dyn Error>)
}

/// Dispatch to the writer matching the selected [`OutputFormat`].
fn dump_dataframe_to(
    output_format: OutputFormat,
    data: &mut DataFrame,
    out_path: &Path,
) -> Result<(), Box<dyn Error>> {
    match output_format {
        OutputFormat::Json => dump_dataframe_to_json(data, out_path),
        OutputFormat::Csv => dump_dataframe_to_csv(data, out_path),
        OutputFormat::Parquet => dump_dataframe_to_parquet(data, out_path),
    }
}

View File

@ -0,0 +1,17 @@
mod app;
mod network_behaviour;
mod node;
mod output_processors;
mod runner;
mod settings;
mod warding;
use crate::app::SimulationApp;
use clap::Parser;
use std::error::Error;
fn main() -> Result<(), Box<dyn Error>> {
let app: SimulationApp = app::SimulationApp::parse();
app.run()?;
Ok(())
}

View File

@ -0,0 +1,61 @@
use crate::network_behaviour::NetworkBehaviour;
use crate::node::Vote;
use rand::prelude::IteratorRandom;
use rand::rngs::SmallRng;
use serde::Deserialize;
/// Randomly drop some of the network votes
/// Drop rate should be normalized
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RandomDrop {
    /// Fraction of votes to drop, expected in `[0.0, 1.0]`
    drop_rate: f32,
}

impl NetworkBehaviour for RandomDrop {
    /// Set a randomly chosen `drop_rate` fraction of the votes to `None`.
    fn modify_network_state(&mut self, network_state: &mut [Option<Vote>], rng: &mut SmallRng) {
        // Clamp guards against mis-configured out-of-range rates
        let amount: usize =
            (self.drop_rate.clamp(0f32, 1f32) * network_state.len() as f32).round() as usize;
        for i in (0..network_state.len()).choose_multiple(rng, amount) {
            *network_state.get_mut(i).unwrap() = None;
        }
    }
}
#[cfg(test)]
mod test {
    use crate::network_behaviour::drop::RandomDrop;
    use crate::network_behaviour::NetworkBehaviour;
    use crate::node::{NoTx, Vote};
    use rand::prelude::SmallRng;
    use rand::SeedableRng;

    // Fixed seed keeps the random drop selection deterministic across runs
    const SEED: u64 = 18042022;

    // drop_rate == 1.0 must clear every vote
    #[test]
    fn full_drop_rate() {
        let mut rng: SmallRng = SmallRng::seed_from_u64(SEED);
        let mut random_drop = RandomDrop { drop_rate: 1.0 };
        let mut votes: Vec<Option<Vote>> = (0..10).map(|_| Some(Vote::Yes(NoTx))).collect();
        random_drop.modify_network_state(&mut votes, &mut rng);
        assert!(votes.iter().all(Option::is_none));
    }

    // drop_rate == 0.0 must leave every vote untouched
    #[test]
    fn none_drop_rate() {
        let mut rng: SmallRng = SmallRng::seed_from_u64(SEED);
        let mut random_drop = RandomDrop { drop_rate: 0.0 };
        let mut votes: Vec<Option<Vote>> = (0..10).map(|_| Some(Vote::Yes(NoTx))).collect();
        random_drop.modify_network_state(&mut votes, &mut rng);
        assert!(votes.iter().all(Option::is_some));
    }

    // drop_rate == 0.5 on 10 votes must drop exactly 5
    #[test]
    fn half_drop_rate() {
        let mut rng: SmallRng = SmallRng::seed_from_u64(SEED);
        let mut random_drop = RandomDrop { drop_rate: 0.5 };
        let mut votes: Vec<Option<Vote>> = (0..10).map(|_| Some(Vote::Yes(NoTx))).collect();
        random_drop.modify_network_state(&mut votes, &mut rng);
        assert_eq!(votes.iter().filter(|vote| vote.is_some()).count(), 5);
    }
}

View File

@ -0,0 +1,34 @@
mod drop;
use crate::node::Vote;
use rand::rngs::SmallRng;
use serde::Deserialize;
/// Modify a [`crate::node::NetworkState`]; single exclusive access to the
/// underlying vote slice is guaranteed by the caller.
pub trait NetworkBehaviour {
    /// Mutate the current network votes in place, using `rng` for any randomness.
    fn modify_network_state(&mut self, network_state: &mut [Option<Vote>], rng: &mut SmallRng);
}
/// [`NetworkBehaviour`] dispatcher.
/// Enum to avoid boxing (`Box<dyn NetworkBehaviour>`) modifiers.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NetworkModifiers {
    /// Randomly drops a fraction of the network votes each step.
    RandomDrop(drop::RandomDrop),
}
impl NetworkModifiers {
    /// Get inner [`NetworkBehaviour`] mut reference
    /// (each variant wraps exactly one concrete modifier).
    pub fn network_behaviour_mut(&mut self) -> &mut dyn NetworkBehaviour {
        match self {
            NetworkModifiers::RandomDrop(behaviour) => behaviour,
        }
    }
}
impl NetworkBehaviour for NetworkModifiers {
fn modify_network_state(&mut self, network_state: &mut [Option<Vote>], rng: &mut SmallRng) {
self.network_behaviour_mut()
.modify_network_state(network_state, rng);
}
}

View File

@ -0,0 +1,78 @@
// std
// crates
use rand::rngs::SmallRng;
use serde::Serialize;
// internal
use crate::node::{query_network_state, ComputeNode, Decision, NetworkState, NoTx, NodeId};
use crate::output_processors::{NodeStateRecord, SerializedNodeState};
use claro::{ClaroSolver, ClaroState};
/// Claro consensus node
/// Wrapper over [`::claro::ClaroSolver`]
pub struct ClaroNode {
    /// Underlying claro consensus solver (holds opinion/decision state).
    solver: ClaroSolver<NoTx>,
    /// Shared simulation-wide vote table this node samples from.
    network_state: NetworkState,
    /// This node's index into the network state.
    node_id: NodeId,
    /// Per-node RNG used for peer sampling.
    rng: SmallRng,
}
impl ClaroNode {
    /// Create a claro node.
    ///
    /// * `node_id` - index of this node in the shared network state.
    /// * `solver` - pre-configured claro solver (may carry an initial opinion).
    /// * `network_state` - shared vote table for the whole simulation.
    /// * `rng` - per-node RNG used for peer sampling.
    pub fn new(
        node_id: usize,
        solver: ClaroSolver<NoTx>,
        network_state: NetworkState,
        rng: SmallRng,
    ) -> Self {
        Self {
            node_id,
            solver,
            network_state,
            rng,
        }
    }
}
impl ComputeNode for ClaroNode {
    fn id(&self) -> usize {
        self.node_id
    }
    /// While undecided, sample peer votes (skipping self) and advance the
    /// solver; once decided the node stops querying.
    fn step(&mut self) {
        if matches!(self.solver.decision(), Decision::Undecided(_)) {
            let votes = query_network_state(
                &self.network_state,
                self.solver.node_query().query_size(),
                self.node_id,
                &mut self.rng,
            );
            self.solver.step(NoTx, &votes);
        }
    }
    fn decision(&self) -> Decision {
        self.solver.decision()
    }
}
/// Flattened claro solver state for JSON output.
#[derive(Serialize)]
struct OutClaroState {
    evidence: u64,
    evidence_accumulated: u64,
    confidence: u64,
}
impl From<&ClaroState> for OutClaroState {
    /// Flatten the solver counters into plain `u64` fields for serialization.
    fn from(state: &ClaroState) -> Self {
        let evidence = state.evidence() as u64;
        let evidence_accumulated = state.evidence_accumulated() as u64;
        let confidence = state.confidence() as u64;
        OutClaroState {
            evidence,
            evidence_accumulated,
            confidence,
        }
    }
}
impl NodeStateRecord for ClaroNode {
    fn get_serialized_state_record(&self) -> SerializedNodeState {
        // Serializing a plain struct of u64 counters cannot fail, hence unwrap.
        serde_json::to_value(OutClaroState::from(self.solver.state())).unwrap()
    }
}

View File

@ -0,0 +1,73 @@
use rand::rngs::SmallRng;
// std
// crates
// internal
use crate::node::{
query_network_state, ComputeNode, Decision, NetworkState, NoTx, NodeId, Opinion, Vote,
};
use crate::output_processors::NodeStateRecord;
/// Node that replies with the opposite of the step query.
/// For each query:
/// if majority == yes: reply no
/// if majority == no: reply yes
pub struct InfantileNode {
    /// Shared vote table sampled on every step.
    network_state: NetworkState,
    /// Number of peer votes sampled per step.
    query_size: usize,
    node_id: NodeId,
    /// Always `Undecided`; an infantile node never finalizes.
    decision: Decision,
    rng: SmallRng,
}
impl InfantileNode {
    /// Build an infantile node starting from an undecided `None` opinion.
    pub fn new(
        node_id: usize,
        query_size: usize,
        network_state: NetworkState,
        rng: SmallRng,
    ) -> Self {
        Self {
            node_id,
            query_size,
            network_state,
            decision: Decision::Undecided(Opinion::None(NoTx)),
            rng,
        }
    }
    /// Return the opinion opposite to the strict majority of `votes`
    /// (a tie counts as "no majority of yes", so the reply is `yes`).
    fn flip_majority(votes: &[Vote]) -> Opinion {
        let yes_votes = votes.iter().filter(|v| matches!(v, Vote::Yes(_))).count();
        // `2 * yes > len` is equivalent to `yes > len / 2` for non-negative ints.
        let majority_is_yes = 2 * yes_votes > votes.len();
        if majority_is_yes {
            Opinion::No(NoTx)
        } else {
            Opinion::Yes(NoTx)
        }
    }
}
impl ComputeNode for InfantileNode {
    fn id(&self) -> usize {
        self.node_id
    }
    /// Sample peer votes and adopt the *opposite* of their majority opinion.
    fn step(&mut self) {
        let votes = query_network_state(
            &self.network_state,
            self.query_size,
            self.node_id,
            &mut self.rng,
        );
        self.decision = Decision::Undecided(InfantileNode::flip_majority(&votes));
    }
    fn decision(&self) -> Decision {
        self.decision
    }
}
// Infantile nodes carry no extra solver state; the default `Null` record applies.
impl NodeStateRecord for InfantileNode {}

View File

@ -0,0 +1,184 @@
// std
use std::sync::{Arc, RwLock};
// crates
use ::claro::ClaroSolver;
use rand::prelude::IteratorRandom;
use rand::rngs::SmallRng;
use rand::RngCore;
// internal
use crate::node::claro::ClaroNode;
use crate::node::infantile::InfantileNode;
pub use crate::node::omniscient::{MasterOmniscientNode, OmniscientPuppetNode};
use crate::node::random::RandomNode;
use crate::node::snowball::SnowballNode;
use crate::output_processors::NodeStateRecord;
use ::snowball::SnowballSolver;
mod claro;
mod infantile;
mod omniscient;
mod random;
mod snowball;
/// Consensus experiments consist of just one round; we care about the voting
/// itself, not the content, hence a transaction type that carries no information.
#[derive(Copy, Clone, Debug)]
pub struct NoTx;
/// NoTx vote
pub type Vote = ::claro::Vote<NoTx>;
/// NoTx decision
pub type Decision = ::claro::Decision<NoTx>;
/// NoTx opinion
pub type Opinion = ::claro::Opinion<NoTx>;
/// Index of a node in the shared network state vector.
pub type NodeId = usize;
/// Shared hook to the simulation state: one optional vote slot per node.
pub type NetworkState = Arc<RwLock<Vec<Option<Vote>>>>;
/// Node computation abstraction layer
pub trait ComputeNode {
    /// Unique id of this node within the simulation.
    fn id(&self) -> usize;
    /// Advance the node one simulation step.
    fn step(&mut self);
    /// Current vote, derived from the current opinion (`None` opinion → no vote).
    fn vote(&self) -> Option<Vote> {
        self.opinion().into()
    }
    /// Current opinion, whether or not the node has finalized a decision.
    fn opinion(&self) -> Opinion {
        match self.decision() {
            Decision::Decided(opinion) | Decision::Undecided(opinion) => opinion,
        }
    }
    /// Current decision state of the node.
    fn decision(&self) -> Decision;
}
/// Query the network state for a fixed size skipping self node id
///
/// Samples `query_size + 1` slots so that discarding our own slot (if picked)
/// still leaves up to `query_size` entries; `take(query_size)` caps the result.
///
/// NOTE(review): the `filter_map` also drops slots whose vote is `None`, so the
/// returned vector may hold fewer than `query_size` votes even when more voted
/// peers exist — presumably this models unresponsive peers; confirm.
pub fn query_network_state(
    network_state: &NetworkState,
    query_size: usize,
    node_id: NodeId,
    rng: &mut impl RngCore,
) -> Vec<Vote> {
    network_state
        .read()
        .unwrap()
        .iter()
        .enumerate()
        .choose_multiple(rng, query_size + 1)
        .into_iter()
        .filter_map(|(id, vote)| if id != node_id { *vote } else { None })
        .take(query_size)
        .collect()
}
/// Node dispatcher
/// Enum to avoid Boxing (Box<dyn ComputeNode>) the nodes.
pub enum Node {
    /// Honest node running the snowball solver.
    Snowball(snowball::SnowballNode),
    /// Honest node running the claro solver.
    Claro(claro::ClaroNode),
    /// Byzantine node voting at random each step.
    Random(random::RandomNode),
    /// Byzantine node replying with the opposite of the queried majority.
    Infantile(infantile::InfantileNode),
    /// Byzantine node echoing the master omniscient node's decision.
    OmniscientPuppet(omniscient::OmniscientPuppetNode),
}
impl Node {
    /// Wrap a snowball solver as a simulation node.
    pub fn new_snowball(
        node_id: NodeId,
        solver: SnowballSolver<NoTx>,
        network_state: NetworkState,
        rng: SmallRng,
    ) -> Self {
        Self::Snowball(SnowballNode::new(node_id, solver, network_state, rng))
    }
    /// Wrap a claro solver as a simulation node.
    pub fn new_claro(
        node_id: NodeId,
        solver: ClaroSolver<NoTx>,
        network_state: NetworkState,
        seed: SmallRng,
    ) -> Self {
        Self::Claro(ClaroNode::new(node_id, solver, network_state, seed))
    }
    /// Byzantine node that votes at random.
    pub fn new_random(node_id: NodeId) -> Self {
        Self::Random(RandomNode::new(node_id))
    }
    /// Byzantine node answering with the opposite of the sampled majority.
    pub fn new_infantile(
        node_id: NodeId,
        query_size: usize,
        network_state: NetworkState,
        rng: SmallRng,
    ) -> Self {
        Self::Infantile(InfantileNode::new(node_id, query_size, network_state, rng))
    }
    /// Byzantine puppet echoing the master omniscient node's decision.
    pub fn new_omniscient_puppet(puppet: OmniscientPuppetNode) -> Self {
        Self::OmniscientPuppet(puppet)
    }
    /// Get `ComputeNode` inner mut reference
    pub fn inner_node_mut(&mut self) -> &mut dyn ComputeNode {
        let node: &mut dyn ComputeNode = match self {
            Node::Snowball(node) => node,
            Node::Claro(node) => node,
            Node::Random(node) => node,
            Node::Infantile(node) => node,
            Node::OmniscientPuppet(node) => node,
        };
        node
    }
    /// Get `ComputeNode` inner reference
    pub fn inner_node(&self) -> &dyn ComputeNode {
        let node: &dyn ComputeNode = match self {
            Node::Snowball(node) => node,
            Node::Claro(node) => node,
            Node::Random(node) => node,
            Node::Infantile(node) => node,
            Node::OmniscientPuppet(node) => node,
        };
        node
    }
    /// View this node as a serializable state record.
    pub fn serialized_state(&self) -> &dyn NodeStateRecord {
        match self {
            Node::Snowball(node) => node,
            Node::Claro(node) => node,
            Node::Random(node) => node,
            Node::Infantile(node) => node,
            Node::OmniscientPuppet(node) => node,
        }
    }
    /// Node flavor label used in the output records.
    pub fn type_as_string(&self) -> String {
        match self {
            Node::Snowball(_) => "snowball",
            Node::Claro(_) => "claro",
            Node::Random(_) => "random",
            Node::Infantile(_) => "infantile",
            Node::OmniscientPuppet(_) => "omniscient",
        }
        .to_string()
    }
}
// Delegate the whole `ComputeNode` API to the wrapped concrete node.
impl ComputeNode for Node {
    fn id(&self) -> usize {
        self.inner_node().id()
    }
    fn step(&mut self) {
        self.inner_node_mut().step()
    }
    fn decision(&self) -> Decision {
        self.inner_node().decision()
    }
}

View File

@ -0,0 +1,114 @@
// std
use std::sync::{Arc, RwLock};
// crates
// internal
use crate::node::{ComputeNode, Decision, NetworkState, NoTx, NodeId, Opinion, Vote};
use crate::output_processors::NodeStateRecord;
/// Node that knows the network state all the time.
/// It orchestrates responses based on that.
/// As an optimization just a single node takes care of everything, then we place Puppet nodes
/// in the list of nodes that just replies with whatever the Master omniscient node decides.
pub struct MasterOmniscientNode {
    /// Ids of honest nodes whose votes are tallied each step.
    honest_nodes_ids: Vec<NodeId>,
    /// Ids of the puppet slots this node overwrites with its chosen vote.
    omniscient_nodes_ids: Vec<NodeId>,
    network_state: NetworkState,
    /// Decision shared (via `Arc`) with every puppet node.
    decision: Arc<RwLock<Decision>>,
    node_id: NodeId,
}
/// Omniscient puppet node. Node that just replies with whatever the `MasterOmniscientNode` decides.
#[derive(Clone)]
pub struct OmniscientPuppetNode {
    node_id: NodeId,
    /// Decision shared with the master omniscient node.
    decision: Arc<RwLock<Decision>>,
}
impl MasterOmniscientNode {
    pub fn new(
        node_id: NodeId,
        honest_nodes_ids: Vec<NodeId>,
        omniscient_nodes_ids: Vec<NodeId>,
        network_state: NetworkState,
    ) -> Self {
        Self {
            node_id,
            honest_nodes_ids,
            omniscient_nodes_ids,
            network_state,
            decision: Arc::new(RwLock::new(Decision::Undecided(Opinion::None(NoTx)))),
        }
    }
    /// Tally the honest nodes' current votes and write the *opposite* of their
    /// majority into every puppet slot (adversarial strategy). A tie
    /// (`yes == no`) takes the `else` branch, so the written vote is `yes`.
    fn analyze_and_write_votes(&mut self) {
        let mut state = self
            .network_state
            .write()
            .expect("Only access to network state resource from omniscient node");
        let honest_votes: Vec<Option<Vote>> = self
            .honest_nodes_ids
            .iter()
            .map(|node_id| state.get(*node_id).expect("Node id should be within range"))
            .copied()
            .collect();
        let yes_votes = honest_votes
            .iter()
            .filter(|v| matches!(v, Some(Vote::Yes(_))))
            .count();
        let no_votes = honest_votes
            .iter()
            .filter(|v| matches!(v, Some(Vote::No(_))))
            .count();
        let vote = if yes_votes > no_votes {
            *self.decision.write().unwrap() = Decision::Undecided(Opinion::No(NoTx));
            Some(Vote::No(NoTx))
        } else {
            *self.decision.write().unwrap() = Decision::Undecided(Opinion::Yes(NoTx));
            Some(Vote::Yes(NoTx))
        };
        // overwrite every puppet slot with the adversarial vote
        for &i in &self.omniscient_nodes_ids {
            if let Some(old_vote) = state.get_mut(i) {
                *old_vote = vote;
            }
        }
    }
    /// Create a puppet that shares (and merely reports) this master's decision.
    pub fn puppet_node(&self, node_id: NodeId) -> OmniscientPuppetNode {
        OmniscientPuppetNode {
            node_id,
            decision: Arc::clone(&self.decision),
        }
    }
}
impl ComputeNode for MasterOmniscientNode {
    fn id(&self) -> usize {
        self.node_id
    }
    /// Re-tally honest votes and overwrite the puppet slots.
    fn step(&mut self) {
        self.analyze_and_write_votes();
    }
    fn decision(&self) -> Decision {
        *self.decision.read().unwrap()
    }
}
impl ComputeNode for OmniscientPuppetNode {
    fn id(&self) -> usize {
        self.node_id
    }
    /// Puppets do no work of their own; the master updates the shared decision.
    fn step(&mut self) {}
    fn decision(&self) -> Decision {
        *self.decision.read().unwrap()
    }
}
// Puppets expose no extra solver state; the default `Null` record applies.
impl NodeStateRecord for OmniscientPuppetNode {}

View File

@ -0,0 +1,45 @@
// std
// crates
// internal
use crate::node::{ComputeNode, Decision, NoTx, NodeId, Opinion};
use crate::output_processors::NodeStateRecord;
/// Node that takes a random decision each step
pub struct RandomNode {
    /// Always `Undecided`, holding the most recent random opinion.
    decision: Decision,
    node_id: NodeId,
}
impl RandomNode {
    /// Build a random node starting from an undecided `None` opinion.
    pub fn new(node_id: NodeId) -> Self {
        Self {
            decision: Decision::Undecided(Opinion::None(NoTx)),
            node_id,
        }
    }
    /// Flip a fair coin for a `Yes`/`No` opinion.
    ///
    /// NOTE(review): uses `rand::random()` (thread-local RNG), not the
    /// simulation's seeded `SmallRng`, so runs containing random nodes are not
    /// reproducible even with a fixed seed — confirm whether that is intended.
    fn rand_opinion() -> Opinion {
        let bool_opinion: bool = rand::random();
        if bool_opinion {
            Opinion::Yes(NoTx)
        } else {
            Opinion::No(NoTx)
        }
    }
}
impl ComputeNode for RandomNode {
    fn id(&self) -> usize {
        self.node_id
    }
    /// Adopt a fresh random opinion every step; never finalizes a decision.
    fn step(&mut self) {
        self.decision = Decision::Undecided(RandomNode::rand_opinion());
    }
    fn decision(&self) -> Decision {
        self.decision
    }
}
// Random nodes expose no extra solver state; the default `Null` record applies.
impl NodeStateRecord for RandomNode {}

View File

@ -0,0 +1,70 @@
use rand::rngs::SmallRng;
// std
// crates
use serde::Serialize;
// internal
use crate::node::{query_network_state, ComputeNode, Decision, NetworkState, NoTx, NodeId};
use crate::output_processors::{NodeStateRecord, SerializedNodeState};
use snowball::SnowballSolver;
/// Snowball consensus node
/// Wrapper over [`::snowball::SnowballSolver`]
pub struct SnowballNode {
    /// Underlying snowball consensus solver.
    solver: SnowballSolver<NoTx>,
    /// Shared simulation-wide vote table this node samples from.
    network_state: NetworkState,
    /// This node's index into the network state.
    node_id: NodeId,
    /// Per-node RNG used for peer sampling.
    rng: SmallRng,
}
impl SnowballNode {
    /// Create a snowball node.
    ///
    /// * `node_id` - index of this node in the shared network state.
    /// * `solver` - pre-configured snowball solver (may carry an initial opinion).
    /// * `network_state` - shared vote table for the whole simulation.
    /// * `rng` - per-node RNG used for peer sampling.
    pub fn new(
        node_id: usize,
        solver: SnowballSolver<NoTx>,
        network_state: NetworkState,
        rng: SmallRng,
    ) -> Self {
        Self {
            node_id,
            solver,
            network_state,
            rng,
        }
    }
}
impl ComputeNode for SnowballNode {
    fn id(&self) -> usize {
        self.node_id
    }
    /// While undecided, sample peer votes (skipping self) and advance the
    /// solver; once decided the node stops querying.
    fn step(&mut self) {
        if matches!(self.solver.decision(), Decision::Undecided(_)) {
            let votes = query_network_state(
                &self.network_state,
                self.solver.node_query().query_size(),
                self.node_id,
                &mut self.rng,
            );
            self.solver.step(&votes);
        }
    }
    fn decision(&self) -> Decision {
        self.solver.decision()
    }
}
/// Flattened snowball solver state for JSON output.
#[derive(Serialize)]
struct OutSnowballState {
    consecutive_success: u64,
}
impl NodeStateRecord for SnowballNode {
    /// Serialize the solver's `consecutive_success` counter as the node record.
    fn get_serialized_state_record(&self) -> SerializedNodeState {
        // A struct holding a single u64 always serializes, hence unwrap.
        serde_json::to_value(OutSnowballState {
            consecutive_success: self.solver.consecutive_success(),
        })
        .unwrap()
    }
}

View File

@ -0,0 +1,19 @@
use serde::Serialize;
/// Opaque per-node solver state, serialized as arbitrary JSON.
pub type SerializedNodeState = serde_json::Value;
/// One output row: a single node's vote/state at a given iteration and round.
#[derive(Serialize)]
pub struct OutData {
    pub id: u64,
    pub iteration: u64,
    pub round: u64,
    /// Vote encoding: 0 = no vote, 1 = yes, 2 = no.
    pub vote: u8,
    /// Node flavor label ("snowball", "claro", "random", "infantile", "omniscient").
    pub _type: String,
    pub state: SerializedNodeState,
}
/// Hook for nodes to expose solver internals in the simulation output.
pub trait NodeStateRecord {
    /// Serialize this node's extra state; defaults to JSON `null`.
    fn get_serialized_state_record(&self) -> SerializedNodeState {
        SerializedNodeState::Null
    }
}

View File

@ -0,0 +1,64 @@
use crate::node::{ComputeNode, Vote};
use crate::output_processors::OutData;
use crate::runner::SimulationRunner;
use crate::warding::SimulationState;
use rand::prelude::SliceRandom;
use rayon::prelude::*;
use std::collections::HashSet;
use std::sync::Arc;
/// "Async" simulation: node ids are shuffled and processed in chunks of
/// `chunk_size`; within one pass only the chunk's nodes step, while every
/// other node just re-publishes its current vote. State is dumped after each
/// chunk; wards and network modifiers run once per full pass over all chunks.
pub fn simulate(
    runner: &mut SimulationRunner,
    chunk_size: usize,
    mut out_data: Option<&mut Vec<OutData>>,
) {
    let mut node_ids: Vec<usize> = (0..runner
        .nodes
        .read()
        .expect("Read access to nodes vector")
        .len())
        .collect();
    let mut simulation_state = SimulationState {
        network_state: Arc::clone(&runner.network_state),
        nodes: Arc::clone(&runner.nodes),
        iteration: 0,
        round: 0,
    };
    // record the initial state before anything steps
    runner.dump_state_to_out_data(&simulation_state, &mut out_data);
    loop {
        node_ids.shuffle(&mut runner.rng);
        for ids_chunk in node_ids.chunks(chunk_size) {
            // the omniscient master reacts before each chunk of steps
            if let Some(master_omniscient) = runner.master_omniscient.as_mut() {
                master_omniscient.step();
            }
            let ids: HashSet<usize> = ids_chunk.iter().copied().collect();
            let new_state: Vec<Option<Vote>> = runner
                .nodes
                .write()
                .expect("Write access to nodes vector")
                .par_iter_mut()
                .enumerate()
                .map(|(id, node)| {
                    if ids.contains(&id) {
                        node.step();
                        node.vote()
                    } else {
                        node.vote()
                    }
                })
                .collect();
            runner.set_new_network_state(new_state);
            runner.dump_state_to_out_data(&simulation_state, &mut out_data);
            simulation_state.iteration += 1;
        }
        simulation_state.round += 1;
        // check if any condition makes the simulation stop
        if runner.check_wards(&simulation_state) {
            break;
        }
        // run modifiers over the current step network state
        runner.run_network_behaviour_modifiers();
    }
}

View File

@ -0,0 +1,66 @@
use crate::node::{ComputeNode, Node, NodeId};
use crate::output_processors::OutData;
use crate::runner::SimulationRunner;
use crate::warding::SimulationState;
use rand::prelude::IteratorRandom;
use std::collections::BTreeSet;
use std::sync::Arc;
/// [Glauber dynamics simulation](https://en.wikipedia.org/wiki/Glauber_dynamics)
///
/// Each iteration one still-undecided node is chosen uniformly at random,
/// stepped, and its vote published; nodes that reach `Decided` leave the pool.
/// Output is dumped once per `update_rate` iterations; the run stops when the
/// pool empties, a ward triggers, or `maximum_iterations` is reached.
pub fn simulate(
    runner: &mut SimulationRunner,
    update_rate: usize,
    maximum_iterations: usize,
    mut out_data: Option<&mut Vec<OutData>>,
) {
    let mut simulation_state = SimulationState {
        network_state: Arc::clone(&runner.network_state),
        nodes: Arc::clone(&runner.nodes),
        iteration: 0,
        round: 0,
    };
    // pool of node ids that have not yet reached a `Decided` state
    let mut nodes_remaining: BTreeSet<NodeId> = (0..runner
        .nodes
        .read()
        .expect("Read access to nodes vector")
        .len())
        .collect();
    let iterations: Vec<_> = (0..maximum_iterations).collect();
    'main: for chunk in iterations.chunks(update_rate) {
        for i in chunk {
            simulation_state.iteration = *i;
            if nodes_remaining.is_empty() {
                break 'main;
            }
            let node_id = *nodes_remaining.iter().choose(&mut runner.rng).expect(
                "Some id to be selected as it should be impossible for the set to be empty here",
            );
            {
                let vote = {
                    let mut shared_nodes =
                        runner.nodes.write().expect("Write access to nodes vector");
                    let node: &mut Node = shared_nodes
                        .get_mut(node_id)
                        .expect("Node should be present");
                    node.step();
                    // decided nodes never step again
                    if matches!(node.decision(), claro::Decision::Decided(_)) {
                        nodes_remaining.remove(&node_id);
                    }
                    node.vote()
                };
                runner.update_single_network_state_vote(node_id, vote);
            }
            // check if any condition makes the simulation stop
            if runner.check_wards(&simulation_state) {
                break 'main;
            }
            // run modifiers over the current step network state
            runner.run_network_behaviour_modifiers();
        }
        // dump once per `update_rate` iterations, not per step
        runner.dump_state_to_out_data(&simulation_state, &mut out_data);
    }
}

View File

@ -0,0 +1,448 @@
mod async_runner;
mod glauber_runner;
mod sync_runner;
// std
use std::sync::{Arc, RwLock};
// crates
use rand::prelude::SliceRandom;
use rand::rngs::SmallRng;
use rand::{RngCore, SeedableRng};
use rayon::prelude::*;
// internal
use crate::network_behaviour::NetworkBehaviour;
use crate::node::{
ComputeNode, MasterOmniscientNode, NetworkState, NoTx, Node, NodeId, Opinion, Vote,
};
use crate::output_processors::OutData;
use crate::settings::{
ByzantineDistribution, ByzantineSettings, ConsensusSettings, SimulationSettings,
SimulationStyle,
};
use crate::warding::{SimulationState, SimulationWard};
use claro::{ClaroSolver, NodeQuery};
use snowball::SnowballSolver;
/// Encapsulation solution for the simulations runner
/// Holds the network state, the simulating nodes and the simulation settings.
pub struct SimulationRunner {
    /// Shared vote table read by nodes and rewritten after each step.
    network_state: NetworkState,
    /// All simulated nodes, behind a lock for parallel stepping.
    nodes: Arc<RwLock<Vec<Node>>>,
    /// Present only when the byzantine distribution includes omniscient nodes.
    master_omniscient: Option<MasterOmniscientNode>,
    settings: SimulationSettings,
    /// Seeded master RNG; per-node RNGs are derived from it.
    rng: SmallRng,
}
impl SimulationRunner {
    /// Build a runner from `settings`: seed the master RNG (printing the seed
    /// for reproducibility), then create all nodes and the initial state.
    pub fn new(settings: SimulationSettings) -> Self {
        let seed = settings
            .seed
            .unwrap_or_else(|| rand::thread_rng().next_u64());
        println!("Seed: {}", seed);
        let mut rng = SmallRng::seed_from_u64(seed);
        let (nodes, network_state, master_omniscient) =
            Self::nodes_from_initial_settings(&settings, &mut rng);
        let nodes = Arc::new(RwLock::new(nodes));
        Self {
            network_state,
            nodes,
            master_omniscient,
            settings,
            rng,
        }
    }
    /// Initialize nodes from settings and calculate initial network state.
    ///
    /// Per-flavor sizes are `round(total_size * fraction)`; ids are drawn from
    /// a shuffled list (honest first, then infantile, random, omniscient).
    /// Returns the nodes sorted by id, the shared network state already
    /// populated with initial votes, and the master omniscient node, if any.
    fn nodes_from_initial_settings(
        settings: &SimulationSettings,
        mut seed: &mut SmallRng,
    ) -> (Vec<Node>, NetworkState, Option<MasterOmniscientNode>) {
        let SimulationSettings {
            consensus_settings,
            distribution,
            byzantine_settings:
                ByzantineSettings {
                    total_size,
                    distribution:
                        ByzantineDistribution {
                            honest,
                            infantile,
                            random,
                            omniscient,
                        },
                },
            ..
        } = settings;
        // shuffling is just for representation
        let mut node_ids: Vec<_> = (0..*total_size).collect();
        node_ids.shuffle(seed);
        let mut node_ids_iter = node_ids.into_iter();
        // total size based sizes
        let [honest_size, infantile_size, random_size, omniscient_size] =
            [honest, infantile, random, omniscient]
                .map(|&x| (*total_size as f32 * x).round() as usize);
        // NOTE(review): dbg! left in; prints on every construction.
        dbg!([honest_size, infantile_size, random_size, omniscient_size]);
        let options = [Opinion::None(NoTx), Opinion::Yes(NoTx), Opinion::No(NoTx)];
        // build up initial honest nodes opinion distribution, padding with
        // `None` opinions in case the per-opinion casts rounded down
        let mut votes_distribution: Vec<Opinion> = options
            .into_iter()
            .flat_map(|opinion| {
                let size: usize =
                    (honest_size as f32 * distribution.weight_by_opinion(&opinion)) as usize;
                std::iter::repeat(opinion).take(size)
            })
            .chain(std::iter::repeat(Opinion::None(NoTx)))
            .take(honest_size)
            .collect();
        // check that we actually have all opinions as needed
        assert_eq!(votes_distribution.len(), honest_size);
        // shuffle distribution
        votes_distribution.shuffle(seed);
        // uninitialized network state, should be recalculated afterwards
        let network_state: NetworkState = Arc::new(RwLock::new(vec![None; *total_size]));
        // Allow needless collect: we actually need to do so in order to liberate the node_ids_iter
        // otherwise it is borrowed mutably more than once...apparently the compiler is not smart enough (still)
        // to catch that it should be safe to do so in this case. So, we collect.
        // This should not really impact on running performance other than getting the nodes prepared
        // would take a bit more.
        // NOTE(review): `hones_nodes_ids` is a typo for `honest_nodes_ids`
        // (renaming is out of scope for this documentation pass).
        let hones_nodes_ids: Vec<_> = std::iter::from_fn(|| node_ids_iter.next())
            .take(honest_size)
            .collect();
        #[allow(clippy::needless_collect)]
        let honest_nodes: Vec<_> = Self::build_honest_nodes(
            hones_nodes_ids.iter().copied().zip(votes_distribution),
            *total_size,
            Arc::clone(&network_state),
            *consensus_settings,
            seed,
        )
        .collect();
        #[allow(clippy::needless_collect)]
        let infantile_nodes: Vec<_> = std::iter::from_fn(|| node_ids_iter.next())
            .take(infantile_size)
            .map(|node_id| {
                Node::new_infantile(
                    node_id,
                    consensus_settings.query_size(),
                    Arc::clone(&network_state),
                    SmallRng::from_rng(&mut seed).expect("Rng should build properly from seed rng"),
                )
            })
            .collect();
        #[allow(clippy::needless_collect)]
        let random_nodes: Vec<_> = std::iter::from_fn(|| node_ids_iter.next())
            .take(random_size)
            .map(Node::new_random)
            .collect();
        // one master node (outside the node list, id = NodeId::MAX) plus one
        // puppet per omniscient slot, all sharing the master's decision
        let (master_omniscient, omniscient_nodes) = {
            if omniscient_size > 0 {
                let omniscient_nodes_ids: Vec<_> = std::iter::from_fn(|| node_ids_iter.next())
                    .take(omniscient_size)
                    .collect();
                let omniscient_node = MasterOmniscientNode::new(
                    NodeId::MAX,
                    hones_nodes_ids,
                    omniscient_nodes_ids.clone(),
                    Arc::clone(&network_state),
                );
                #[allow(clippy::needless_collect)]
                let puppets: Vec<_> = omniscient_nodes_ids
                    .iter()
                    .map(|id| Node::new_omniscient_puppet(omniscient_node.puppet_node(*id)))
                    .collect();
                (Some(omniscient_node), puppets.into_iter())
            } else {
                (None, vec![].into_iter())
            }
        };
        let mut nodes: Vec<Node> = honest_nodes
            .into_iter()
            .chain(omniscient_nodes)
            .chain(infantile_nodes.into_iter())
            .chain(random_nodes.into_iter())
            .collect();
        nodes.sort_unstable_by_key(|node| node.inner_node().id());
        // set up network state with the current distribution
        let new_network_state = Self::network_state_from_nodes(&nodes);
        *network_state.write().unwrap() = new_network_state;
        (nodes, network_state, master_omniscient)
    }
    /// Build the honest-node iterator for the configured consensus flavor.
    /// The closures are boxed so both match arms yield the same iterator type.
    fn build_honest_nodes<'a>(
        node_data: impl Iterator<Item = (NodeId, Opinion)> + 'a,
        total_size: usize,
        network_state: NetworkState,
        consensus_settings: ConsensusSettings,
        mut seed: &'a mut SmallRng,
    ) -> impl Iterator<Item = Node> + 'a {
        match consensus_settings {
            ConsensusSettings::SnowBall(snowball_settings) => {
                node_data.map(Box::new(move |(node_id, opinion)| {
                    Node::new_snowball(
                        node_id,
                        SnowballSolver::with_initial_opinion(
                            snowball_settings,
                            NodeQuery::new(total_size, node_id.to_string()),
                            opinion,
                        ),
                        Arc::clone(&network_state),
                        SmallRng::from_rng(&mut seed)
                            .expect("Rng should build properly from seed rng"),
                    )
                })
                    as Box<dyn FnMut((usize, Opinion)) -> Node>)
            }
            ConsensusSettings::Claro(claro_settings) => {
                node_data.map(Box::new(move |(node_id, opinion)| {
                    Node::new_claro(
                        node_id,
                        ClaroSolver::with_initial_opinion(
                            claro_settings,
                            NodeQuery::new(total_size, node_id.to_string()),
                            opinion,
                        ),
                        Arc::clone(&network_state),
                        SmallRng::from_rng(&mut seed)
                            .expect("Rng should build properly from seed rng"),
                    )
                })
                    as Box<dyn FnMut((usize, Opinion)) -> Node>)
            }
        }
    }
    /// Collect every node's current vote (in id order) in parallel.
    #[inline]
    fn network_state_from_nodes(nodes: &[Node]) -> Vec<Option<Vote>> {
        // NOTE(review): dbg! left in; prints the node count on every call.
        dbg!(nodes.len());
        nodes.par_iter().map(|node| node.vote()).collect()
    }
    /// Run the simulation to completion using the configured style.
    pub fn simulate(&mut self, out_data: Option<&mut Vec<OutData>>) {
        match &self.settings.simulation_style {
            SimulationStyle::Sync => {
                sync_runner::simulate(self, out_data);
            }
            &SimulationStyle::Async { chunks } => {
                async_runner::simulate(self, chunks, out_data);
            }
            &SimulationStyle::Glauber {
                maximum_iterations,
                update_rate,
            } => {
                glauber_runner::simulate(self, update_rate, maximum_iterations, out_data);
            }
        }
    }
    /// Append one `OutData` row per node to the sink, if one was provided.
    /// Vote encoding: 0 = no vote, 1 = yes, 2 = no.
    /// NOTE(review): parameter `out_ata` is a typo for `out_data`.
    fn dump_state_to_out_data(
        &self,
        simulation_state: &SimulationState,
        out_ata: &mut Option<&mut Vec<OutData>>,
    ) {
        if let Some(out) = out_ata.as_deref_mut() {
            let nodes = self.nodes.read().unwrap();
            let iteration = simulation_state.iteration as u64;
            let round = simulation_state.round as u64;
            let updated = nodes.iter().map(|node| {
                let node_type = node.type_as_string();
                let vote = match node.vote() {
                    None => 0u8,
                    Some(Vote::Yes(_)) => 1,
                    Some(Vote::No(_)) => 2,
                };
                OutData {
                    id: node.inner_node().id() as u64,
                    iteration,
                    _type: node_type,
                    round,
                    vote,
                    state: node.serialized_state().get_serialized_state_record(),
                }
            });
            out.extend(updated);
        }
    }
    /// `true` when any configured ward considers the simulation finished.
    fn check_wards(&mut self, state: &SimulationState) -> bool {
        self.settings
            .wards
            .par_iter_mut()
            .map(|ward| ward.analyze(state))
            .any(|x| x)
    }
    /// Apply every configured network modifier to the current vote table.
    fn run_network_behaviour_modifiers(&mut self) {
        let mut network_state = self
            .network_state
            .write()
            .expect("Single access to network state for running behaviour modifiers");
        for modifier in self.settings.network_modifiers.iter_mut() {
            modifier.modify_network_state(&mut network_state, &mut self.rng);
        }
    }
    /// Step every node once and publish the resulting votes.
    pub fn step(&mut self) {
        let new_network_state: Vec<Option<Vote>> = self.run_step();
        self.set_new_network_state(new_network_state);
    }
    /// Replace the whole published vote table.
    fn set_new_network_state(&mut self, new_network_state: Vec<Option<Vote>>) {
        let mut network_state = self
            .network_state
            .write()
            .expect("No threads could be accessing the network state");
        *network_state = new_network_state;
    }
    /// Overwrite a single node's published vote (used by the glauber runner).
    fn update_single_network_state_vote(&mut self, id: NodeId, vote: Option<Vote>) {
        let mut network_state = self
            .network_state
            .write()
            .expect("No threads could be accessing the network state");
        *network_state.get_mut(id).unwrap() = vote;
    }
    /// Let the master omniscient node react first, then step all nodes in
    /// parallel, collecting their new votes.
    fn run_step(&mut self) -> Vec<Option<Vote>> {
        if let Some(master_omniscient) = self.master_omniscient.as_mut() {
            master_omniscient.step();
        }
        self.nodes
            .write()
            .expect("Single access to nodes vector")
            .par_iter_mut()
            .map(|node| {
                node.step();
                node.vote()
            })
            .collect()
    }
}
#[cfg(test)]
mod test {
    use crate::node::{ComputeNode, Node, Vote};
    use crate::runner::SimulationRunner;
    use crate::settings::{
        ByzantineDistribution, ByzantineSettings, ConsensusSettings, InitialDistribution,
        SimulationSettings,
    };
    use claro::{ClaroConfiguration, QueryConfiguration};
    use rand::rngs::SmallRng;
    use rand::{thread_rng, SeedableRng};
    // 999 nodes at 70/10/10/10% honest/infantile/random/omniscient, with the
    // honest opinions split 50/50 between yes and no.
    #[test]
    fn nodes_distribution_from_initial_settings() {
        let initial_settings = SimulationSettings {
            simulation_style: Default::default(),
            consensus_settings: ConsensusSettings::Claro(ClaroConfiguration {
                evidence_alpha: 0.0,
                evidence_alpha_2: 0.0,
                confidence_beta: 0.0,
                look_ahead: 0,
                query: QueryConfiguration {
                    query_size: 0,
                    initial_query_size: 0,
                    query_multiplier: 0,
                    max_multiplier: 0,
                },
            }),
            distribution: InitialDistribution {
                yes: 0.5,
                no: 0.5,
                none: 0.0,
            },
            byzantine_settings: ByzantineSettings {
                total_size: 999,
                distribution: ByzantineDistribution {
                    honest: 0.7,
                    infantile: 0.1,
                    random: 0.1,
                    omniscient: 0.1,
                },
            },
            wards: vec![],
            network_modifiers: vec![],
            seed: None,
        };
        let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap();
        let (nodes, _, _) =
            SimulationRunner::nodes_from_initial_settings(&initial_settings, &mut rng);
        // honest node count matches the configured fraction of the total
        let honest_nodes: Vec<_> = nodes
            .iter()
            .filter(|node| matches!(node, Node::Claro(_)))
            .collect();
        assert_eq!(
            honest_nodes.len(),
            (initial_settings.byzantine_settings.total_size as f32
                * initial_settings.byzantine_settings.distribution.honest) as usize
        );
        // initial opinions are split exactly in half between yes and no
        let half_count = honest_nodes.len() / 2;
        let yes_count = honest_nodes
            .iter()
            .filter(|node| matches!(node.vote(), Some(Vote::Yes(_))))
            .count();
        assert_eq!(yes_count, half_count);
        let no_count = honest_nodes
            .iter()
            .filter(|node| matches!(node.vote(), Some(Vote::No(_))))
            .count();
        assert_eq!(no_count, half_count);
        // each byzantine flavor gets round(999 * 0.1) = 100 nodes
        let byzantine_rate_size = 100;
        let infantile_nodes_count = nodes
            .iter()
            .filter(|node| matches!(node, Node::Infantile(_)))
            .count();
        assert_eq!(infantile_nodes_count, byzantine_rate_size);
        let random_nodes_count = nodes
            .iter()
            .filter(|node| matches!(node, Node::Random(_)))
            .count();
        assert_eq!(random_nodes_count, byzantine_rate_size);
        let omniscient_nodes_count = nodes
            .iter()
            .filter(|node| matches!(node, Node::OmniscientPuppet(_)))
            .count();
        assert_eq!(omniscient_nodes_count, byzantine_rate_size);
    }
}

View File

@ -0,0 +1,29 @@
use super::SimulationRunner;
use crate::output_processors::OutData;
use crate::warding::SimulationState;
use std::sync::Arc;
/// Synchronous simulation: every node steps once per round, then wards and
/// network behaviour modifiers run over the updated state. Optionally dumps
/// one batch of `OutData` rows per round into `out_data`.
pub fn simulate(runner: &mut SimulationRunner, mut out_data: Option<&mut Vec<OutData>>) {
    let mut state = SimulationState {
        network_state: Arc::clone(&runner.network_state),
        nodes: Arc::clone(&runner.nodes),
        iteration: 0,
        round: 0,
    };
    // record the initial (round 0) state before any node steps
    runner.dump_state_to_out_data(&state, &mut out_data);
    let mut tick = 0;
    loop {
        tick += 1;
        // sync style advances round and iteration in lockstep
        state.round = tick;
        state.iteration = tick;
        runner.step();
        runner.dump_state_to_out_data(&state, &mut out_data);
        // stop as soon as any ward considers the simulation finished
        if runner.check_wards(&state) {
            break;
        }
        // apply network behaviour modifiers to the freshly written state
        runner.run_network_behaviour_modifiers();
    }
}

View File

@ -0,0 +1,155 @@
use std::error::Error;
use std::fmt::Debug;
// std
// crates
use crate::network_behaviour::NetworkModifiers;
use crate::node::Opinion;
use crate::warding::Ward;
use serde::Deserialize;
// internal
/// Foreign Serialize, Deserialize implementation for `::snowball::SnowballConfiguration`
/// (mirrors the remote type field-for-field via `#[serde(remote)]`).
#[derive(Debug, Deserialize)]
#[serde(remote = "::snowball::SnowballConfiguration")]
pub struct SnowballConfigurationDeSer {
    pub quorum_size: usize,
    /// Peers sampled per query (used as the simulation query size).
    pub sample_size: usize,
    pub decision_threshold: usize,
}
/// Foreign Serialize, Deserialize implementation for `::claro::QueryConfiguration`
/// (mirrors the remote type field-for-field via `#[serde(remote)]`).
#[derive(Debug, Deserialize)]
#[serde(remote = "::claro::QueryConfiguration")]
pub struct QueryConfigurationDeSer {
    /// Peers sampled per query (used as the simulation query size).
    pub query_size: usize,
    pub initial_query_size: usize,
    pub query_multiplier: usize,
    pub max_multiplier: usize,
}
/// Foreign Serialize, Deserialize implementation for `::claro::ClaroConfiguration`
/// (mirrors the remote type field-for-field via `#[serde(remote)]`).
#[derive(Debug, Deserialize)]
#[serde(remote = "::claro::ClaroConfiguration")]
pub struct ClaroConfigurationDeSer {
    pub evidence_alpha: f32,
    pub evidence_alpha_2: f32,
    pub confidence_beta: f32,
    pub look_ahead: usize,
    // nested remote type is bridged through its own DeSer mirror
    #[serde(with = "QueryConfigurationDeSer")]
    pub query: ::claro::QueryConfiguration,
}
/// Consensus selector: which solver honest nodes run, with its configuration.
#[derive(Debug, Copy, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConsensusSettings {
    SnowBall(#[serde(with = "SnowballConfigurationDeSer")] ::snowball::SnowballConfiguration),
    Claro(#[serde(with = "ClaroConfigurationDeSer")] ::claro::ClaroConfiguration),
}
impl ConsensusSettings {
pub fn query_size(&self) -> usize {
match self {
ConsensusSettings::SnowBall(snowball) => snowball.sample_size,
ConsensusSettings::Claro(claro) => claro.query.query_size,
}
}
}
/// Initial normalized distribution settings for honest nodes. Must sum up to `1.0`
#[derive(Debug, Deserialize)]
pub struct InitialDistribution {
    pub yes: f32,
    pub no: f32,
    pub none: f32,
}
impl InitialDistribution {
    /// Weight this distribution assigns to `opinion`.
    pub fn weight_by_opinion(&self, opinion: &Opinion) -> f32 {
        match opinion {
            Opinion::None(_) => self.none,
            Opinion::Yes(_) => self.yes,
            Opinion::No(_) => self.no,
        }
    }
    /// Validate that `none + yes + no` sums to `1.0`.
    pub fn check_distribution(&self) -> Result<(), Box<dyn Error>> {
        let values = [self.none, self.yes, self.no];
        check_normalized_distribution(self, &values)
    }
}
/// Byzantine nodes normalized distribution. Must sum up to `1.0`
#[derive(Debug, Deserialize)]
pub struct ByzantineDistribution {
    /// Fraction of nodes running the configured consensus honestly.
    pub honest: f32,
    /// Fraction replying with the opposite of the queried majority.
    pub infantile: f32,
    /// Fraction voting at random each step.
    pub random: f32,
    /// Fraction coordinated adversarially by the master omniscient node.
    pub omniscient: f32,
}
impl ByzantineDistribution {
    /// Validate that the four fractions sum to `1.0`.
    pub fn check_distribution(&self) -> Result<(), Box<dyn Error>> {
        let values = [self.honest, self.infantile, self.random, self.omniscient];
        check_normalized_distribution(self, &values)
    }
}
/// Byzantine settings, size of simulation and byzantine distribution
#[derive(Debug, Deserialize)]
pub struct ByzantineSettings {
    // Total number of simulated nodes.
    pub total_size: usize,
    // How the nodes are split across honest/byzantine behaviours.
    pub distribution: ByzantineDistribution,
}
// How simulation steps are scheduled.
#[derive(Debug, Deserialize, Default)]
pub enum SimulationStyle {
    /// Synchronous stepping (the default).
    #[default]
    Sync,
    /// Asynchronous stepping; `chunks` presumably sets how many nodes advance
    /// per step — confirm against the simulation runner.
    Async {
        chunks: usize,
    },
    /// Glauber-dynamics style updates, bounded by `maximum_iterations`;
    /// `update_rate` semantics defined by the runner — confirm there.
    Glauber {
        maximum_iterations: usize,
        update_rate: usize,
    },
}
/// Full simulation settings:
/// * consensus settings
/// * initial distribution
/// * byzantine setting
/// * simulation wards
/// * simulation network behaviour modifiers
/// * simulation style
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SimulationSettings {
    pub consensus_settings: ConsensusSettings,
    pub distribution: InitialDistribution,
    pub byzantine_settings: ByzantineSettings,
    // Stop conditions; empty means the simulation never stops via wards.
    #[serde(default)]
    pub wards: Vec<Ward>,
    // Network behaviour modifiers; empty by default.
    #[serde(default)]
    pub network_modifiers: Vec<NetworkModifiers>,
    // Defaults to `SimulationStyle::Sync`.
    #[serde(default)]
    pub simulation_style: SimulationStyle,
    // Optional RNG seed; `None` presumably means a random seed — confirm in runner.
    #[serde(default)]
    pub seed: Option<u64>,
}
/// Check if a settings distribution is normalized (sums up to `1.0`).
///
/// The comparison is epsilon-tolerant rather than an exact `f32` equality:
/// summing decimal fractions in `f32` can miss `1.0` by a rounding error even
/// for a legitimately normalized distribution, so we allow one `f32::EPSILON`
/// of drift per summed element.
///
/// # Errors
/// Returns a boxed `std::io::Error` (`InvalidData`) mentioning the `holder`
/// when the values do not sum (approximately) to `1.0`.
fn check_normalized_distribution<T: Debug>(
    holder: T,
    distribution: &[f32],
) -> Result<(), Box<dyn Error>> {
    let value: f32 = distribution.iter().sum();
    // Tolerance scales with the number of additions that could each round.
    let tolerance = f32::EPSILON * distribution.len().max(1) as f32;
    if (value - 1.0f32).abs() > tolerance {
        Err(Box::new(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("{holder:?} distribution is not normalized, values sum {value} != 1.0"),
        )))
    } else {
        Ok(())
    }
}

View File

@ -0,0 +1,87 @@
use crate::node::{ComputeNode, Decision, Node};
use crate::warding::{SimulationState, SimulationWard};
use serde::de::Error;
use serde::{Deserialize, Deserializer};
/// Ward that triggers once a target fraction of nodes has reached a decision.
#[derive(Debug, Deserialize)]
pub struct ConvergedWard {
    // Required decided fraction in [0.0, 1.0]; range enforced at deserialization.
    #[serde(deserialize_with = "deserialize_normalized_value")]
    ratio: f32,
}
impl ConvergedWard {
    /// True when the decided fraction (`decided / len`) reaches `self.ratio`.
    pub fn converged(&self, len: usize, decisions: impl Iterator<Item = Decision>) -> bool {
        let mut decided = 0usize;
        for decision in decisions {
            if let Decision::Decided(_) = decision {
                decided += 1;
            }
        }
        decided as f32 / len as f32 >= self.ratio
    }
}
impl SimulationWard for ConvergedWard {
    type SimulationState = SimulationState;
    // Snapshot the node list and test the decided ratio against the threshold.
    fn analyze(&mut self, state: &Self::SimulationState) -> bool {
        let nodes = state.nodes.read().expect("Read access to nodes vec");
        self.converged(nodes.len(), nodes.iter().map(Node::decision))
    }
}
// TODO: Probably a good idea to have a serde_utils crate
/// Deserialize an `f32`, rejecting anything outside the inclusive range
/// `[0.0, 1.0]` so ratio-style settings are always normalized.
fn deserialize_normalized_value<'de, D>(deserializer: D) -> Result<f32, D::Error>
where
    D: Deserializer<'de>,
{
    let value = f32::deserialize(deserializer)?;
    (0f32..=1f32)
        .contains(&value)
        .then_some(value)
        .ok_or_else(|| {
            // `Error::custom` takes any `Display`; pass the message directly
            // instead of a needless `&String` borrow.
            D::Error::custom(format!(
                "Only normalized values [0.0, 1.0] are valid, got: {value}"
            ))
        })
}
#[cfg(test)]
mod tests {
    use crate::node::NoTx;
    use crate::warding::converged::ConvergedWard;
    use claro::{Decision, Opinion};
    #[test]
    fn converge_full() {
        // All 2 of 2 nodes decided; ratio 1.0 -> converged.
        let decisions = vec![
            Decision::Decided(Opinion::Yes(NoTx)),
            Decision::Decided(Opinion::Yes(NoTx)),
        ];
        let ward = ConvergedWard { ratio: 1.0 };
        assert!(ward.converged(2, decisions.into_iter()));
    }
    #[test]
    fn converge_ratio() {
        // 2 of 3 nodes decided: 2/3 >= 0.5 -> converged.
        let decisions = vec![
            Decision::Decided(Opinion::Yes(NoTx)),
            Decision::Decided(Opinion::Yes(NoTx)),
            Decision::Undecided(Opinion::Yes(NoTx)),
        ];
        let ward = ConvergedWard { ratio: 0.5 };
        // Pass the actual population size (3). The previous value (2) tested
        // a 2/2 ratio, which would mask regressions in the ratio computation.
        assert!(ward.converged(3, decisions.into_iter()));
    }
    #[test]
    fn not_converge() {
        // Only 1 of 2 nodes decided while ratio demands all -> not converged.
        let decisions = vec![
            Decision::Decided(Opinion::Yes(NoTx)),
            Decision::Undecided(Opinion::Yes(NoTx)),
        ];
        let ward = ConvergedWard { ratio: 1.0 };
        assert!(!ward.converged(2, decisions.into_iter()));
    }
}

View File

@ -0,0 +1,52 @@
use crate::node::{NetworkState, Node};
use serde::Deserialize;
use std::sync::{Arc, RwLock};
mod converged;
mod stabilised;
mod ttf;
// Shared snapshot of the running simulation that wards inspect.
pub struct SimulationState {
    // Shared view of per-node votes (see stabilised ward's `count_state`).
    pub network_state: NetworkState,
    // Shared, lock-protected node list.
    pub nodes: Arc<RwLock<Vec<Node>>>,
    // Current iteration counter — presumably finer-grained than `round`; confirm in runner.
    pub iteration: usize,
    // Current simulation round.
    pub round: usize,
}
/// A ward is a computation over the `NetworkState`, it must return true if the state satisfies
/// the warding conditions. It is used to stop the consensus simulation if such condition is reached.
pub trait SimulationWard {
    type SimulationState;
    /// Return `true` when the stop condition holds for `state`.
    /// Takes `&mut self` so wards can keep internal bookkeeping between calls.
    fn analyze(&mut self, state: &Self::SimulationState) -> bool;
}
/// Ward dispatcher
/// Enum to avoid Boxing (Box<dyn SimulationWard>) wards.
#[derive(Debug, Deserialize)]
pub enum Ward {
    /// Stops after a configured number of rounds.
    #[serde(rename = "time_to_finality")]
    Ttf(ttf::TimeToFinalityWard),
    /// Stops once vote tallies stop changing.
    #[serde(rename = "stabilised")]
    Stabilised(stabilised::StabilisedWard),
    /// Stops once a fraction of nodes has decided.
    #[serde(rename = "converged")]
    Converged(converged::ConvergedWard),
}
impl Ward {
    /// Borrow the wrapped ward as a `SimulationWard` trait object so callers
    /// can dispatch without boxing.
    pub fn simulation_ward_mut(
        &mut self,
    ) -> &mut dyn SimulationWard<SimulationState = SimulationState> {
        match self {
            Self::Ttf(ward) => ward,
            Self::Stabilised(ward) => ward,
            Self::Converged(ward) => ward,
        }
    }
}
impl SimulationWard for Ward {
    type SimulationState = SimulationState;
    // Delegate to the wrapped ward.
    fn analyze(&mut self, state: &Self::SimulationState) -> bool {
        self.simulation_ward_mut().analyze(state)
    }
}

View File

@ -0,0 +1,148 @@
// std
use std::collections::HashSet;
// crates
use fixed_slice_deque::FixedSliceDeque;
use serde::{Deserialize, Deserializer};
// internal
use crate::node::{NetworkState, Vote};
use crate::warding::{SimulationState, SimulationWard};
/// When the stabilised ward samples the network state.
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StabilisedCheck {
    /// Sample every `chunk` iterations (including iteration 0).
    Iterations {
        chunk: usize,
    },
    /// Sample once per new round; `last_round` is runtime bookkeeping and is
    /// never read from configuration.
    Rounds {
        #[serde(default, skip_deserializing)]
        last_round: usize,
    },
}
impl StabilisedCheck {
    /// Decide whether the ward should take a snapshot for this state.
    pub fn should_check(&mut self, state: &SimulationState) -> bool {
        match self {
            // Fires on every multiple of `chunk`, iteration 0 included.
            StabilisedCheck::Iterations { chunk } => state.iteration % *chunk == 0,
            // Fires only when the round advanced past the last one we saw.
            StabilisedCheck::Rounds { last_round } => {
                let advanced = state.round > *last_round;
                *last_round = state.round;
                advanced
            }
        }
    }
}
/// Ward that triggers once the (yes, no) vote tallies stop changing across a
/// window of snapshots.
#[derive(Debug, Deserialize)]
pub struct StabilisedWard {
    // Ring buffer of (yes, no) tallies; configured as a plain capacity integer.
    #[serde(deserialize_with = "deserialize_fixed_slice_from_usize")]
    buffer: FixedSliceDeque<(usize, usize)>,
    // When to take a snapshot (per-iteration chunk or per-round).
    check: StabilisedCheck,
}
impl StabilisedWard {
    /// A full buffer whose snapshots are all equal means the tallies have
    /// stopped moving, i.e. the network stabilised.
    fn is_stabilised(&self) -> bool {
        if !self.buffer.is_full() {
            return false;
        }
        let distinct: HashSet<_> = self.buffer.iter().copied().collect();
        distinct.len() == 1
    }
    /// Tally the current network votes as a `(yes, no)` pair; undecided
    /// entries (`None`) are skipped.
    fn count_state(network_state: NetworkState) -> (usize, usize) {
        let (mut yes, mut no) = (0, 0);
        for vote in network_state.read().unwrap().iter() {
            match vote {
                Some(Vote::Yes(_)) => yes += 1,
                Some(Vote::No(_)) => no += 1,
                None => {}
            }
        }
        (yes, no)
    }
}
impl SimulationWard for StabilisedWard {
    type SimulationState = SimulationState;
    // Take a snapshot only when the configured check fires, then test whether
    // the snapshot window has stabilised.
    fn analyze(&mut self, state: &Self::SimulationState) -> bool {
        if !self.check.should_check(state) {
            return false;
        }
        // Cloning the Arc is cheap; `count_state` locks and tallies the votes.
        self.buffer
            .push_back(StabilisedWard::count_state(state.network_state.clone()));
        self.is_stabilised()
    }
}
/// Deserialize a plain `usize` capacity into an empty `FixedSliceDeque`, so
/// configuration can say `"buffer": 3` instead of spelling out a deque.
fn deserialize_fixed_slice_from_usize<'d, T, D: Deserializer<'d>>(
    d: D,
) -> Result<FixedSliceDeque<T>, D::Error> {
    let value = usize::deserialize(d)?;
    Ok(FixedSliceDeque::new(value))
}
#[cfg(test)]
mod tests {
    use crate::node::{NoTx, Vote};
    use crate::warding::stabilised::{StabilisedCheck, StabilisedWard};
    use crate::warding::{SimulationState, SimulationWard};
    use fixed_slice_deque::FixedSliceDeque;
    use std::sync::{Arc, RwLock};
    #[test]
    fn check_rounds() {
        let mut ward = StabilisedWard {
            buffer: FixedSliceDeque::new(2),
            check: StabilisedCheck::Rounds { last_round: 0 },
        };
        let mut simulation_state = SimulationState {
            network_state: Arc::new(RwLock::new(vec![Some(Vote::Yes(NoTx))])),
            nodes: Arc::new(RwLock::new(vec![])),
            iteration: 0,
            round: 0,
        };
        // Round 0 does not advance past last_round=0; round 1 advances but the
        // 2-slot buffer is not yet full — neither triggers the ward.
        for i in 0..2 {
            simulation_state.round = i;
            assert!(!ward.analyze(&simulation_state));
        }
        // Second advancing round fills the buffer with two identical tallies.
        simulation_state.round = 3;
        assert!(ward.analyze(&simulation_state));
    }
    #[test]
    fn check_iterations() {
        let mut ward = StabilisedWard {
            buffer: FixedSliceDeque::new(2),
            check: StabilisedCheck::Iterations { chunk: 3 },
        };
        let mut simulation_state = SimulationState {
            network_state: Arc::new(RwLock::new(vec![Some(Vote::Yes(NoTx))])),
            nodes: Arc::new(RwLock::new(vec![])),
            iteration: 0,
            round: 0,
        };
        // Only iteration 0 is a multiple of 3; a single snapshot can't fill
        // the 2-slot buffer, so the ward stays quiet.
        for i in 0..3 {
            simulation_state.iteration = i;
            assert!(!ward.analyze(&simulation_state));
        }
        // Iteration 3 adds the second identical snapshot -> stabilised.
        simulation_state.iteration = 3;
        assert!(ward.analyze(&simulation_state));
    }
    #[test]
    fn deserialize() {
        // `buffer` is configured as a bare capacity; both check variants parse.
        let rounds = r#"{ "buffer" : 3, "check" : { "type": "rounds" } }"#;
        let iterations = r#"{ "buffer" : 3, "check" : { "type": "iterations", "chunk": 100 } }"#;
        for s in [rounds, iterations] {
            let ward: StabilisedWard =
                serde_json::from_str(s).expect("Should deserialize correctly");
            assert_eq!(ward.buffer.capacity(), 3);
        }
    }
}

View File

@ -0,0 +1,42 @@
use crate::warding::{SimulationState, SimulationWard};
use serde::Deserialize;
/// Time to finality ward. It monitors the number of simulation rounds and triggers
/// once that number surpasses the configured threshold.
#[derive(Debug, Deserialize, Copy, Clone)]
pub struct TimeToFinalityWard {
    // Maximum allowed round; the ward fires when `round > ttf_threshold`.
    ttf_threshold: usize,
}
impl SimulationWard for TimeToFinalityWard {
    type SimulationState = SimulationState;
    // Strictly greater-than: the ward fires on round threshold+1, not at the threshold.
    fn analyze(&mut self, state: &SimulationState) -> bool {
        state.round > self.ttf_threshold
    }
}
#[cfg(test)]
mod test {
    use crate::node::NetworkState;
    use crate::warding::ttf::TimeToFinalityWard;
    use crate::warding::{SimulationState, SimulationWard};
    use std::sync::{Arc, RwLock};
    // NOTE(review): name looks like a typo for "reach_threshold" — confirm intent.
    #[test]
    fn rebase_threshold() {
        let network_state = NetworkState::new(RwLock::new(vec![]));
        let mut ttf = TimeToFinalityWard { ttf_threshold: 10 };
        let mut cond = false;
        let mut state = SimulationState {
            network_state,
            nodes: Arc::new(Default::default()),
            iteration: 0,
            round: 0,
        };
        // 11 increments take round to 11 > 10, so the last analyze() is true.
        for _ in 0..11 {
            state.round += 1;
            cond = ttf.analyze(&state);
        }
        assert!(cond);
    }
}