diff --git a/consensus-engine/src/lib.rs b/consensus-engine/src/lib.rs index 001a7739..37b8a5bc 100644 --- a/consensus-engine/src/lib.rs +++ b/consensus-engine/src/lib.rs @@ -32,6 +32,11 @@ impl Carnot { pub fn current_view(&self) -> View { self.current_view } + + pub fn highest_voted_view(&self) -> View { + self.highest_voted_view + } + /// Upon reception of a block /// /// Preconditions: @@ -71,7 +76,6 @@ impl Carnot { return Err(()); } let mut new_state = self.clone(); - if new_state.block_is_safe(block.clone()) { new_state.safe_blocks.insert(block.id, block.clone()); new_state.update_high_qc(block.parent_qc); @@ -361,6 +365,10 @@ impl Carnot { self.overlay.is_member_of_root_committee(self.id) } + pub fn overlay(&self) -> &O { + &self.overlay + } + /// A way to allow for overlay extendability without compromising the engine /// generality. pub fn update_overlay(&self, f: F) -> Result diff --git a/consensus-engine/src/overlay/flat_overlay.rs b/consensus-engine/src/overlay/flat_overlay.rs index e687d4d4..ad925bba 100644 --- a/consensus-engine/src/overlay/flat_overlay.rs +++ b/consensus-engine/src/overlay/flat_overlay.rs @@ -6,7 +6,6 @@ use serde::{Deserialize, Serialize}; /// Flat overlay with a single committee and round robin leader selection. pub struct FlatOverlay { nodes: Vec, - leader: L, } diff --git a/consensus-engine/src/types.rs b/consensus-engine/src/types.rs index 26e6c1bc..abd5866a 100644 --- a/consensus-engine/src/types.rs +++ b/consensus-engine/src/types.rs @@ -82,6 +82,15 @@ impl Block { pub fn parent(&self) -> BlockId { self.parent_qc.block() } + + pub fn genesis() -> Self { + Self { + id: [0; 32], + view: 0, + parent_qc: Qc::Standard(StandardQc::genesis()), + leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] }, + } + } } /// Possible output events. 
diff --git a/nomos-core/src/block.rs b/nomos-core/src/block.rs index 43423021..7962cc0e 100644 --- a/nomos-core/src/block.rs +++ b/nomos-core/src/block.rs @@ -33,7 +33,7 @@ impl Block { ) -> Self { let transactions = txs.collect(); let header = consensus_engine::Block { - id: [view as u8; 32], + id: [0; 32], view, parent_qc, leader_proof: LeaderProof::LeaderId { @@ -46,7 +46,7 @@ impl Block { transactions, beacon, }; - let id = id_from_wire_content(&s.as_bytes()); + let id = block_id_from_wire_content(&s); s.header.id = id; s } @@ -66,9 +66,12 @@ impl Block { } } -fn id_from_wire_content(bytes: &[u8]) -> consensus_engine::BlockId { +pub fn block_id_from_wire_content( + block: &Block, +) -> consensus_engine::BlockId { use blake2::digest::{consts::U32, Digest}; use blake2::Blake2b; + let bytes = block.as_bytes(); let mut hasher = Blake2b::::new(); hasher.update(bytes); hasher.finalize().into() @@ -82,7 +85,7 @@ impl Block { pub fn from_bytes(bytes: &[u8]) -> Self { let mut result: Self = wire::deserialize(bytes).unwrap(); - result.header.id = id_from_wire_content(bytes); + result.header.id = block_id_from_wire_content(&result); result } } diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index e9eebc80..240c7dfd 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -1,4 +1,4 @@ -mod leader_selection; +pub mod leader_selection; pub mod network; mod tally; mod task_manager; @@ -298,7 +298,7 @@ where } } -#[allow(dead_code)] // TODO: remove this when using broadcasting events +#[derive(Debug)] enum Output { Send(consensus_engine::Send), BroadcastTimeoutQc { timeout_qc: TimeoutQc }, @@ -716,7 +716,7 @@ where } } -pub(crate) enum Event { +enum Event { Proposal { block: Block, stream: Pin> + Send>>, diff --git a/nomos-services/consensus/src/network/messages.rs b/nomos-services/consensus/src/network/messages.rs index 73a1c9ff..5733c114 100644 --- a/nomos-services/consensus/src/network/messages.rs +++ b/nomos-services/consensus/src/network/messages.rs @@ -6,7 +6,7 @@ use crate::NodeId; use consensus_engine::{BlockId, NewView, Qc, Timeout, TimeoutQc, View, Vote}; use nomos_core::wire; -#[derive(Clone, Serialize, Deserialize, Debug)] +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] pub struct ProposalChunkMsg { pub chunk: Box<[u8]>, pub proposal: BlockId, @@ -39,7 +39,7 @@ impl VoteMsg { } } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Hash, PartialEq, Eq, Clone)] pub struct NewViewMsg { pub voter: NodeId, pub vote: NewView, @@ -54,7 +54,7 @@ impl NewViewMsg { } } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Hash, PartialEq, Eq, Clone)] pub struct TimeoutMsg { pub voter: NodeId, pub vote: Timeout, @@ -69,7 +69,7 @@ impl TimeoutMsg { } } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone)] pub struct TimeoutQcMsg { pub source: NodeId, pub qc: TimeoutQc, diff --git a/simulations/Cargo.toml b/simulations/Cargo.toml index 68946543..ae66cbad 100644 --- a/simulations/Cargo.toml +++ b/simulations/Cargo.toml @@ -3,16 +3,27 @@ name = "simulations" version = "0.1.0" edition = "2021" +[[bin]] +name = "simulation" +path = "src/bin/app.rs" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = "1" arc-swap = "1.6" +bls-signatures = "0.14" clap = { version = "4", features = ["derive"] } +chrono = { version = "0.4", features = ["serde"] } crc32fast = "1.3" 
crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] } +consensus-engine = { path = "../consensus-engine" } fixed-slice-deque = "0.1.0-beta2" +futures = "0.3" +humantime-serde = "1" nomos-core = { path = "../nomos-core" } +nomos-consensus = { path = "../nomos-services/consensus" } +once_cell = "1.17" parking_lot = "0.12" polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"] } rand = { version = "0.8", features = ["small_rng"] } @@ -22,6 +33,8 @@ serde = { version = "1.0", features = ["derive", "rc"] } serde_with = "2.3" serde_json = "1.0" thiserror = "1" +tracing = { version = "0.1", default-features = false, features = ["log", "attributes"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "tracing-log"]} [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2", features = ["js"] } diff --git a/simulations/config/carnot.json b/simulations/config/carnot.json new file mode 100644 index 00000000..da92b5cf --- /dev/null +++ b/simulations/config/carnot.json @@ -0,0 +1,60 @@ +{ + "network_settings": { + "network_behaviors": { + "north america:north america": { + "secs": 1, + "nanos": 0 + }, + "north america:europe": { + "secs": 1, + "nanos": 0 + }, + "north america:asia": { + "secs": 1, + "nanos": 0 + }, + "europe:europe": { + "secs": 1, + "nanos": 0 + }, + "europe:asia": { + "secs": 1, + "nanos": 0 + }, + "europe:north america": { + "secs": 1, + "nanos": 0 + }, + "asia:north america": { + "secs": 1, + "nanos": 0 + }, + "asia:europe": { + "secs": 1, + "nanos": 0 + }, + "asia:asia": { + "secs": 1, + "nanos": 0 + } + }, + "regions": { + "north america": 0.4, + "europe": 0.4, + "asia": 0.3 + } + }, + "overlay_settings": "Flat", + "node_settings": { + "seed": 0, + "timeout": "1000ms" + }, + "runner_settings": "Sync", + "stream_settings": { + "format": "json" + }, + "node_count": 3, + "views_count": 3, + "leaders_count": 1, + "seed": 0 +} \ No newline at end of file diff --git a/simulations/src/bin/app.rs b/simulations/src/bin/app.rs index eab07267..e24baf71 100644 --- a/simulations/src/bin/app.rs +++ b/simulations/src/bin/app.rs @@ -1,9 +1,7 @@ // std use anyhow::Ok; use serde::Serialize; -use simulations::streaming::io::IOSubscriber; -use simulations::streaming::naive::NaiveSubscriber; -use simulations::streaming::polars::PolarsSubscriber; +use simulations::node::carnot::CarnotSettings; use std::collections::BTreeMap; use std::fs::File; use std::path::{Path, PathBuf}; @@ -11,6 +9,8 @@ use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; // crates use clap::Parser; +use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin}; +use consensus_engine::Block; use crossbeam::channel; use parking_lot::RwLock; use rand::rngs::SmallRng; @@ -22,12 +22,15 @@ use simulations::network::regions::{create_regions, RegionsData}; use simulations::network::{InMemoryNetworkInterface, Network}; use simulations::node::dummy::DummyNode; use simulations::node::{Node, NodeId, OverlayState, ViewOverlay}; -use simulations::overlay::{create_overlay, Overlay, SimulationOverlay}; -use simulations::streaming::StreamType; +use simulations::overlay::{create_overlay, SimulationOverlay}; +use simulations::streaming::{ + io::IOSubscriber, naive::NaiveSubscriber, polars::PolarsSubscriber, + runtime_subscriber::RuntimeSubscriber, settings_subscriber::SettingsSubscriber, StreamType, +}; // internal use simulations::{ node::carnot::CarnotNode, output_processors::OutData, runner::SimulationRunner, - 
settings::SimulationSettings, + settings::SimulationSettings, util::node_id, }; /// Main simulation wrapper @@ -38,7 +41,7 @@ pub struct SimulationApp { #[clap(long, short)] input_settings: PathBuf, #[clap(long)] - stream_type: StreamType, + stream_type: Option, } impl SimulationApp { @@ -56,9 +59,7 @@ impl SimulationApp { .as_secs() }); let mut rng = SmallRng::seed_from_u64(seed); - let mut node_ids: Vec = (0..simulation_settings.node_count) - .map(Into::into) - .collect(); + let mut node_ids: Vec = (0..simulation_settings.node_count).map(node_id).collect(); node_ids.shuffle(&mut rng); let regions = create_regions(&node_ids, &mut rng, &simulation_settings.network_settings); @@ -79,17 +80,52 @@ impl SimulationApp { overlays, })); - let mut network = Network::new(regions_data); - match &simulation_settings.node_settings { - simulations::settings::NodeSettings::Carnot => { + simulations::settings::NodeSettings::Carnot { timeout } => { + let ids = node_ids.clone(); + let mut network = Network::new(regions_data); let nodes = node_ids .iter() - .map(|node_id| CarnotNode::new(*node_id)) + .copied() + .map(|node_id| { + let (node_message_sender, node_message_receiver) = channel::unbounded(); + let network_message_receiver = + network.connect(node_id, node_message_receiver); + let network_interface = InMemoryNetworkInterface::new( + node_id, + node_message_sender, + network_message_receiver, + ); + let nodes: Vec = ids.clone().into_iter().map(Into::into).collect(); + let leader = nodes.first().copied().unwrap(); + let overlay_settings = consensus_engine::overlay::Settings { + nodes: nodes.to_vec(), + leader: RoundRobin::new(), + }; + // FIXME: Actually use a proposer and a key to generate random beacon state + let genesis = nomos_core::block::Block::new( + 0, + Block::genesis().parent_qc, + [].into_iter(), + leader, + RandomBeaconState::Sad { + entropy: Box::new([0; 32]), + }, + ); + CarnotNode::>::new( + node_id, + CarnotSettings::new(nodes, *timeout), + overlay_settings, + genesis, + network_interface, + &mut rng, + ) + }) .collect(); run(network, nodes, simulation_settings, stream_type)?; } simulations::settings::NodeSettings::Dummy => { + let mut network = Network::new(regions_data); let nodes = node_ids .iter() .map(|node_id| { @@ -115,7 +151,7 @@ fn run( network: Network, nodes: Vec, settings: SimulationSettings, - stream_type: StreamType, + stream_type: Option, ) -> anyhow::Result<()> where M: Clone + Send + Sync + 'static, @@ -125,30 +161,47 @@ where { let stream_settings = settings.stream_settings.clone(); let runner = - SimulationRunner::<_, _, OutData>::new(network, nodes, Default::default(), settings); + SimulationRunner::<_, _, OutData>::new(network, nodes, Default::default(), settings)?; macro_rules! 
bail { ($settings: ident, $sub: ident) => { let handle = runner.simulate()?; - let mut sub_handle = handle.subscribe::<$sub>($settings)?; - std::thread::spawn(move || { - sub_handle.run(); + let mut data_subscriber_handle = handle.subscribe::<$sub>($settings)?; + let mut runtime_subscriber_handle = + handle.subscribe::>(Default::default())?; + let mut settings_subscriber_handle = + handle.subscribe::>(Default::default())?; + std::thread::scope(|s| { + s.spawn(move || { + data_subscriber_handle.run(); + }); + + s.spawn(move || { + runtime_subscriber_handle.run(); + }); + + s.spawn(move || { + settings_subscriber_handle.run(); + }); }); handle.join()?; }; } match stream_type { - StreamType::Naive => { + Some(StreamType::Naive) => { let settings = stream_settings.unwrap_naive(); bail!(settings, NaiveSubscriber); } - StreamType::IO => { + Some(StreamType::IO) => { let settings = stream_settings.unwrap_io(); bail!(settings, IOSubscriber); } - StreamType::Polars => { + Some(StreamType::Polars) => { let settings = stream_settings.unwrap_polars(); bail!(settings, PolarsSubscriber); } + None => { + runner.simulate()?.join()?; + } }; Ok(()) } @@ -162,27 +215,34 @@ fn load_json_from_file(path: &Path) -> anyhow::Result { // Helper method to pregenerate views. // TODO: Remove once shared overlay can generate new views on demand. fn generate_overlays( - node_ids: &[NodeId], - overlay: &SimulationOverlay, - overlay_count: usize, - leader_count: usize, - rng: &mut R, + _node_ids: &[NodeId], + _overlay: &SimulationOverlay, + _overlay_count: usize, + _leader_count: usize, + _rng: &mut R, ) -> BTreeMap { - (0..overlay_count) - .map(|view_id| { - ( - view_id, - ViewOverlay { - leaders: overlay.leaders(node_ids, leader_count, rng).collect(), - layout: overlay.layout(node_ids, rng), - }, - ) - }) - .collect() + // TODO: This call needs to be removed + Default::default() } fn main() -> anyhow::Result<()> { + let filter = std::env::var("SIMULATION_LOG").unwrap_or_else(|_| "info".to_owned()); + let subscriber = tracing_subscriber::fmt::fmt() + .without_time() + .with_line_number(true) + .with_env_filter(filter) + .with_file(false) + .with_target(true) + .with_ansi(true) + .finish(); + tracing::subscriber::set_global_default(subscriber) + .expect("config_tracing is only called once"); + let app: SimulationApp = SimulationApp::parse(); - app.run()?; + + if let Err(e) = app.run() { + tracing::error!("Error: {}", e); + std::process::exit(1); + } Ok(()) } diff --git a/simulations/src/lib.rs b/simulations/src/lib.rs index 59052263..194934d7 100644 --- a/simulations/src/lib.rs +++ b/simulations/src/lib.rs @@ -6,3 +6,7 @@ pub mod runner; pub mod settings; pub mod streaming; pub mod warding; + +pub mod util; +static START_TIME: once_cell::sync::Lazy = + once_cell::sync::Lazy::new(std::time::Instant::now); diff --git a/simulations/src/network/behaviour.rs b/simulations/src/network/behaviour.rs index a0606564..20faad1a 100644 --- a/simulations/src/network/behaviour.rs +++ b/simulations/src/network/behaviour.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, time::Duration}; use rand::Rng; use serde::{Deserialize, Serialize}; -use super::{regions::Region, NetworkSettings}; +use super::{NetworkBehaviourKey, NetworkSettings}; // internal #[derive(Default, Debug, Clone, Serialize, Deserialize)] @@ -31,10 +31,10 @@ impl NetworkBehaviour { // network behaviors for pairs of NodeIds. 
pub fn create_behaviours( network_settings: &NetworkSettings, -) -> HashMap<(Region, Region), NetworkBehaviour> { +) -> HashMap { network_settings .network_behaviors .iter() - .map(|((a, b), d)| ((*a, *b), NetworkBehaviour::new(*d, 0.0))) + .map(|(k, d)| (*k, NetworkBehaviour::new(*d, 0.0))) .collect() } diff --git a/simulations/src/network/mod.rs b/simulations/src/network/mod.rs index 943f9c26..52e8e163 100644 --- a/simulations/src/network/mod.rs +++ b/simulations/src/network/mod.rs @@ -2,13 +2,14 @@ use std::{ collections::HashMap, ops::Add, + str::FromStr, time::{Duration, Instant}, }; // crates use crossbeam::channel::{self, Receiver, Sender}; use rand::{rngs::ThreadRng, Rng}; use rayon::prelude::*; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; // internal use crate::node::NodeId; @@ -17,9 +18,45 @@ pub mod regions; type NetworkTime = Instant; -#[derive(Clone, Debug, Deserialize, Default)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct NetworkBehaviourKey { + pub from: regions::Region, + pub to: regions::Region, +} + +impl NetworkBehaviourKey { + pub fn new(from: regions::Region, to: regions::Region) -> Self { + Self { from, to } + } +} + +impl Serialize for NetworkBehaviourKey { + fn serialize(&self, serializer: S) -> Result { + let s = format!("{}:{}", self.from, self.to); + serializer.serialize_str(&s) + } +} + +impl<'de> Deserialize<'de> for NetworkBehaviourKey { + fn deserialize>(deserializer: D) -> Result { + let s = String::deserialize(deserializer)?; + let mut split = s.split(':'); + let from = split.next().ok_or(serde::de::Error::custom( + "NetworkBehaviourKey should be in the form of `from_region:to_region`", + ))?; + let to = split.next().ok_or(serde::de::Error::custom( + "NetworkBehaviourKey should be in the form of `from_region:to_region`", + ))?; + Ok(Self::new( + regions::Region::from_str(from).map_err(serde::de::Error::custom)?, + regions::Region::from_str(to).map_err(serde::de::Error::custom)?, + )) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct NetworkSettings { - pub network_behaviors: HashMap<(regions::Region, regions::Region), Duration>, + pub network_behaviors: HashMap, /// Represents node distribution in the simulated regions. /// The sum of distributions should be 1. 
pub regions: HashMap, @@ -193,7 +230,7 @@ mod tests { regions::{Region, RegionsData}, Network, NetworkInterface, NetworkMessage, }; - use crate::node::NodeId; + use crate::{network::NetworkBehaviourKey, node::NodeId, util::node_id}; use crossbeam::channel::{self, Receiver, Sender}; use std::{collections::HashMap, time::Duration}; @@ -232,12 +269,12 @@ mod tests { #[test] fn send_receive_messages() { - let node_a = 0.into(); - let node_b = 1.into(); + let node_a = node_id(0); + let node_b = node_id(1); let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]); let behaviour = HashMap::from([( - (Region::Europe, Region::Europe), + NetworkBehaviourKey::new(Region::Europe, Region::Europe), NetworkBehaviour::new(Duration::from_millis(100), 0.0), )]); let regions_data = RegionsData::new(regions, behaviour); @@ -281,9 +318,9 @@ mod tests { #[test] fn regions_send_receive_messages() { - let node_a = 0.into(); - let node_b = 1.into(); - let node_c = 2.into(); + let node_a = node_id(0); + let node_b = node_id(1); + let node_c = node_id(2); let regions = HashMap::from([ (Region::Asia, vec![node_a, node_b]), @@ -291,15 +328,15 @@ mod tests { ]); let behaviour = HashMap::from([ ( - (Region::Asia, Region::Asia), + NetworkBehaviourKey::new(Region::Asia, Region::Asia), NetworkBehaviour::new(Duration::from_millis(100), 0.0), ), ( - (Region::Asia, Region::Europe), + NetworkBehaviourKey::new(Region::Asia, Region::Europe), NetworkBehaviour::new(Duration::from_millis(500), 0.0), ), ( - (Region::Europe, Region::Europe), + NetworkBehaviourKey::new(Region::Europe, Region::Europe), NetworkBehaviour::new(Duration::from_millis(100), 0.0), ), ]); diff --git a/simulations/src/network/regions.rs b/simulations/src/network/regions.rs index 887f93a9..ee4d5b98 100644 --- a/simulations/src/network/regions.rs +++ b/simulations/src/network/regions.rs @@ -1,14 +1,14 @@ // std use rand::{seq::SliceRandom, Rng}; -use std::collections::HashMap; +use std::{collections::HashMap, str::FromStr}; // crates use serde::{Deserialize, Serialize}; // internal use crate::{network::behaviour::NetworkBehaviour, node::NodeId}; -use super::NetworkSettings; +use super::{NetworkBehaviourKey, NetworkSettings}; -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] pub enum Region { NorthAmerica, Europe, @@ -18,18 +18,74 @@ pub enum Region { Australia, } +impl core::fmt::Display for Region { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let s = match self { + Self::NorthAmerica => "NorthAmerica", + Self::Europe => "Europe", + Self::Asia => "Asia", + Self::Africa => "Africa", + Self::SouthAmerica => "SouthAmerica", + Self::Australia => "Australia", + }; + write!(f, "{s}") + } +} + +impl FromStr for Region { + type Err = String; + + fn from_str(s: &str) -> Result { + match s + .trim() + .to_lowercase() + .replace(['-', '_', ' '], "") + .as_str() + { + "northamerica" | "na" => Ok(Self::NorthAmerica), + "europe" | "eu" => Ok(Self::Europe), + "asia" | "as" => Ok(Self::Asia), + "africa" | "af" => Ok(Self::Africa), + "southamerica" | "sa" => Ok(Self::SouthAmerica), + "australia" | "au" => Ok(Self::Australia), + _ => Err(format!("Unknown region: {s}")), + } + } +} + +impl Serialize for Region { + fn serialize(&self, serializer: S) -> Result { + let s = match self { + Self::NorthAmerica => "North America", + Self::Europe => "Europe", + Self::Asia => "Asia", + Self::Africa => "Africa", + Self::SouthAmerica => "South America", + Self::Australia => 
"Australia", + }; + serializer.serialize_str(s) + } +} + +impl<'de> Deserialize<'de> for Region { + fn deserialize>(deserializer: D) -> Result { + let s = String::deserialize(deserializer)?; + Self::from_str(&s).map_err(serde::de::Error::custom) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RegionsData { pub regions: HashMap>, #[serde(skip)] pub node_region: HashMap, - pub region_network_behaviour: HashMap<(Region, Region), NetworkBehaviour>, + pub region_network_behaviour: HashMap, } impl RegionsData { pub fn new( regions: HashMap>, - region_network_behaviour: HashMap<(Region, Region), NetworkBehaviour>, + region_network_behaviour: HashMap, ) -> Self { let node_region = regions .iter() @@ -49,9 +105,11 @@ impl RegionsData { pub fn network_behaviour(&self, node_a: NodeId, node_b: NodeId) -> &NetworkBehaviour { let region_a = self.node_region[&node_a]; let region_b = self.node_region[&node_b]; + let k = NetworkBehaviourKey::new(region_a, region_b); + let k_rev = NetworkBehaviourKey::new(region_b, region_a); self.region_network_behaviour - .get(&(region_a, region_b)) - .or(self.region_network_behaviour.get(&(region_b, region_a))) + .get(&k) + .or(self.region_network_behaviour.get(&k_rev)) .expect("Network behaviour not found for the given regions") } @@ -106,6 +164,7 @@ mod tests { NetworkSettings, }, node::NodeId, + util::node_id, }; #[test] @@ -144,9 +203,7 @@ mod tests { let mut rng = StepRng::new(1, 0); for tcase in test_cases.iter() { - let nodes = (0..tcase.node_count) - .map(Into::into) - .collect::>(); + let nodes = (0..tcase.node_count).map(node_id).collect::>(); let available_regions = vec![ Region::NorthAmerica, diff --git a/simulations/src/node/carnot/event_builder.rs b/simulations/src/node/carnot/event_builder.rs new file mode 100644 index 00000000..a170afea --- /dev/null +++ b/simulations/src/node/carnot/event_builder.rs @@ -0,0 +1,226 @@ +use crate::node::carnot::messages::CarnotMessage; +use crate::util::parse_idx; +use consensus_engine::{Carnot, NewView, Overlay, Qc, StandardQc, Timeout, TimeoutQc, View, Vote}; +use nomos_consensus::network::messages::{NewViewMsg, TimeoutMsg, VoteMsg}; +use nomos_consensus::NodeId; +use nomos_core::block::Block; +use std::collections::{HashMap, HashSet}; +use std::hash::Hash; + +pub type CarnotTx = [u8; 32]; + +pub(crate) struct EventBuilder { + id: NodeId, + leader_vote_message: Tally, + vote_message: Tally, + timeout_message: Tally, + new_view_message: Tally, + pub(crate) current_view: View, +} + +impl EventBuilder { + pub fn new(id: NodeId) -> Self { + Self { + vote_message: Default::default(), + leader_vote_message: Default::default(), + timeout_message: Default::default(), + new_view_message: Default::default(), + current_view: View::default(), + id, + } + } + + pub fn step( + &mut self, + messages: Vec, + engine: &Carnot, + ) -> Vec> { + let mut events = Vec::new(); + // only run when the engine is in the genesis view + if engine.highest_voted_view() == -1 + && engine.overlay().is_member_of_leaf_committee(self.id) + { + tracing::info!(node = parse_idx(&self.id), "voting genesis",); + let genesis = engine.genesis_block(); + events.push(Event::Approve { + qc: Qc::Standard(StandardQc { + view: genesis.view, + id: genesis.id, + }), + block: genesis, + votes: HashSet::new(), + }) + } + + for message in messages { + match message { + CarnotMessage::Proposal(msg) => { + let block = Block::from_bytes(&msg.chunk); + tracing::info!( + node=parse_idx(&self.id), + current_view = engine.current_view(), + 
block_view=block.header().view, + block=?block.header().id, + parent_block=?block.header().parent(), + "receive proposal message", + ); + events.push(Event::Proposal { block }) + } + CarnotMessage::TimeoutQc(msg) => { + events.push(Event::TimeoutQc { timeout_qc: msg.qc }); + } + CarnotMessage::Vote(msg) => { + let msg_view = msg.vote.view; + let block_id = msg.vote.block; + let voter = msg.voter; + let is_next_view_leader = engine.is_next_leader(); + let is_message_from_root_committee = + engine.overlay().is_member_of_root_committee(voter); + + let tally = if is_message_from_root_committee { + &mut self.leader_vote_message + } else { + &mut self.vote_message + }; + + let Some(qc) = msg.qc.clone() else { + tracing::warn!(node=?parse_idx(&self.id), current_view = engine.current_view(), "received vote without QC"); + continue; + }; + + // if the message comes from the root committee, then use the leader threshold, otherwise use the leaf threshold + let threshold = if is_message_from_root_committee { + engine.leader_super_majority_threshold() + } else { + engine.super_majority_threshold() + }; + + if let Some(votes) = tally.tally_by(msg_view, msg, threshold) { + if let Some(block) = engine + .blocks_in_view(msg_view) + .iter() + .find(|block| block.id == block_id) + .cloned() + { + tracing::info!( + node=parse_idx(&self.id), + votes=votes.len(), + current_view = engine.current_view(), + block_view=block.view, + block=?block.id, + "approve block", + ); + + if is_next_view_leader && is_message_from_root_committee { + events.push(Event::ProposeBlock { + qc: Qc::Standard(StandardQc { + view: block.view, + id: block.id, + }), + }); + } else { + events.push(Event::Approve { + qc, + block, + votes: votes.into_iter().map(|v| v.vote).collect(), + }); + } + } + } + } + CarnotMessage::Timeout(msg) => { + let msg_view = msg.vote.view; + if let Some(timeouts) = self.timeout_message.tally(msg_view, msg) { + events.push(Event::RootTimeout { + timeouts: timeouts.into_iter().map(|v| v.vote).collect(), + }) + } + } + CarnotMessage::NewView(msg) => { + let msg_view = msg.vote.view; + let timeout_qc = msg.vote.timeout_qc.clone(); + self.current_view = core::cmp::max(self.current_view, msg_view); + // if we are the leader, then use the leader threshold, otherwise use the leaf threshold + let threshold = if engine.is_next_leader() { + engine.leader_super_majority_threshold() + } else { + engine.super_majority_threshold() + }; + + if let Some(new_views) = + self.new_view_message.tally_by(msg_view, msg, threshold) + { + events.push(Event::NewView { + new_views: new_views.into_iter().map(|v| v.vote).collect(), + timeout_qc, + }) + } + } + } + } + + events + } +} + +struct Tally { + cache: HashMap>, + threshold: usize, +} + +impl Default for Tally { + fn default() -> Self { + Self::new(0) + } +} + +impl Tally { + fn new(threshold: usize) -> Self { + Self { + cache: Default::default(), + threshold, + } + } + + fn tally(&mut self, view: View, message: T) -> Option> { + self.tally_by(view, message, self.threshold) + } + + fn tally_by(&mut self, view: View, message: T, threshold: usize) -> Option> { + let entries = self.cache.entry(view).or_default(); + entries.insert(message); + let entries = entries.len(); + if entries == threshold { + Some(self.cache.remove(&view).unwrap()) + } else { + None + } + } +} + +pub enum Event { + Proposal { + block: Block, + }, + #[allow(dead_code)] + Approve { + qc: Qc, + block: consensus_engine::Block, + votes: HashSet, + }, + ProposeBlock { + qc: Qc, + }, + LocalTimeout, + NewView { + 
timeout_qc: TimeoutQc, + new_views: HashSet, + }, + TimeoutQc { + timeout_qc: TimeoutQc, + }, + RootTimeout { + timeouts: HashSet, + }, + None, +} diff --git a/simulations/src/node/carnot/message_cache.rs b/simulations/src/node/carnot/message_cache.rs new file mode 100644 index 00000000..ac8eb5ee --- /dev/null +++ b/simulations/src/node/carnot/message_cache.rs @@ -0,0 +1,30 @@ +use crate::node::carnot::messages::CarnotMessage; +use consensus_engine::View; +use polars::export::ahash::HashMap; + +pub(crate) struct MessageCache { + cache: HashMap>, +} + +impl MessageCache { + pub fn new() -> Self { + Self { + cache: Default::default(), + } + } + + pub fn update>(&mut self, messages: I) { + for message in messages { + let entry = self.cache.entry(message.view()).or_default(); + entry.push(message); + } + } + + pub fn prune(&mut self, view: View) { + self.cache.retain(|v, _| v > &view); + } + + pub fn retrieve(&mut self, view: View) -> Vec { + self.cache.remove(&view).unwrap_or_default() + } +} diff --git a/simulations/src/node/carnot/messages.rs b/simulations/src/node/carnot/messages.rs new file mode 100644 index 00000000..807ffe1f --- /dev/null +++ b/simulations/src/node/carnot/messages.rs @@ -0,0 +1,25 @@ +use consensus_engine::View; +use nomos_consensus::network::messages::{ + NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, +}; + +#[derive(Eq, PartialEq, Hash, Clone)] +pub enum CarnotMessage { + Proposal(ProposalChunkMsg), + Vote(VoteMsg), + TimeoutQc(TimeoutQcMsg), + Timeout(TimeoutMsg), + NewView(NewViewMsg), +} + +impl CarnotMessage { + pub fn view(&self) -> View { + match self { + CarnotMessage::Proposal(msg) => msg.view, + CarnotMessage::Vote(msg) => msg.vote.view, + CarnotMessage::TimeoutQc(msg) => msg.qc.view, + CarnotMessage::Timeout(msg) => msg.vote.view, + CarnotMessage::NewView(msg) => msg.vote.view, + } + } +} diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index c0078228..8b2f6522 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -1,33 +1,211 @@ +#![allow(dead_code)] + +mod event_builder; +mod message_cache; +mod messages; + // std +use std::hash::Hash; +use std::{collections::HashMap, time::Duration}; // crates +use bls_signatures::PrivateKey; +use rand::Rng; use serde::{Deserialize, Serialize}; // internal +use self::messages::CarnotMessage; use super::{Node, NodeId}; +use crate::network::{InMemoryNetworkInterface, NetworkInterface, NetworkMessage}; +use crate::node::carnot::event_builder::{CarnotTx, Event}; +use crate::node::carnot::message_cache::MessageCache; +use crate::util::parse_idx; +use consensus_engine::overlay::RandomBeaconState; +use consensus_engine::{ + Block, BlockId, Carnot, Committee, Overlay, Payload, Qc, StandardQc, TimeoutQc, View, Vote, +}; +use nomos_consensus::network::messages::ProposalChunkMsg; +use nomos_consensus::{ + leader_selection::UpdateableLeaderSelection, + network::messages::{NewViewMsg, TimeoutMsg, VoteMsg}, +}; -#[derive(Default, Serialize)] -pub struct CarnotState {} - -#[derive(Clone, Default, Deserialize)] -pub struct CarnotSettings {} - -#[allow(dead_code)] // TODO: remove when handling settings -pub struct CarnotNode { - id: NodeId, - state: CarnotState, - settings: CarnotSettings, +#[derive(Serialize)] +pub struct CarnotState { + current_view: View, + highest_voted_view: View, + local_high_qc: StandardQc, + #[serde(serialize_with = "serialize_blocks")] + safe_blocks: HashMap, + last_view_timeout_qc: Option, + latest_committed_block: Block, 
+ latest_committed_view: View, + root_committe: Committee, + parent_committe: Committee, + child_committees: Vec, + committed_blocks: Vec, } -impl CarnotNode { - pub fn new(id: NodeId) -> Self { +/// Have to implement this manually because `serde_json` will panic if the key of the map +/// is not a string. +fn serialize_blocks(blocks: &HashMap, serializer: S) -> Result +where + S: serde::Serializer, +{ + use serde::ser::SerializeMap; + let mut ser = serializer.serialize_map(Some(blocks.len()))?; + for (k, v) in blocks { + ser.serialize_entry(&format!("{k:?}"), v)?; + } + ser.end() +} + +impl From<&Carnot> for CarnotState { + fn from(value: &Carnot) -> Self { + let current_view = value.current_view(); Self { - id, - state: Default::default(), - settings: Default::default(), + current_view, + local_high_qc: value.high_qc(), + parent_committe: value.parent_committee(), + root_committe: value.root_committee(), + child_committees: value.child_committees(), + latest_committed_block: value.latest_committed_block(), + latest_committed_view: value.latest_committed_view(), + safe_blocks: value + .blocks_in_view(current_view) + .into_iter() + .map(|b| (b.id, b)) + .collect(), + last_view_timeout_qc: value.last_view_timeout_qc(), + committed_blocks: value.committed_blocks(), + highest_voted_view: Default::default(), } } } -impl Node for CarnotNode { +#[derive(Clone, Default, Deserialize)] +pub struct CarnotSettings { + nodes: Vec, + timeout: Duration, +} + +impl CarnotSettings { + pub fn new(nodes: Vec, timeout: Duration) -> Self { + Self { nodes, timeout } + } +} + +#[allow(dead_code)] // TODO: remove when handling settings +pub struct CarnotNode { + id: consensus_engine::NodeId, + state: CarnotState, + settings: CarnotSettings, + network_interface: InMemoryNetworkInterface, + message_cache: MessageCache, + event_builder: event_builder::EventBuilder, + engine: Carnot, + random_beacon_pk: PrivateKey, +} + +impl CarnotNode { + pub fn new( + id: consensus_engine::NodeId, + settings: CarnotSettings, + overlay_settings: O::Settings, + genesis: nomos_core::block::Block, + network_interface: InMemoryNetworkInterface, + rng: &mut R, + ) -> Self { + let overlay = O::new(overlay_settings); + let engine = Carnot::from_genesis(id, genesis.header().clone(), overlay); + let state = CarnotState::from(&engine); + // pk is generated in an insecure way, but for simulation purposes using an RNG like SmallRng is more useful + let mut pk_buff = [0; 32]; + rng.fill_bytes(&mut pk_buff); + let random_beacon_pk = PrivateKey::new(pk_buff); + Self { + id, + state, + settings, + network_interface, + message_cache: MessageCache::new(), + event_builder: event_builder::EventBuilder::new(id), + engine, + random_beacon_pk, + } + } + + pub(crate) fn send_message(&self, message: NetworkMessage) { + self.network_interface + .send_message(self.id, message.payload); + } + + fn handle_output(&self, output: Output) { + match output { + Output::Send(consensus_engine::Send { + to, + payload: Payload::Vote(vote), + }) => { + for node in to { + self.network_interface.send_message( + node, + CarnotMessage::Vote(VoteMsg { + voter: self.id, + vote: vote.clone(), + qc: Some(Qc::Standard(StandardQc { + view: vote.view, + id: vote.block, + })), + }), + ); + } + } + Output::Send(consensus_engine::Send { + to, + payload: Payload::NewView(new_view), + }) => { + for node in to { + self.network_interface.send_message( + node, + CarnotMessage::NewView(NewViewMsg { + voter: node, + vote: new_view.clone(), + }), + ); + } + } +
Output::Send(consensus_engine::Send { + to, + payload: Payload::Timeout(timeout), + }) => { + for node in to { + self.network_interface.send_message( + node, + CarnotMessage::Timeout(TimeoutMsg { + voter: node, + vote: timeout.clone(), + }), + ); + } + } + Output::BroadcastTimeoutQc { .. } => { + unimplemented!() + } + Output::BroadcastProposal { proposal } => { + for node in &self.settings.nodes { + self.network_interface.send_message( + *node, + CarnotMessage::Proposal(ProposalChunkMsg { + chunk: proposal.as_bytes().to_vec().into(), + proposal: proposal.header().id, + view: proposal.header().view, + }), + ) + } + } + } + } +} + +impl> Node for CarnotNode { type Settings = CarnotSettings; type State = CarnotState; @@ -36,7 +214,7 @@ impl Node for CarnotNode { } fn current_view(&self) -> usize { - todo!() + self.event_builder.current_view as usize } fn state(&self) -> &CarnotState { @@ -44,6 +222,143 @@ impl Node for CarnotNode { } fn step(&mut self) { - todo!() + // split messages per view; we only want to process messages for the engine's current view, plus proposals and timeout QCs + let (mut current_view_messages, other_view_messages): (Vec<_>, Vec<_>) = self + .network_interface + .receive_messages() + .into_iter() + .map(|m| m.payload) + .partition(|m| { + m.view() == self.engine.current_view() + || matches!(m, CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_)) + }); + self.message_cache.prune(self.engine.current_view() - 1); + self.message_cache.update(other_view_messages); + current_view_messages.append(&mut self.message_cache.retrieve(self.engine.current_view())); + + let events = self.event_builder.step(current_view_messages, &self.engine); + + for event in events { + let mut output: Vec> = vec![]; + match event { + Event::Proposal { block } => { + let current_view = self.engine.current_view(); + tracing::info!( + node=parse_idx(&self.id), + last_committed_view=self.engine.latest_committed_view(), + current_view = current_view, + block_view = block.header().view, + block = ?block.header().id, + parent_block=?block.header().parent(), + "receive block proposal", + ); + match self.engine.receive_block(block.header().clone()) { + Ok(mut new) => { + if self.engine.current_view() != new.current_view() { + new = new + .update_overlay(|overlay| { + overlay.update_leader_selection(|leader_selection| { + leader_selection.on_new_block_received(block.clone()) + }) + }) + .unwrap_or(new); + self.engine = new; + } + } + Err(_) => { + tracing::error!(node = parse_idx(&self.id), current_view = self.engine.current_view(), block_view = block.header().view, block = ?block.header().id, "receive block proposal, but is invalid"); + } + } + + if self.engine.overlay().is_member_of_leaf_committee(self.id) { + output.push(Output::Send(consensus_engine::Send { + to: self.engine.parent_committee(), + payload: Payload::Vote(Vote { + view: self.engine.current_view(), + block: block.header().id, + }), + })) + } + } + // This branch means we already get enough votes for this block + // So we can just call approve_block + Event::Approve { block, ..
} => { + tracing::info!( + node = parse_idx(&self.id), + current_view = self.engine.current_view(), + block_view = block.view, + block = ?block.id, + parent_block=?block.parent(), + "receive approve message" + ); + let (new, out) = self.engine.approve_block(block); + tracing::info!(vote=?out, node=parse_idx(&self.id)); + output = vec![Output::Send(out)]; + self.engine = new; + } + Event::ProposeBlock { qc } => { + output = vec![Output::BroadcastProposal { + proposal: nomos_core::block::Block::new( + qc.view() + 1, + qc.clone(), + [].into_iter(), + self.id, + RandomBeaconState::generate_happy( + qc.view() + 1, + &self.random_beacon_pk, + ), + ), + }] + } + // This branch means we already get enough new view msgs for this qc + // So we can just call approve_new_view + Event::NewView { + timeout_qc: _, + new_views: _, + } => { + // let (new, out) = self.engine.approve_new_view(timeout_qc, new_views); + // output = Some(out); + // self.engine = new; + // let next_view = timeout_qc.view + 2; + // if self.engine.is_leader_for_view(next_view) { + // self.gather_new_views(&[self.id].into_iter().collect(), timeout_qc); + // } + tracing::error!("unimplemented new view branch"); + unimplemented!() + } + Event::TimeoutQc { timeout_qc } => { + self.engine = self.engine.receive_timeout_qc(timeout_qc); + } + Event::RootTimeout { timeouts } => { + println!("root timeouts: {timeouts:?}"); + } + Event::LocalTimeout => { + tracing::error!("unimplemented local timeout branch"); + unreachable!("local timeout will never be constructed") + } + Event::None => { + tracing::error!("unimplemented none branch"); + unreachable!("none event will never be constructed") + } + } + + for output_event in output { + self.handle_output(output_event); + } + } + + // update state + self.state = CarnotState::from(&self.engine); } } + +#[derive(Debug)] +enum Output { + Send(consensus_engine::Send), + BroadcastTimeoutQc { + timeout_qc: TimeoutQc, + }, + BroadcastProposal { + proposal: nomos_core::block::Block, + }, +} diff --git a/simulations/src/node/dummy.rs b/simulations/src/node/dummy.rs index 9921e258..b6771c62 100644 --- a/simulations/src/node/dummy.rs +++ b/simulations/src/node/dummy.rs @@ -8,7 +8,7 @@ use crate::{ node::{Node, NodeId}, }; -use super::{OverlayGetter, OverlayState, SharedState, ViewOverlay}; +use super::{CommitteeId, OverlayGetter, OverlayState, SharedState, ViewOverlay}; #[derive(Debug, Default, Serialize)] pub struct DummyState { @@ -112,7 +112,7 @@ impl LocalView { let current_roots = view .layout .committees - .get(&0.into()) + .get(&CommitteeId(0)) .map(|c| c.nodes.clone()); Self { @@ -435,7 +435,7 @@ mod tests { network::{ behaviour::NetworkBehaviour, regions::{Region, RegionsData}, - InMemoryNetworkInterface, Network, + InMemoryNetworkInterface, Network, NetworkBehaviourKey, }, node::{ dummy::{get_child_nodes, get_parent_nodes, get_roles, DummyRole}, @@ -445,6 +445,7 @@ mod tests { tree::{TreeOverlay, TreeSettings}, Overlay, }, + util::node_id, }; use super::{DummyMessage, DummyNode, Intent, Vote}; @@ -452,7 +453,7 @@ mod tests { fn init_network(node_ids: &[NodeId]) -> Network { let regions = HashMap::from([(Region::Europe, node_ids.to_vec())]); let behaviour = HashMap::from([( - (Region::Europe, Region::Europe), + NetworkBehaviourKey::new(Region::Europe, Region::Europe), NetworkBehaviour::new(Duration::from_millis(100), 0.0), )]); let regions_data = RegionsData::new(regions, behaviour); @@ -516,7 +517,7 @@ mod tests { .for_each(|leader_id| { for _ in 0..committee_size { nodes - .get(&0.into()) + 
.get(&node_id(0)) .unwrap() .send_message(*leader_id, DummyMessage::Vote(initial_vote.clone())); } @@ -538,7 +539,7 @@ mod tests { let mut network = init_network(&node_ids); let view = ViewOverlay { - leaders: vec![0.into(), 1.into(), 2.into()], + leaders: vec![node_id(0), node_id(1), node_id(2)], layout: overlay.layout(&node_ids, &mut rng), }; let overlay_state = Arc::new(RwLock::new(OverlayState { @@ -556,9 +557,9 @@ mod tests { let initial_vote = Vote::new(1, Intent::FromRootToLeader); // Using any node as the sender for initial proposal to leader nodes. - nodes[&0.into()].send_message(0.into(), DummyMessage::Vote(initial_vote.clone())); - nodes[&0.into()].send_message(1.into(), DummyMessage::Vote(initial_vote.clone())); - nodes[&0.into()].send_message(2.into(), DummyMessage::Vote(initial_vote)); + nodes[&node_id(0)].send_message(node_id(0), DummyMessage::Vote(initial_vote.clone())); + nodes[&node_id(0)].send_message(node_id(1), DummyMessage::Vote(initial_vote.clone())); + nodes[&node_id(0)].send_message(node_id(2), DummyMessage::Vote(initial_vote)); network.collect_messages(); for (_, node) in nodes.iter() { @@ -586,15 +587,15 @@ mod tests { } // Root and Internal haven't sent their votes yet. - assert!(!nodes[&0.into()].state().view_state[&1].vote_sent); // Root - assert!(!nodes[&1.into()].state().view_state[&1].vote_sent); // Internal - assert!(!nodes[&2.into()].state().view_state[&1].vote_sent); // Internal + assert!(!nodes[&node_id(0)].state().view_state[&1].vote_sent); // Root + assert!(!nodes[&node_id(1)].state().view_state[&1].vote_sent); // Internal + assert!(!nodes[&node_id(2)].state().view_state[&1].vote_sent); // Internal // Leaves should have thier vote sent. - assert!(nodes[&3.into()].state().view_state[&1].vote_sent); // Leaf - assert!(nodes[&4.into()].state().view_state[&1].vote_sent); // Leaf - assert!(nodes[&5.into()].state().view_state[&1].vote_sent); // Leaf - assert!(nodes[&6.into()].state().view_state[&1].vote_sent); // Leaf + assert!(nodes[&node_id(3)].state().view_state[&1].vote_sent); // Leaf + assert!(nodes[&node_id(4)].state().view_state[&1].vote_sent); // Leaf + assert!(nodes[&node_id(5)].state().view_state[&1].vote_sent); // Leaf + assert!(nodes[&node_id(6)].state().view_state[&1].vote_sent); // Leaf // 3. Internal nodes send vote to root node. network.dispatch_after(Duration::from_millis(100)); @@ -604,15 +605,15 @@ mod tests { network.collect_messages(); // Root hasn't sent its votes yet. - assert!(!nodes[&0.into()].state().view_state[&1].vote_sent); // Root + assert!(!nodes[&node_id(0)].state().view_state[&1].vote_sent); // Root // Internal and leaves should have thier vote sent. - assert!(nodes[&1.into()].state().view_state[&1].vote_sent); // Internal - assert!(nodes[&2.into()].state().view_state[&1].vote_sent); // Internal - assert!(nodes[&3.into()].state().view_state[&1].vote_sent); // Leaf - assert!(nodes[&4.into()].state().view_state[&1].vote_sent); // Leaf - assert!(nodes[&5.into()].state().view_state[&1].vote_sent); // Leaf - assert!(nodes[&6.into()].state().view_state[&1].vote_sent); // Leaf + assert!(nodes[&node_id(1)].state().view_state[&1].vote_sent); // Internal + assert!(nodes[&node_id(2)].state().view_state[&1].vote_sent); // Internal + assert!(nodes[&node_id(3)].state().view_state[&1].vote_sent); // Leaf + assert!(nodes[&node_id(4)].state().view_state[&1].vote_sent); // Leaf + assert!(nodes[&node_id(5)].state().view_state[&1].vote_sent); // Leaf + assert!(nodes[&node_id(6)].state().view_state[&1].vote_sent); // Leaf // 4. 
Root node send vote to next view leader nodes. network.dispatch_after(Duration::from_millis(100)); @@ -622,13 +623,13 @@ mod tests { network.collect_messages(); // Root has sent its votes. - assert!(nodes[&0.into()].state().view_state[&1].vote_sent); // Root - assert!(nodes[&1.into()].state().view_state[&1].vote_sent); // Internal - assert!(nodes[&2.into()].state().view_state[&1].vote_sent); // Internal - assert!(nodes[&3.into()].state().view_state[&1].vote_sent); // Leaf - assert!(nodes[&4.into()].state().view_state[&1].vote_sent); // Leaf - assert!(nodes[&5.into()].state().view_state[&1].vote_sent); // Leaf - assert!(nodes[&6.into()].state().view_state[&1].vote_sent); // Leaf + assert!(nodes[&node_id(0)].state().view_state[&1].vote_sent); // Root + assert!(nodes[&node_id(1)].state().view_state[&1].vote_sent); // Internal + assert!(nodes[&node_id(2)].state().view_state[&1].vote_sent); // Internal + assert!(nodes[&node_id(3)].state().view_state[&1].vote_sent); // Leaf + assert!(nodes[&node_id(4)].state().view_state[&1].vote_sent); // Leaf + assert!(nodes[&node_id(5)].state().view_state[&1].vote_sent); // Leaf + assert!(nodes[&node_id(6)].state().view_state[&1].vote_sent); // Leaf // 5. Leaders receive vote and broadcast new Proposal(Block) to all nodes. network.dispatch_after(Duration::from_millis(100)); @@ -656,15 +657,15 @@ mod tests { } // Root and Internal haven't sent their votes yet. - assert!(!nodes[&0.into()].state().view_state[&2].vote_sent); // Root - assert!(!nodes[&1.into()].state().view_state[&2].vote_sent); // Internal - assert!(!nodes[&2.into()].state().view_state[&2].vote_sent); // Internal + assert!(!nodes[&node_id(0)].state().view_state[&2].vote_sent); // Root + assert!(!nodes[&node_id(1)].state().view_state[&2].vote_sent); // Internal + assert!(!nodes[&node_id(2)].state().view_state[&2].vote_sent); // Internal // Leaves should have thier vote sent. - assert!(nodes[&3.into()].state().view_state[&2].vote_sent); // Leaf - assert!(nodes[&4.into()].state().view_state[&2].vote_sent); // Leaf - assert!(nodes[&5.into()].state().view_state[&2].vote_sent); // Leaf - assert!(nodes[&6.into()].state().view_state[&2].vote_sent); // Leaf + assert!(nodes[&node_id(3)].state().view_state[&2].vote_sent); // Leaf + assert!(nodes[&node_id(4)].state().view_state[&2].vote_sent); // Leaf + assert!(nodes[&node_id(5)].state().view_state[&2].vote_sent); // Leaf + assert!(nodes[&node_id(6)].state().view_state[&2].vote_sent); // Leaf } #[test] @@ -685,7 +686,7 @@ mod tests { })); // There are more nodes in the network than in a tree overlay. - let node_ids: Vec = (0..100).map(Into::into).collect(); + let node_ids: Vec = (0..100).map(node_id).collect(); let mut network = init_network(&node_ids); let overlays = generate_overlays(&node_ids, &overlay, 4, 3, &mut rng); @@ -735,7 +736,7 @@ mod tests { })); // There are more nodes in the network than in a tree overlay. - let node_ids: Vec = (0..10000).map(Into::into).collect(); + let node_ids: Vec = (0..10000).map(node_id).collect(); let mut network = init_network(&node_ids); let overlays = generate_overlays(&node_ids, &overlay, 4, 100, &mut rng); @@ -785,7 +786,7 @@ mod tests { })); // There are more nodes in the network than in a tree overlay. 
- let node_ids: Vec = (0..100000).map(Into::into).collect(); + let node_ids: Vec = (0..100000).map(node_id).collect(); let mut network = init_network(&node_ids); let overlays = generate_overlays(&node_ids, &overlay, 4, 1000, &mut rng); @@ -824,42 +825,42 @@ mod tests { ( 0, None, - Some(BTreeSet::from([1.into(), 2.into()])), + Some(BTreeSet::from([node_id(1), node_id(2)])), vec![DummyRole::Root], ), ( 1, - Some(BTreeSet::from([0.into()])), - Some(BTreeSet::from([3.into(), 4.into()])), + Some(BTreeSet::from([node_id(0)])), + Some(BTreeSet::from([node_id(3), node_id(4)])), vec![DummyRole::Internal], ), ( 2, - Some(BTreeSet::from([0.into()])), - Some(BTreeSet::from([5.into(), 6.into()])), + Some(BTreeSet::from([node_id(0)])), + Some(BTreeSet::from([node_id(5), node_id(6)])), vec![DummyRole::Internal], ), ( 3, - Some(BTreeSet::from([1.into()])), + Some(BTreeSet::from([node_id(1)])), None, vec![DummyRole::Leaf], ), ( 4, - Some(BTreeSet::from([1.into()])), + Some(BTreeSet::from([node_id(1)])), None, vec![DummyRole::Leaf], ), ( 5, - Some(BTreeSet::from([2.into()])), + Some(BTreeSet::from([node_id(2)])), None, vec![DummyRole::Leaf], ), ( 6, - Some(BTreeSet::from([2.into()])), + Some(BTreeSet::from([node_id(2)])), None, vec![DummyRole::Leader, DummyRole::Leaf], ), @@ -871,12 +872,12 @@ mod tests { committee_size: 1, }); let node_ids: Vec = overlay.nodes(); - let leaders = vec![6.into()]; + let leaders = vec![node_id(6)]; let layout = overlay.layout(&node_ids, &mut rng); let view = ViewOverlay { leaders, layout }; - for (node_id, expected_parents, expected_children, expected_roles) in test_cases { - let node_id = node_id.into(); + for (nid, expected_parents, expected_children, expected_roles) in test_cases { + let node_id = node_id(nid); let parents = get_parent_nodes(node_id, &view); let children = get_child_nodes(node_id, &view); let role = get_roles(node_id, &view, &parents, &children); diff --git a/simulations/src/node/mod.rs b/simulations/src/node/mod.rs index 81eec2a7..4d2e4c4a 100644 --- a/simulations/src/node/mod.rs +++ b/simulations/src/node/mod.rs @@ -17,27 +17,7 @@ use serde::{Deserialize, Serialize}; // internal use crate::overlay::{Layout, OverlaySettings, SimulationOverlay}; -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] -#[serde(transparent)] -pub struct NodeId(usize); - -impl NodeId { - #[inline] - pub const fn new(id: usize) -> Self { - Self(id) - } - - #[inline] - pub const fn inner(&self) -> usize { - self.0 - } -} - -impl From for NodeId { - fn from(id: usize) -> Self { - Self(id) - } -} +pub use consensus_engine::NodeId; #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[serde(transparent)] @@ -183,7 +163,7 @@ impl Node for usize { type State = Self; fn id(&self) -> NodeId { - (*self).into() + crate::util::node_id(*self) } fn current_view(&self) -> usize { diff --git a/simulations/src/output_processors/mod.rs b/simulations/src/output_processors/mod.rs index 25fb2707..07726e51 100644 --- a/simulations/src/output_processors/mod.rs +++ b/simulations/src/output_processors/mod.rs @@ -1,16 +1,97 @@ +use std::time::Duration; + +use chrono::{DateTime, Utc}; use serde::Serialize; +use crate::settings::SimulationSettings; use crate::warding::SimulationState; +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum RecordType { + Meta, + Settings, + Data, +} + +pub trait Record: From + From + Send + Sync + 'static { + fn record_type(&self) -> RecordType; + + fn is_settings(&self) -> bool { + 
self.record_type() == RecordType::Settings + } + + fn is_meta(&self) -> bool { + self.record_type() == RecordType::Meta + } + + fn is_data(&self) -> bool { + self.record_type() == RecordType::Data + } +} + pub type SerializedNodeState = serde_json::Value; #[derive(Serialize)] -pub struct OutData(SerializedNodeState); +pub struct Runtime { + start: DateTime, + end: DateTime, + elapsed: Duration, +} + +impl Runtime { + pub(crate) fn load() -> anyhow::Result { + let elapsed = crate::START_TIME.elapsed(); + let end = Utc::now(); + Ok(Self { + start: end + .checked_sub_signed(chrono::Duration::from_std(elapsed)?) + .unwrap(), + end, + elapsed, + }) + } +} + +#[derive(Serialize)] +#[serde(untagged)] +pub enum OutData { + Runtime(Runtime), + Settings(Box), + Data(SerializedNodeState), +} + +impl From for OutData { + fn from(runtime: Runtime) -> Self { + Self::Runtime(runtime) + } +} + +impl From for OutData { + fn from(settings: SimulationSettings) -> Self { + Self::Settings(Box::new(settings)) + } +} + +impl From for OutData { + fn from(state: SerializedNodeState) -> Self { + Self::Data(state) + } +} + +impl Record for OutData { + fn record_type(&self) -> RecordType { + match self { + Self::Runtime(_) => RecordType::Meta, + Self::Settings(_) => RecordType::Settings, + Self::Data(_) => RecordType::Data, + } + } +} impl OutData { #[inline] pub const fn new(state: SerializedNodeState) -> Self { - Self(state) + Self::Data(state) } } diff --git a/simulations/src/overlay/flat.rs b/simulations/src/overlay/flat.rs index 52387b67..2c56bc62 100644 --- a/simulations/src/overlay/flat.rs +++ b/simulations/src/overlay/flat.rs @@ -6,6 +6,7 @@ use rand::Rng; use super::Overlay; use crate::node::NodeId; use crate::overlay::{Committee, Layout}; +use crate::util::node_id; pub struct FlatOverlay; impl FlatOverlay { @@ -22,7 +23,7 @@ impl Default for FlatOverlay { impl Overlay for FlatOverlay { fn nodes(&self) -> Vec { - (0..10).map(NodeId::from).collect() + (0..10).map(node_id).collect() } fn leaders( diff --git a/simulations/src/overlay/mod.rs b/simulations/src/overlay/mod.rs index 11abd72c..4a62cfef 100644 --- a/simulations/src/overlay/mod.rs +++ b/simulations/src/overlay/mod.rs @@ -5,7 +5,7 @@ pub mod tree; use std::collections::{BTreeSet, HashMap}; // crates use rand::Rng; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; // internal use crate::node::{CommitteeId, NodeId}; @@ -97,7 +97,7 @@ pub enum SimulationOverlay { Tree(tree::TreeOverlay), } -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum OverlaySettings { Flat, Tree(TreeSettings), diff --git a/simulations/src/overlay/tree.rs b/simulations/src/overlay/tree.rs index 31af6eb3..e535a185 100644 --- a/simulations/src/overlay/tree.rs +++ b/simulations/src/overlay/tree.rs @@ -2,18 +2,21 @@ use std::collections::HashMap; // crates use rand::seq::IteratorRandom; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; // internal use super::{Committee, Layout, Overlay}; -use crate::node::{CommitteeId, NodeId}; +use crate::{ + node::{CommitteeId, NodeId}, + util::node_id, +}; -#[derive(Clone, Debug, Default, Deserialize)] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] pub enum TreeType { #[default] FullBinaryTree, } -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct TreeSettings { pub tree_type: TreeType, pub committee_size: usize, @@ -105,7 +108,7 @@ impl TreeOverlay { impl Overlay for TreeOverlay { fn nodes(&self) -> Vec { let properties = 
get_tree_properties(&self.settings); - (0..properties.node_count).map(From::from).collect() + (0..properties.node_count).map(node_id).collect() } fn leaders( @@ -151,6 +154,8 @@ fn get_layer(id: usize) -> CommitteeId { #[cfg(test)] mod tests { + use crate::util::node_id; + use super::*; use rand::rngs::mock::StepRng; @@ -224,13 +229,13 @@ mod tests { let root_nodes = &layout.committees[&CommitteeId::new(0)].nodes; assert_eq!(root_nodes.len(), 10); - assert_eq!(root_nodes.first(), Some(&NodeId::new(0))); - assert_eq!(root_nodes.last(), Some(&NodeId::new(9))); + assert_eq!(root_nodes.first(), Some(&node_id(0))); + assert_eq!(root_nodes.last(), Some(&node_id(9))); let last_nodes = &layout.committees[&CommitteeId::new(1022)].nodes; assert_eq!(last_nodes.len(), 10); - assert_eq!(last_nodes.first(), Some(&NodeId::new(10220))); - assert_eq!(last_nodes.last(), Some(&NodeId::new(10229))); + assert_eq!(last_nodes.first(), Some(&node_id(10220))); + assert_eq!(last_nodes.last(), Some(&node_id(10229))); } #[test] diff --git a/simulations/src/runner/async_runner.rs b/simulations/src/runner/async_runner.rs index 1752e38e..45947f18 100644 --- a/simulations/src/runner/async_runner.rs +++ b/simulations/src/runner/async_runner.rs @@ -1,4 +1,5 @@ use crate::node::{Node, NodeId}; +use crate::output_processors::Record; use crate::runner::SimulationRunner; use crate::warding::SimulationState; use crossbeam::channel::bounded; @@ -21,7 +22,11 @@ where N: Send + Sync + 'static, N::Settings: Clone + Send, N::State: Serialize, - R: for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, + R: Record + + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + + Send + + Sync + + 'static, { let simulation_state = SimulationState:: { nodes: Arc::clone(&runner.nodes), diff --git a/simulations/src/runner/glauber_runner.rs b/simulations/src/runner/glauber_runner.rs index 015608dd..40cc6341 100644 --- a/simulations/src/runner/glauber_runner.rs +++ b/simulations/src/runner/glauber_runner.rs @@ -1,5 +1,7 @@ use crate::node::{Node, NodeId}; +use crate::output_processors::Record; use crate::runner::SimulationRunner; +use crate::util::{node_id, parse_idx}; use crate::warding::SimulationState; use crossbeam::channel::bounded; use crossbeam::select; @@ -23,7 +25,11 @@ where N: Send + Sync + 'static, N::Settings: Clone + Send, N::State: Serialize, - R: for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, + R: Record + + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + + Send + + Sync + + 'static, { let simulation_state = SimulationState { nodes: Arc::clone(&runner.nodes), @@ -31,7 +37,7 @@ where let inner_runner = runner.inner.clone(); let nodes = runner.nodes; - let nodes_remaining: BTreeSet = (0..nodes.read().len()).map(From::from).collect(); + let nodes_remaining: BTreeSet = (0..nodes.read().len()).map(node_id).collect(); let iterations: Vec<_> = (0..maximum_iterations).collect(); let (stop_tx, stop_rx) = bounded(1); let p = runner.producer.clone(); @@ -56,7 +62,7 @@ where { let mut shared_nodes = nodes.write(); let node: &mut N = shared_nodes - .get_mut(node_id.inner()) + .get_mut(parse_idx(&node_id)) .expect("Node should be present"); node.step(); } diff --git a/simulations/src/runner/layered_runner.rs b/simulations/src/runner/layered_runner.rs index f37755b7..20cafbcc 100644 --- a/simulations/src/runner/layered_runner.rs +++ b/simulations/src/runner/layered_runner.rs @@ -40,7 +40,9 @@ use rand::rngs::SmallRng; use serde::Serialize; // internal use 
crate::node::{Node, NodeId}; +use crate::output_processors::Record; use crate::runner::SimulationRunner; +use crate::util::parse_idx; use crate::warding::SimulationState; use super::SimulationRunnerHandle; @@ -56,7 +58,11 @@ where N: Send + Sync + 'static, N::Settings: Clone + Send, N::State: Serialize, - R: for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, + R: Record + + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + + Send + + Sync + + 'static, { let distribution = distribution.unwrap_or_else(|| std::iter::repeat(1.0f32).take(gap).collect()); @@ -91,7 +97,7 @@ where { let mut shared_nodes = nodes.write(); let node: &mut N = shared_nodes - .get_mut(node_id.inner()) + .get_mut(parse_idx(&node_id)) .expect("Node should be present"); let prev_view = node.current_view(); node.step(); diff --git a/simulations/src/runner/mod.rs b/simulations/src/runner/mod.rs index dc846ee9..50ef41a3 100644 --- a/simulations/src/runner/mod.rs +++ b/simulations/src/runner/mod.rs @@ -7,6 +7,7 @@ mod sync_runner; use std::sync::Arc; use std::time::Duration; +use crate::output_processors::Record; // crates use crate::streaming::{StreamProducer, Subscriber, SubscriberHandle}; use crossbeam::channel::Sender; @@ -28,7 +29,7 @@ pub struct SimulationRunnerHandle { handle: std::thread::JoinHandle>, } -impl SimulationRunnerHandle { +impl SimulationRunnerHandle { pub fn stop_after(self, duration: Duration) -> anyhow::Result<()> { std::thread::sleep(duration); self.stop() @@ -76,7 +77,7 @@ where .any(|x| x) } - fn step(&mut self, nodes: &mut Vec) + fn step(&mut self, nodes: &mut [N]) where N: Node + Send + Sync, N::Settings: Clone + Send, @@ -108,19 +109,28 @@ where N: Send + Sync + 'static, N::Settings: Clone + Send, N::State: Serialize, - R: for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, + R: Record + + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + + Send + + Sync + + 'static, { pub fn new( network: Network, nodes: Vec, producer: StreamProducer, - settings: SimulationSettings, - ) -> Self { + mut settings: SimulationSettings, + ) -> anyhow::Result { let seed = settings .seed .unwrap_or_else(|| rand::thread_rng().next_u64()); - println!("Seed: {seed}"); + settings + .seed + .get_or_insert_with(|| rand::thread_rng().next_u64()); + + // Store the settings to the producer so that we can collect them later + producer.send(R::from(settings.clone()))?; let rng = SmallRng::seed_from_u64(seed); let nodes = Arc::new(RwLock::new(nodes)); @@ -136,7 +146,7 @@ where leaders_count: _, network_settings: _, } = settings; - Self { + Ok(Self { runner_settings, inner: Arc::new(RwLock::new(SimulationRunnerInner { network, @@ -145,10 +155,13 @@ where })), nodes, producer, - } + }) } pub fn simulate(self) -> anyhow::Result> { + // init the start time + let _ = *crate::START_TIME; + match self.runner_settings.clone() { RunnerSettings::Sync => sync_runner::simulate(self), RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks), diff --git a/simulations/src/runner/sync_runner.rs b/simulations/src/runner/sync_runner.rs index ad6f1ec4..c885e6f9 100644 --- a/simulations/src/runner/sync_runner.rs +++ b/simulations/src/runner/sync_runner.rs @@ -1,8 +1,8 @@ use serde::Serialize; use super::{SimulationRunner, SimulationRunnerHandle}; -use crate::node::Node; use crate::warding::SimulationState; +use crate::{node::Node, output_processors::Record}; use crossbeam::channel::{bounded, select}; use std::sync::Arc; @@ -15,7 +15,11 @@ where N: 
Send + Sync + 'static, N::Settings: Clone + Send, N::State: Serialize, - R: for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, + R: Record + + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + + Send + + Sync + + 'static, { let state = SimulationState { nodes: Arc::clone(&runner.nodes), @@ -67,7 +71,7 @@ mod tests { network::{ behaviour::NetworkBehaviour, regions::{Region, RegionsData}, - InMemoryNetworkInterface, Network, + InMemoryNetworkInterface, Network, NetworkBehaviourKey, }, node::{ dummy::{DummyMessage, DummyNode}, @@ -78,6 +82,7 @@ mod tests { runner::SimulationRunner, settings::SimulationSettings, streaming::StreamProducer, + util::node_id, }; use crossbeam::channel; use parking_lot::RwLock; @@ -91,7 +96,7 @@ mod tests { fn init_network(node_ids: &[NodeId]) -> Network { let regions = HashMap::from([(Region::Europe, node_ids.to_vec())]); let behaviour = HashMap::from([( - (Region::Europe, Region::Europe), + NetworkBehaviourKey::new(Region::Europe, Region::Europe), NetworkBehaviour::new(Duration::from_millis(100), 0.0), )]); let regions_data = RegionsData::new(regions, behaviour); @@ -126,7 +131,7 @@ mod tests { }; let mut rng = StepRng::new(1, 0); - let node_ids: Vec = (0..settings.node_count).map(Into::into).collect(); + let node_ids: Vec = (0..settings.node_count).map(node_id).collect(); let overlay = TreeOverlay::new(settings.overlay_settings.clone().try_into().unwrap()); let mut network = init_network(&node_ids); let view = ViewOverlay { @@ -142,7 +147,7 @@ mod tests { let producer = StreamProducer::default(); let runner: SimulationRunner = - SimulationRunner::new(network, nodes, producer, settings); + SimulationRunner::new(network, nodes, producer, settings).unwrap(); let mut nodes = runner.nodes.write(); runner.inner.write().step(&mut nodes); drop(nodes); @@ -161,7 +166,7 @@ mod tests { }; let mut rng = StepRng::new(1, 0); - let node_ids: Vec = (0..settings.node_count).map(Into::into).collect(); + let node_ids: Vec = (0..settings.node_count).map(node_id).collect(); let overlay = TreeOverlay::new(settings.overlay_settings.clone().try_into().unwrap()); let mut network = init_network(&node_ids); let view = ViewOverlay { @@ -188,7 +193,7 @@ mod tests { network.collect_messages(); let runner: SimulationRunner = - SimulationRunner::new(network, nodes, Default::default(), settings); + SimulationRunner::new(network, nodes, Default::default(), settings).unwrap(); let mut nodes = runner.nodes.write(); runner.inner.write().step(&mut nodes); diff --git a/simulations/src/settings.rs b/simulations/src/settings.rs index d94d39cc..6157b330 100644 --- a/simulations/src/settings.rs +++ b/simulations/src/settings.rs @@ -2,9 +2,9 @@ use crate::network::NetworkSettings; use crate::overlay::OverlaySettings; use crate::streaming::StreamSettings; use crate::warding::Ward; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, Deserialize, Default)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub enum RunnerSettings { #[default] Sync, @@ -21,20 +21,25 @@ pub enum RunnerSettings { }, } -#[derive(Clone, Debug, Deserialize, Default)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +#[serde(untagged)] pub enum NodeSettings { - Carnot, + Carnot { + #[serde(with = "humantime_serde")] + timeout: std::time::Duration, + }, #[default] Dummy, } -#[derive(Default, Deserialize)] +#[derive(Clone, Default, Debug, Serialize, Deserialize)] pub struct SimulationSettings { #[serde(default)] pub wards: Vec, pub 
network_settings: NetworkSettings, pub overlay_settings: OverlaySettings, pub node_settings: NodeSettings, + #[serde(default)] pub runner_settings: RunnerSettings, pub stream_settings: StreamSettings, pub node_count: usize, diff --git a/simulations/src/streaming/io.rs b/simulations/src/streaming/io.rs index 356d46b2..bfaef31c 100644 --- a/simulations/src/streaming/io.rs +++ b/simulations/src/streaming/io.rs @@ -1,15 +1,18 @@ use std::{any::Any, io::stdout, sync::Arc}; use super::{Receivers, StreamSettings, Subscriber}; +use crate::output_processors::{RecordType, Runtime}; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Default, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct IOStreamSettings { + #[serde(rename = "type")] pub writer_type: WriteType, } -#[derive(Debug, Clone, Default, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum WriteType { #[default] Stdout, @@ -23,8 +26,7 @@ impl ToWriter for WriteType { fn to_writer(&self) -> anyhow::Result { match self { WriteType::Stdout => { - let stdout = Box::new(stdout()); - let boxed_any = Box::new(stdout) as Box; + let boxed_any = Box::new(stdout()) as Box; Ok(boxed_any.downcast::().map(|boxed| *boxed).map_err(|_| { std::io::Error::new(std::io::ErrorKind::Other, "Writer type mismatch") })?) @@ -53,7 +55,7 @@ pub struct IOSubscriber { impl Subscriber for IOSubscriber where W: std::io::Write + Send + Sync + 'static, - R: Serialize + Send + Sync + 'static, + R: crate::output_processors::Record + Serialize, { type Record = R; type Settings = IOStreamSettings; @@ -83,6 +85,8 @@ where loop { crossbeam::select! { recv(self.recvs.stop_rx) -> _ => { + // collect the run time meta + self.sink(Arc::new(R::from(Runtime::load()?)))?; break; } recv(self.recvs.recv) -> msg => { @@ -98,6 +102,10 @@ where serde_json::to_writer(&mut *self.writer.lock(), &state)?; Ok(()) } + + fn subscribe_data_type() -> RecordType { + RecordType::Data + } } #[cfg(test)] @@ -108,11 +116,12 @@ mod tests { network::{ behaviour::NetworkBehaviour, regions::{Region, RegionsData}, - Network, + Network, NetworkBehaviourKey, }, node::{dummy_streaming::DummyStreamingNode, Node, NodeId}, output_processors::OutData, runner::SimulationRunner, + util::node_id, warding::SimulationState, }; @@ -144,7 +153,7 @@ mod tests { }; let nodes = (0..6) - .map(|idx| DummyStreamingNode::new(NodeId::from(idx), ())) + .map(|idx| DummyStreamingNode::new(node_id(idx), ())) .collect::>(); let network = Network::new(RegionsData { regions: (0..6) @@ -158,7 +167,7 @@ mod tests { 5 => Region::Australia, _ => unreachable!(), }; - (region, vec![idx.into()]) + (region, vec![node_id(idx)]) }) .collect(), node_region: (0..6) @@ -172,7 +181,7 @@ mod tests { 5 => Region::Australia, _ => unreachable!(), }; - (idx.into(), region) + (node_id(idx), region) }) .collect(), region_network_behaviour: (0..6) @@ -187,7 +196,7 @@ mod tests { _ => unreachable!(), }; ( - (region, region), + NetworkBehaviourKey::new(region, region), NetworkBehaviour { delay: Duration::from_millis(100), drop: 0.0, @@ -197,7 +206,7 @@ mod tests { .collect(), }); let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> = - SimulationRunner::new(network, nodes, Default::default(), simulation_settings); + SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); simulation_runner .simulate() .unwrap() diff --git a/simulations/src/streaming/mod.rs 
b/simulations/src/streaming/mod.rs index 4b723bc7..6d6a5d7b 100644 --- a/simulations/src/streaming/mod.rs +++ b/simulations/src/streaming/mod.rs @@ -7,9 +7,19 @@ use std::{ use crossbeam::channel::{bounded, unbounded, Receiver, Sender}; use serde::{Deserialize, Serialize}; +use crate::output_processors::{Record, RecordType, Runtime}; + pub mod io; pub mod naive; pub mod polars; +pub mod runtime_subscriber; +pub mod settings_subscriber; + +pub enum SubscriberType { + Meta, + Settings, + Data, +} #[derive(Debug)] struct Receivers { @@ -30,6 +40,7 @@ impl FromStr for StreamType { fn from_str(s: &str) -> Result { match s.trim().to_ascii_lowercase().as_str() { + "io" => Ok(Self::IO), "naive" => Ok(Self::Naive), "polars" => Ok(Self::Polars), tag => Err(format!( @@ -49,7 +60,8 @@ impl<'de> serde::Deserialize<'de> for StreamType { } } -#[derive(Debug, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase", untagged)] pub enum StreamSettings { Naive(naive::NaiveSettings), IO(io::IOStreamSettings), @@ -122,7 +134,7 @@ where match handle.join() { Ok(rst) => rst?, Err(_) => { - eprintln!("Error joining subscriber thread"); + tracing::error!("Error joining subscriber thread"); } } } @@ -137,6 +149,7 @@ where #[derive(Debug)] struct Senders { + record_ty: RecordType, record_sender: Sender>, stop_sender: Sender<()>, } @@ -188,7 +201,7 @@ impl StreamProducer { impl StreamProducer where - R: Send + Sync + 'static, + R: Record + Send + Sync + 'static, { pub fn send(&self, record: R) -> anyhow::Result<()> { let mut inner = self.inner.lock().unwrap(); @@ -197,11 +210,18 @@ where Ok(()) } else { let record = Arc::new(record); + // cache record for new subscriber + inner.record_cache.push(record.clone()); + // if a send fails, then it means the corresponding subscriber is dropped, // we just remove the sender from the list of senders. 
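+ // records are additionally routed by type: senders registered for a different RecordType are kept but not sent to.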
- inner - .senders - .retain(|tx| tx.record_sender.send(record.clone()).is_ok()); + inner.senders.retain(|tx| { + if tx.record_ty != record.record_type() { + true + } else { + tx.record_sender.send(Arc::clone(&record)).is_ok() + } + }); Ok(()) } } @@ -213,9 +233,18 @@ where let (tx, rx) = unbounded(); let (stop_tx, stop_rx) = bounded(1); let mut inner = self.inner.lock().unwrap(); + + // send all previous records to the new subscriber + for record in inner.record_cache.iter() { + if S::subscribe_data_type() == record.record_type() { + tx.send(Arc::clone(record))?; + } + } + inner.senders.push(Senders { record_sender: tx, stop_sender: stop_tx.clone(), + record_ty: S::subscribe_data_type(), }); Ok(SubscriberHandle { handle: None, @@ -225,10 +254,22 @@ where } pub fn stop(self) -> anyhow::Result<()> { + let meta_record = Arc::new(R::from(Runtime::load()?)); let inner = self.inner.lock().unwrap(); + + // send runtime record to runtime subscribers + inner.senders.iter().for_each(|tx| { + if tx.record_ty == meta_record.record_type() { + if let Err(e) = tx.record_sender.send(Arc::clone(&meta_record)) { + tracing::error!("Error sending meta record: {e}"); + } + } + }); + + // send stop signal to all subscribers inner.senders.iter().for_each(|tx| { if let Err(e) = tx.stop_sender.send(()) { - eprintln!("Error stopping subscriber: {e}"); + tracing::error!("Error stopping subscriber: {e}"); } }); Ok(()) @@ -237,7 +278,7 @@ where pub trait Subscriber { type Settings; - type Record: Serialize + Send + Sync + 'static; + type Record: crate::output_processors::Record + Serialize; fn new( record_recv: Receiver>, @@ -260,4 +301,6 @@ pub trait Subscriber { } fn sink(&self, state: Arc) -> anyhow::Result<()>; + + fn subscribe_data_type() -> RecordType; } diff --git a/simulations/src/streaming/naive.rs b/simulations/src/streaming/naive.rs index 0035a4f2..480552bf 100644 --- a/simulations/src/streaming/naive.rs +++ b/simulations/src/streaming/naive.rs @@ -1,4 +1,5 @@ use super::{Receivers, StreamSettings, Subscriber}; +use crate::output_processors::{RecordType, Runtime}; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::{ @@ -41,7 +42,7 @@ pub struct NaiveSubscriber { impl Subscriber for NaiveSubscriber where - R: Serialize + Send + Sync + 'static, + R: crate::output_processors::Record + Serialize, { type Record = R; @@ -70,7 +71,11 @@ where )), recvs: Arc::new(recvs), }; - eprintln!("Subscribed to {}", settings.path.display()); + tracing::info!( + target = "simulation", + "subscribed to {}", + settings.path.display() + ); Ok(this) } @@ -82,6 +87,8 @@ where loop { crossbeam::select! 
{ recv(self.recvs.stop_rx) -> _ => { + // collect the run time meta + self.sink(Arc::new(R::from(Runtime::load()?)))?; break; } recv(self.recvs.recv) -> msg => { @@ -99,6 +106,10 @@ where file.write_all(b",\n")?; Ok(()) } + + fn subscribe_data_type() -> RecordType { + RecordType::Data + } } #[cfg(test)] @@ -109,11 +120,12 @@ mod tests { network::{ behaviour::NetworkBehaviour, regions::{Region, RegionsData}, - Network, + Network, NetworkBehaviourKey, }, node::{dummy_streaming::DummyStreamingNode, Node, NodeId}, output_processors::OutData, runner::SimulationRunner, + util::node_id, warding::SimulationState, }; @@ -146,7 +158,7 @@ mod tests { }; let nodes = (0..6) - .map(|idx| DummyStreamingNode::new(NodeId::from(idx), ())) + .map(|idx| DummyStreamingNode::new(node_id(idx), ())) .collect::>(); let network = Network::new(RegionsData { regions: (0..6) @@ -160,7 +172,7 @@ mod tests { 5 => Region::Australia, _ => unreachable!(), }; - (region, vec![idx.into()]) + (region, vec![node_id(idx)]) }) .collect(), node_region: (0..6) @@ -174,7 +186,7 @@ mod tests { 5 => Region::Australia, _ => unreachable!(), }; - (idx.into(), region) + (node_id(idx), region) }) .collect(), region_network_behaviour: (0..6) @@ -189,7 +201,7 @@ mod tests { _ => unreachable!(), }; ( - (region, region), + NetworkBehaviourKey::new(region, region), NetworkBehaviour { delay: Duration::from_millis(100), drop: 0.0, @@ -199,7 +211,7 @@ mod tests { .collect(), }); let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> = - SimulationRunner::new(network, nodes, Default::default(), simulation_settings); + SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); simulation_runner.simulate().unwrap(); } } diff --git a/simulations/src/streaming/polars.rs b/simulations/src/streaming/polars.rs index d5652730..c000efec 100644 --- a/simulations/src/streaming/polars.rs +++ b/simulations/src/streaming/polars.rs @@ -1,3 +1,5 @@ +use super::{Receivers, StreamSettings}; +use crate::output_processors::{RecordType, Runtime}; use parking_lot::Mutex; use polars::prelude::*; use serde::{Deserialize, Serialize}; @@ -8,8 +10,6 @@ use std::{ str::FromStr, }; -use super::{Receivers, StreamSettings}; - #[derive(Debug, Clone, Copy, Serialize)] pub enum PolarsFormat { Json, @@ -45,7 +45,8 @@ impl<'de> Deserialize<'de> for PolarsFormat { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PolarsSettings { pub format: PolarsFormat, - pub path: PathBuf, + #[serde(skip_serializing_if = "Option::is_none")] + pub path: Option, } impl TryFrom for PolarsSettings { @@ -90,7 +91,7 @@ where impl super::Subscriber for PolarsSubscriber where - R: Serialize + Send + Sync + 'static, + R: crate::output_processors::Record + Serialize, { type Record = R; type Settings = PolarsSettings; @@ -110,9 +111,22 @@ where let this = PolarsSubscriber { data: Arc::new(Mutex::new(Vec::new())), recvs: Arc::new(recvs), - path: settings.path.clone(), + path: settings.path.clone().unwrap_or_else(|| { + let mut p = std::env::temp_dir().join("polars"); + match settings.format { + PolarsFormat::Json => p.set_extension("json"), + PolarsFormat::Csv => p.set_extension("csv"), + PolarsFormat::Parquet => p.set_extension("parquet"), + }; + p + }), format: settings.format, }; + tracing::info!( + target = "simulation", + "subscribed to {}", + this.path.display() + ); Ok(this) } @@ -124,6 +138,8 @@ where loop { crossbeam::select! 
{ recv(self.recvs.stop_rx) -> _ => { + // collect the run time meta + self.sink(Arc::new(R::from(Runtime::load()?)))?; + return self.persist(); } recv(self.recvs.recv) -> msg => { @@ -137,6 +153,10 @@ where self.data.lock().push(state); Ok(()) } + + fn subscribe_data_type() -> RecordType { + RecordType::Data + } } fn dump_dataframe_to_json(data: &mut DataFrame, out_path: &Path) -> anyhow::Result<()> { diff --git a/simulations/src/streaming/runtime_subscriber.rs b/simulations/src/streaming/runtime_subscriber.rs new file mode 100644 index 00000000..368c0bca --- /dev/null +++ b/simulations/src/streaming/runtime_subscriber.rs @@ -0,0 +1,202 @@ +use super::{Receivers, Subscriber}; +use crate::output_processors::{RecordType, Runtime}; +use serde::{Deserialize, Serialize}; +use std::{ + fs::{File, OpenOptions}, + io::Write, + path::PathBuf, + sync::{Arc, Mutex}, +}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RuntimeSettings { + pub path: PathBuf, +} + +impl Default for RuntimeSettings { + fn default() -> Self { + let mut tmp = std::env::temp_dir(); + tmp.push("simulation"); + tmp.set_extension("runtime"); + Self { path: tmp } + } +} + +#[derive(Debug)] +pub struct RuntimeSubscriber { + file: Arc>, + recvs: Arc>, +} + +impl Subscriber for RuntimeSubscriber +where + R: crate::output_processors::Record + Serialize, +{ + type Record = R; + + type Settings = RuntimeSettings; + + fn new( + record_recv: crossbeam::channel::Receiver>, + stop_recv: crossbeam::channel::Receiver<()>, + settings: Self::Settings, + ) -> anyhow::Result + where + Self: Sized, + { + let mut opts = OpenOptions::new(); + let recvs = Receivers { + stop_rx: stop_recv, + recv: record_recv, + }; + let this = RuntimeSubscriber { + file: Arc::new(Mutex::new( + opts.truncate(true) + .create(true) + .read(true) + .write(true) + .open(&settings.path)?, + )), + recvs: Arc::new(recvs), + }; + tracing::info!( + target = "simulation", + "subscribed to {}", + settings.path.display() + ); + Ok(this) + } + + fn next(&self) -> Option>> { + Some(self.recvs.recv.recv().map_err(From::from)) + } + + fn run(self) -> anyhow::Result<()> { + crossbeam::select!
{ + recv(self.recvs.stop_rx) -> _ => { + // collect the run time meta + self.sink(Arc::new(R::from(Runtime::load()?)))?; + } + recv(self.recvs.recv) -> msg => { + self.sink(msg?)?; + } + } + + Ok(()) + } + + fn sink(&self, state: Arc) -> anyhow::Result<()> { + let mut file = self.file.lock().expect("failed to lock file"); + serde_json::to_writer(&mut *file, &state)?; + file.write_all(b",\n")?; + Ok(()) + } + + fn subscribe_data_type() -> RecordType { + RecordType::Data + } +} + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, time::Duration}; + + use crate::{ + network::{ + behaviour::NetworkBehaviour, + regions::{Region, RegionsData}, + Network, NetworkBehaviourKey, + }, + node::{dummy_streaming::DummyStreamingNode, Node, NodeId}, + output_processors::OutData, + runner::SimulationRunner, + util::node_id, + warding::SimulationState, + }; + + use super::*; + #[derive(Debug, Clone, Serialize)] + struct RuntimeRecord { + states: HashMap, + } + + impl TryFrom<&SimulationState>> for RuntimeRecord { + type Error = anyhow::Error; + + fn try_from(value: &SimulationState>) -> Result { + Ok(Self { + states: value + .nodes + .read() + .iter() + .map(|node| (node.id(), node.current_view())) + .collect(), + }) + } + } + + #[test] + fn test_streaming() { + let simulation_settings = crate::settings::SimulationSettings { + seed: Some(1), + ..Default::default() + }; + + let nodes = (0..6) + .map(|idx| DummyStreamingNode::new(node_id(idx), ())) + .collect::>(); + let network = Network::new(RegionsData { + regions: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + (region, vec![node_id(idx)]) + }) + .collect(), + node_region: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + (node_id(idx), region) + }) + .collect(), + region_network_behaviour: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + ( + NetworkBehaviourKey::new(region, region), + NetworkBehaviour { + delay: Duration::from_millis(100), + drop: 0.0, + }, + ) + }) + .collect(), + }); + let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> = + SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); + simulation_runner.simulate().unwrap(); + } +} diff --git a/simulations/src/streaming/settings_subscriber.rs b/simulations/src/streaming/settings_subscriber.rs new file mode 100644 index 00000000..17612be5 --- /dev/null +++ b/simulations/src/streaming/settings_subscriber.rs @@ -0,0 +1,202 @@ +use super::{Receivers, Subscriber}; +use crate::output_processors::{RecordType, Runtime}; +use serde::{Deserialize, Serialize}; +use std::{ + fs::{File, OpenOptions}, + io::Write, + path::PathBuf, + sync::{Arc, Mutex}, +}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SettingsSubscriberSettings { + pub path: PathBuf, +} + +impl Default for SettingsSubscriberSettings { + fn default() -> Self { + let mut tmp = std::env::temp_dir(); + tmp.push("simulation"); + tmp.set_extension("conf"); + Self { path: tmp } + } +} + +#[derive(Debug)] +pub struct 
SettingsSubscriber { + file: Arc>, + recvs: Arc>, +} + +impl Subscriber for SettingsSubscriber +where + R: crate::output_processors::Record + Serialize, +{ + type Record = R; + + type Settings = SettingsSubscriberSettings; + + fn new( + record_recv: crossbeam::channel::Receiver>, + stop_recv: crossbeam::channel::Receiver<()>, + settings: Self::Settings, + ) -> anyhow::Result + where + Self: Sized, + { + let mut opts = OpenOptions::new(); + let recvs = Receivers { + stop_rx: stop_recv, + recv: record_recv, + }; + let this = SettingsSubscriber { + file: Arc::new(Mutex::new( + opts.truncate(true) + .create(true) + .read(true) + .write(true) + .open(&settings.path)?, + )), + recvs: Arc::new(recvs), + }; + tracing::info!( + target = "simulation", + "subscribed to {}", + settings.path.display() + ); + Ok(this) + } + + fn next(&self) -> Option>> { + Some(self.recvs.recv.recv().map_err(From::from)) + } + + fn run(self) -> anyhow::Result<()> { + crossbeam::select! { + recv(self.recvs.stop_rx) -> _ => { + // collect the run time meta + self.sink(Arc::new(R::from(Runtime::load()?)))?; + } + recv(self.recvs.recv) -> msg => { + self.sink(msg?)?; + } + } + + Ok(()) + } + + fn sink(&self, state: Arc) -> anyhow::Result<()> { + let mut file = self.file.lock().expect("failed to lock file"); + serde_json::to_writer(&mut *file, &state)?; + file.write_all(b",\n")?; + Ok(()) + } + + fn subscribe_data_type() -> RecordType { + RecordType::Data + } +} + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, time::Duration}; + + use crate::{ + network::{ + behaviour::NetworkBehaviour, + regions::{Region, RegionsData}, + Network, NetworkBehaviourKey, + }, + node::{dummy_streaming::DummyStreamingNode, Node, NodeId}, + output_processors::OutData, + runner::SimulationRunner, + util::node_id, + warding::SimulationState, + }; + + use super::*; + #[derive(Debug, Clone, Serialize)] + struct SettingsRecord { + states: HashMap, + } + + impl TryFrom<&SimulationState>> for SettingsRecord { + type Error = anyhow::Error; + + fn try_from(value: &SimulationState>) -> Result { + Ok(Self { + states: value + .nodes + .read() + .iter() + .map(|node| (node.id(), node.current_view())) + .collect(), + }) + } + } + + #[test] + fn test_streaming() { + let simulation_settings = crate::settings::SimulationSettings { + seed: Some(1), + ..Default::default() + }; + + let nodes = (0..6) + .map(|idx| DummyStreamingNode::new(node_id(idx), ())) + .collect::>(); + let network = Network::new(RegionsData { + regions: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + (region, vec![node_id(idx)]) + }) + .collect(), + node_region: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + (node_id(idx), region) + }) + .collect(), + region_network_behaviour: (0..6) + .map(|idx| { + let region = match idx % 6 { + 0 => Region::Europe, + 1 => Region::NorthAmerica, + 2 => Region::SouthAmerica, + 3 => Region::Asia, + 4 => Region::Africa, + 5 => Region::Australia, + _ => unreachable!(), + }; + ( + NetworkBehaviourKey::new(region, region), + NetworkBehaviour { + delay: Duration::from_millis(100), + drop: 0.0, + }, + ) + }) + .collect(), + }); + let simulation_runner: SimulationRunner<(), 
DummyStreamingNode<()>, OutData> = + SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); + simulation_runner.simulate().unwrap(); + } +} diff --git a/simulations/src/util.rs b/simulations/src/util.rs new file mode 100644 index 00000000..848efea2 --- /dev/null +++ b/simulations/src/util.rs @@ -0,0 +1,20 @@ +/// Create a random node id. +/// +/// The format is: +/// +/// [0..4]: node index in big endian +/// [4..32]: zeros +pub fn node_id(id: usize) -> consensus_engine::NodeId { + let mut bytes = [0; 32]; + bytes[..4].copy_from_slice((id as u32).to_be_bytes().as_ref()); + bytes +} + +/// Parse the original index from NodeId +pub(crate) fn parse_idx(id: &consensus_engine::NodeId) -> usize { + let mut bytes = [0; 4]; + bytes.copy_from_slice(&id[..4]); + u32::from_be_bytes(bytes) as usize +} + +pub(crate) mod millis_duration {} diff --git a/simulations/src/warding/minmax.rs b/simulations/src/warding/minmax.rs index 3151242b..9c358dea 100644 --- a/simulations/src/warding/minmax.rs +++ b/simulations/src/warding/minmax.rs @@ -1,10 +1,11 @@ use crate::node::Node; use crate::warding::{SimulationState, SimulationWard}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; /// MinMaxView. It monitors the gap between a min view and max view, triggers when surpassing /// the max view - min view is larger than a gap. -#[derive(Debug, Deserialize, Copy, Clone)] +#[derive(Debug, Serialize, Deserialize, Copy, Clone)] +#[serde(transparent)] pub struct MinMaxViewWard { max_gap: usize, } diff --git a/simulations/src/warding/mod.rs b/simulations/src/warding/mod.rs index 5a9d588e..8dec763a 100644 --- a/simulations/src/warding/mod.rs +++ b/simulations/src/warding/mod.rs @@ -2,7 +2,7 @@ use std::sync::Arc; // crates use parking_lot::RwLock; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; // internal use crate::node::Node; @@ -32,7 +32,7 @@ pub trait SimulationWard { /// Ward dispatcher /// Enum to avoid Boxing (Box) wards. -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum Ward { MaxView(ttf::MaxViewWard), diff --git a/simulations/src/warding/stalled.rs b/simulations/src/warding/stalled.rs index 7612c7a3..2da479b3 100644 --- a/simulations/src/warding/stalled.rs +++ b/simulations/src/warding/stalled.rs @@ -1,9 +1,9 @@ use crate::node::Node; use crate::warding::{SimulationState, SimulationWard}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; /// StalledView. Track stalled nodes (e.g incoming queue is empty, the node doesn't write to other queues) -#[derive(Debug, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct StalledViewWard { // the hash checksum consecutive_viewed_checkpoint: Option, diff --git a/simulations/src/warding/ttf.rs b/simulations/src/warding/ttf.rs index c9a54597..a87c5b85 100644 --- a/simulations/src/warding/ttf.rs +++ b/simulations/src/warding/ttf.rs @@ -1,10 +1,11 @@ use crate::node::Node; use crate::warding::{SimulationState, SimulationWard}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; /// Time to finality ward. It monitors the amount of rounds of the simulations, triggers when surpassing /// the set threshold. -#[derive(Debug, Deserialize, Copy, Clone)] +#[derive(Debug, Serialize, Deserialize, Copy, Clone)] +#[serde(transparent)] pub struct MaxViewWard { max_view: usize, }
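Illustrative usage sketch (not part of the patch above): the test below shows how the new helpers in simulations/src/util.rs and the serde attributes added to the wards are expected to behave. It assumes it lives inside the simulations crate, since parse_idx is pub(crate), and that serde_json is available there, which the streaming subscribers already rely on; the module and test names are made up for the example.

#[cfg(test)]
mod util_usage_sketch {
    use crate::util::{node_id, parse_idx};
    use crate::warding::Ward;

    #[test]
    fn node_id_round_trips_through_parse_idx() {
        // node_id packs the index into the first four bytes (big endian) and zero-pads the rest.
        let id = node_id(10_229);
        assert_eq!(&id[..4], 10_229u32.to_be_bytes().as_ref());
        assert!(id[4..].iter().all(|b| *b == 0));
        // parse_idx recovers the original index from those four bytes.
        assert_eq!(parse_idx(&id), 10_229);
    }

    #[test]
    fn max_view_ward_serializes_transparently() {
        // With #[serde(rename_all = "snake_case")] on Ward and #[serde(transparent)] on
        // MaxViewWard, a ward round-trips as the compact `{"max_view": <n>}` shape.
        let ward: Ward = serde_json::from_str(r#"{"max_view": 30}"#).unwrap();
        assert_eq!(serde_json::to_string(&ward).unwrap(), r#"{"max_view":30}"#);
    }
}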