diff --git a/consensus-engine/src/lib.rs b/consensus-engine/src/lib.rs index 4ef9624a..dabbc6b0 100644 --- a/consensus-engine/src/lib.rs +++ b/consensus-engine/src/lib.rs @@ -396,7 +396,7 @@ impl Carnot { mod test { use std::convert::Infallible; - use crate::overlay::{FlatOverlay, RoundRobin, Settings}; + use crate::overlay::{FlatOverlay, FlatOverlaySettings, RoundRobin}; use super::*; @@ -413,7 +413,7 @@ mod test { leader_id: *nodes.first().unwrap(), }, }, - FlatOverlay::new(Settings { + FlatOverlay::new(FlatOverlaySettings { nodes, leader: RoundRobin::default(), leader_super_majority_threshold: None, diff --git a/consensus-engine/src/overlay/flat_overlay.rs b/consensus-engine/src/overlay/flat_overlay.rs index ef9270a9..cb466d9d 100644 --- a/consensus-engine/src/overlay/flat_overlay.rs +++ b/consensus-engine/src/overlay/flat_overlay.rs @@ -17,11 +17,11 @@ impl Overlay for FlatOverlay where L: LeaderSelection + Send + Sync + 'static, { - type Settings = Settings; + type Settings = FlatOverlaySettings; type LeaderSelection = L; fn new( - Settings { + FlatOverlaySettings { leader, nodes, leader_super_majority_threshold, @@ -135,7 +135,7 @@ impl LeaderSelection for RoundRobin { #[derive(Clone, Debug, Default)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct Settings { +pub struct FlatOverlaySettings { pub nodes: Vec, /// A fraction representing the threshold in the form `/' /// Defaults to 2/3 diff --git a/consensus-engine/tests/fuzz/sut.rs b/consensus-engine/tests/fuzz/sut.rs index d914d12e..2c29111b 100644 --- a/consensus-engine/tests/fuzz/sut.rs +++ b/consensus-engine/tests/fuzz/sut.rs @@ -1,7 +1,7 @@ use std::{collections::HashSet, panic}; use consensus_engine::{ - overlay::{FlatOverlay, RoundRobin, Settings}, + overlay::{FlatOverlay, FlatOverlaySettings, RoundRobin}, *, }; use proptest_state_machine::{ReferenceStateMachine, StateMachineTest}; @@ -28,7 +28,7 @@ impl ConsensusEngineTest { leader_id: NodeId::new([0; 32]), }, }, - FlatOverlay::new(Settings { + FlatOverlay::new(FlatOverlaySettings { nodes: vec![NodeId::new([0; 32])], leader: RoundRobin::default(), leader_super_majority_threshold: None, diff --git a/simulations/config/carnot.json b/simulations/config/carnot.json index 61aa17c4..9231f25b 100644 --- a/simulations/config/carnot.json +++ b/simulations/config/carnot.json @@ -17,7 +17,9 @@ "asia": 0.3 } }, - "overlay_settings": "Flat", + "overlay_settings": { + "number_of_committees": 3 + }, "node_settings": { "timeout": "1000ms" }, diff --git a/simulations/src/bin/app/main.rs b/simulations/src/bin/app/main.rs index 86da1635..9a77c161 100644 --- a/simulations/src/bin/app/main.rs +++ b/simulations/src/bin/app/main.rs @@ -1,13 +1,13 @@ // std use anyhow::Ok; use serde::Serialize; -use simulations::node::carnot::CarnotSettings; +use simulations::node::carnot::{CarnotSettings, CarnotState}; use std::fs::File; use std::path::{Path, PathBuf}; use std::time::{SystemTime, UNIX_EPOCH}; // crates use clap::Parser; -use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin}; +use consensus_engine::overlay::RandomBeaconState; use consensus_engine::{Block, View}; use crossbeam::channel; use rand::rngs::SmallRng; @@ -17,18 +17,18 @@ use serde::de::DeserializeOwned; use simulations::network::behaviour::create_behaviours; use simulations::network::regions::{create_regions, RegionsData}; use simulations::network::{InMemoryNetworkInterface, Network}; -use simulations::node::{Node, NodeId, NodeIdExt}; +use simulations::node::{NodeId, NodeIdExt}; use simulations::output_processors::Record; -use simulations::runner::SimulationRunnerHandle; +use simulations::runner::{BoxedNode, SimulationRunnerHandle}; use simulations::streaming::{ io::IOSubscriber, naive::NaiveSubscriber, polars::PolarsSubscriber, StreamType, }; // internal use simulations::{ - node::carnot::CarnotNode, output_processors::OutData, runner::SimulationRunner, - settings::SimulationSettings, + output_processors::OutData, runner::SimulationRunner, settings::SimulationSettings, }; mod log; +mod overlay_node; /// Main simulation wrapper /// Pipes together the cli arguments with the execution @@ -73,7 +73,7 @@ impl SimulationApp { let ids = node_ids.clone(); let mut network = Network::new(regions_data); - let nodes = node_ids + let nodes: Vec> = node_ids .iter() .copied() .map(|node_id| { @@ -86,11 +86,7 @@ impl SimulationApp { ); let nodes: Vec = ids.clone().into_iter().map(Into::into).collect(); let leader = nodes.first().copied().unwrap(); - let overlay_settings = consensus_engine::overlay::Settings { - nodes: nodes.to_vec(), - leader: RoundRobin::new(), - leader_super_majority_threshold: None, - }; + // FIXME: Actually use a proposer and a key to generate random beacon state let genesis = nomos_core::block::Block::new( View::new(0), @@ -101,39 +97,36 @@ impl SimulationApp { entropy: Box::new([0; 32]), }, ); - CarnotNode::>::new( + overlay_node::to_overlay_node( node_id, - CarnotSettings::new( - simulation_settings.node_settings.timeout, - simulation_settings.record_settings.clone(), - ), - overlay_settings, - genesis, + nodes, + leader, network_interface, + genesis, &mut rng, + &simulation_settings, ) }) .collect(); - run(network, nodes, simulation_settings, stream_type)?; + run::<_, _, _>(network, nodes, simulation_settings, stream_type)?; Ok(()) } } -fn run( +fn run( network: Network, - nodes: Vec, + nodes: Vec>, settings: SimulationSettings, stream_type: Option, ) -> anyhow::Result<()> where M: Clone + Send + Sync + 'static, - N: Send + Sync + 'static, - N::Settings: Clone + Send, - N::State: Serialize, + S: 'static, + T: Serialize + 'static, { let stream_settings = settings.stream_settings.clone(); let runner = - SimulationRunner::<_, _, OutData>::new(network, nodes, Default::default(), settings)?; + SimulationRunner::<_, OutData, S, T>::new(network, nodes, Default::default(), settings)?; let handle = match stream_type { Some(StreamType::Naive) => { diff --git a/simulations/src/bin/app/overlay_node.rs b/simulations/src/bin/app/overlay_node.rs new file mode 100644 index 00000000..d6e6ab34 --- /dev/null +++ b/simulations/src/bin/app/overlay_node.rs @@ -0,0 +1,62 @@ +use consensus_engine::{ + overlay::{FlatOverlay, RoundRobin, TreeOverlay}, + NodeId, +}; +use rand::Rng; +use simulations::{ + network::InMemoryNetworkInterface, + node::carnot::{messages::CarnotMessage, CarnotNode, CarnotSettings, CarnotState}, + runner::BoxedNode, + settings::SimulationSettings, +}; + +pub fn to_overlay_node( + node_id: NodeId, + nodes: Vec, + leader: NodeId, + network_interface: InMemoryNetworkInterface, + genesis: nomos_core::block::Block<[u8; 32]>, + mut rng: R, + settings: &SimulationSettings, +) -> BoxedNode { + match &settings.overlay_settings { + simulations::settings::OverlaySettings::Flat => { + let overlay_settings = consensus_engine::overlay::FlatOverlaySettings { + nodes: nodes.to_vec(), + leader: RoundRobin::new(), + leader_super_majority_threshold: None, + }; + Box::new(CarnotNode::>::new( + node_id, + CarnotSettings::new( + settings.node_settings.timeout, + settings.record_settings.clone(), + ), + overlay_settings, + genesis, + network_interface, + &mut rng, + )) + } + simulations::settings::OverlaySettings::Tree(tree_settings) => { + let overlay_settings = consensus_engine::overlay::TreeOverlaySettings { + nodes, + current_leader: leader, + entropy: [0; 32], + number_of_committees: tree_settings.number_of_committees, + leader: RoundRobin::new(), + }; + Box::new(CarnotNode::>::new( + node_id, + CarnotSettings::new( + settings.node_settings.timeout, + settings.record_settings.clone(), + ), + overlay_settings, + genesis, + network_interface, + &mut rng, + )) + } + } +} diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index baae6055..035ec25b 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -2,7 +2,7 @@ mod event_builder; mod message_cache; -mod messages; +pub mod messages; mod tally; mod timeout; diff --git a/simulations/src/output_processors/mod.rs b/simulations/src/output_processors/mod.rs index 07726e51..8fb4d8c2 100644 --- a/simulations/src/output_processors/mod.rs +++ b/simulations/src/output_processors/mod.rs @@ -95,17 +95,20 @@ impl OutData { } } -impl TryFrom<&SimulationState> for OutData -where - N: crate::node::Node, - N::State: Serialize, -{ +impl TryFrom<&SimulationState> for OutData { type Error = anyhow::Error; - fn try_from(state: &crate::warding::SimulationState) -> Result { - serde_json::to_value(state.nodes.read().iter().map(N::state).collect::>()) - .map(OutData::new) - .map_err(From::from) + fn try_from(state: &SimulationState) -> Result { + serde_json::to_value( + state + .nodes + .read() + .iter() + .map(|n| n.state()) + .collect::>(), + ) + .map(OutData::new) + .map_err(From::from) } } diff --git a/simulations/src/runner/async_runner.rs b/simulations/src/runner/async_runner.rs index b61b20fb..23047124 100644 --- a/simulations/src/runner/async_runner.rs +++ b/simulations/src/runner/async_runner.rs @@ -1,4 +1,4 @@ -use crate::node::{Node, NodeId}; +use crate::node::NodeId; use crate::output_processors::Record; use crate::runner::SimulationRunner; use crate::warding::SimulationState; @@ -6,7 +6,6 @@ use crossbeam::channel::bounded; use crossbeam::select; use rand::prelude::SliceRandom; use rayon::prelude::*; -use serde::Serialize; use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -14,26 +13,25 @@ use std::time::Duration; use super::SimulationRunnerHandle; /// Simulate with sending the network state to any subscriber -pub fn simulate( - runner: SimulationRunner, +pub fn simulate( + runner: SimulationRunner, chunk_size: usize, ) -> anyhow::Result> where M: Clone + Send + Sync + 'static, - N: Send + Sync + 'static, - N::Settings: Clone + Send, - N::State: Serialize, R: Record - + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, + S: 'static, + T: 'static, { - let simulation_state = SimulationState:: { + let simulation_state = SimulationState { nodes: Arc::clone(&runner.nodes), }; - let mut node_ids: Vec = runner.nodes.read().iter().map(N::id).collect(); + let mut node_ids: Vec = runner.nodes.read().iter().map(|n| n.id()).collect(); let mut inner_runner = runner.inner; let nodes = runner.nodes; diff --git a/simulations/src/runner/glauber_runner.rs b/simulations/src/runner/glauber_runner.rs index 8859c3ed..e8f452fb 100644 --- a/simulations/src/runner/glauber_runner.rs +++ b/simulations/src/runner/glauber_runner.rs @@ -5,7 +5,7 @@ use crate::warding::SimulationState; use crossbeam::channel::bounded; use crossbeam::select; use rand::prelude::IteratorRandom; -use serde::Serialize; + use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; @@ -15,21 +15,20 @@ use super::SimulationRunnerHandle; /// Simulate with sending the network state to any subscriber. /// /// [Glauber dynamics simulation](https://en.wikipedia.org/wiki/Glauber_dynamics) -pub fn simulate( - runner: SimulationRunner, +pub fn simulate( + runner: SimulationRunner, update_rate: usize, maximum_iterations: usize, ) -> anyhow::Result> where M: Send + Sync + Clone + 'static, - N: Send + Sync + 'static, - N::Settings: Clone + Send, - N::State: Serialize, R: Record - + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, + S: 'static, + T: 'static, { let simulation_state = SimulationState { nodes: Arc::clone(&runner.nodes), @@ -60,7 +59,7 @@ where { let mut shared_nodes = nodes.write(); - let node: &mut N = shared_nodes + let node: &mut dyn Node = &mut **shared_nodes .get_mut(node_id.index()) .expect("Node should be present"); node.step(elapsed); diff --git a/simulations/src/runner/layered_runner.rs b/simulations/src/runner/layered_runner.rs index 6b94d868..c0a91b59 100644 --- a/simulations/src/runner/layered_runner.rs +++ b/simulations/src/runner/layered_runner.rs @@ -35,10 +35,10 @@ use std::ops::Not; use std::sync::Arc; use std::time::Duration; // crates + use fixed_slice_deque::FixedSliceDeque; use rand::prelude::{IteratorRandom, SliceRandom}; use rand::rngs::SmallRng; -use serde::Serialize; // internal use crate::node::{Node, NodeId, NodeIdExt}; use crate::output_processors::Record; @@ -48,28 +48,27 @@ use crate::warding::SimulationState; use super::SimulationRunnerHandle; /// Simulate with sending the network state to any subscriber -pub fn simulate( - runner: SimulationRunner, +pub fn simulate( + runner: SimulationRunner, gap: usize, distribution: Option>, ) -> anyhow::Result> where M: Send + Sync + Clone + 'static, - N: Send + Sync + 'static, - N::Settings: Clone + Send, - N::State: Serialize, R: Record - + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, + S: 'static, + T: 'static, { let distribution = distribution.unwrap_or_else(|| std::iter::repeat(1.0f32).take(gap).collect()); let layers: Vec = (0..gap).collect(); - let mut deque = build_node_ids_deque::(gap, &runner); + let mut deque = build_node_ids_deque::(gap, &runner); let simulation_state = SimulationState { nodes: Arc::clone(&runner.nodes), @@ -96,7 +95,7 @@ where { let mut shared_nodes = nodes.write(); - let node: &mut N = shared_nodes + let node: &mut dyn Node = &mut **shared_nodes .get_mut(node_id.index()) .expect("Node should be present"); let prev_view = node.current_view(); @@ -164,13 +163,10 @@ fn choose_random_layer_and_node_id( (i, *node_id) } -fn build_node_ids_deque( +fn build_node_ids_deque( gap: usize, - runner: &SimulationRunner, -) -> FixedSliceDeque> -where - N: Node, -{ + runner: &SimulationRunner, +) -> FixedSliceDeque> { // add a +1 so we always have let mut deque = FixedSliceDeque::new(gap + 1); // push first layer diff --git a/simulations/src/runner/mod.rs b/simulations/src/runner/mod.rs index 44c64bbd..6fbaf4e7 100644 --- a/simulations/src/runner/mod.rs +++ b/simulations/src/runner/mod.rs @@ -26,6 +26,8 @@ use crate::node::Node; use crate::settings::{RunnerSettings, SimulationSettings}; use crate::warding::{SimulationState, SimulationWard, Ward}; +pub type BoxedNode = Box + Send + Sync>; + pub struct SimulationRunnerHandle { producer: StreamProducer, stop_tx: Sender<()>, @@ -68,24 +70,14 @@ impl SimulationRunnerInner where M: Send + Sync + Clone, { - fn check_wards(&mut self, state: &SimulationState) -> bool - where - N: Node + Send + Sync, - N::Settings: Clone + Send, - N::State: Serialize, - { + fn check_wards(&mut self, state: &SimulationState) -> bool { self.wards .par_iter_mut() .map(|ward| ward.analyze(state)) .any(|x| x) } - fn step(&mut self, nodes: &mut [N], elapsed: Duration) - where - N: Node + Send + Sync, - N::Settings: Clone + Send, - N::State: Serialize, - { + fn step(&mut self, nodes: &mut [BoxedNode], elapsed: Duration) { self.network.dispatch_after(elapsed); nodes.par_iter_mut().for_each(|node| { node.step(elapsed); @@ -96,31 +88,27 @@ where /// Encapsulation solution for the simulations runner /// Holds the network state, the simulating nodes and the simulation settings. -pub struct SimulationRunner -where - N: Node, -{ +pub struct SimulationRunner { inner: SimulationRunnerInner, - nodes: Arc>>, + nodes: Arc>>>, runner_settings: RunnerSettings, producer: StreamProducer, } -impl SimulationRunner +impl SimulationRunner where M: Clone + Send + Sync + 'static, - N: Send + Sync + 'static, - N::Settings: Clone + Send, - N::State: Serialize, R: Record - + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, + S: 'static, + T: Serialize + 'static, { pub fn new( network: Network, - nodes: Vec, + nodes: Vec>, producer: StreamProducer, mut settings: SimulationSettings, ) -> anyhow::Result { @@ -182,28 +170,27 @@ where } } -impl SimulationRunner +impl SimulationRunner where M: Clone + Send + Sync + 'static, - N: Send + Sync + 'static, - N::Settings: Clone + Send, - N::State: Serialize, R: Record + serde::Serialize - + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, + S: 'static, + T: Serialize + 'static, { - pub fn simulate_and_subscribe( + pub fn simulate_and_subscribe( self, - settings: S::Settings, + settings: B::Settings, ) -> anyhow::Result> where - S: Subscriber + Send + Sync + 'static, + B: Subscriber + Send + Sync + 'static, { let handle = self.simulate()?; - let mut data_subscriber_handle = handle.subscribe::(settings)?; + let mut data_subscriber_handle = handle.subscribe::(settings)?; let mut runtime_subscriber_handle = handle.subscribe::>(Default::default())?; let mut settings_subscriber_handle = diff --git a/simulations/src/runner/sync_runner.rs b/simulations/src/runner/sync_runner.rs index 5436b878..b1e471a1 100644 --- a/simulations/src/runner/sync_runner.rs +++ b/simulations/src/runner/sync_runner.rs @@ -1,26 +1,23 @@ -use serde::Serialize; - use super::{SimulationRunner, SimulationRunnerHandle}; +use crate::output_processors::Record; use crate::warding::SimulationState; -use crate::{node::Node, output_processors::Record}; use crossbeam::channel::{bounded, select}; use std::sync::Arc; use std::time::Duration; /// Simulate with sending the network state to any subscriber -pub fn simulate( - runner: SimulationRunner, +pub fn simulate( + runner: SimulationRunner, ) -> anyhow::Result> where M: Send + Sync + Clone + 'static, - N: Send + Sync + 'static, - N::Settings: Clone + Send, - N::State: Serialize, R: Record - + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, + S: 'static, + T: 'static, { let state = SimulationState { nodes: Arc::clone(&runner.nodes), @@ -74,11 +71,14 @@ mod tests { InMemoryNetworkInterface, Network, NetworkBehaviourKey, }, node::{ - dummy::{DummyMessage, DummyNode}, + dummy::{DummyMessage, DummyNode, DummySettings, DummyState}, Node, NodeId, NodeIdExt, OverlayState, SharedState, ViewOverlay, }, output_processors::OutData, - overlay::{tree::TreeOverlay, Overlay, SimulationOverlay}, + overlay::{ + tree::{TreeOverlay, TreeSettings}, + Overlay, SimulationOverlay, + }, runner::SimulationRunner, settings::SimulationSettings, streaming::StreamProducer, @@ -131,7 +131,7 @@ mod tests { let mut rng = StepRng::new(1, 0); let node_ids: Vec = (0..settings.node_count).map(NodeId::from_index).collect(); - let overlay = TreeOverlay::new(settings.overlay_settings.clone().try_into().unwrap()); + let overlay = TreeOverlay::new(TreeSettings::default()); let mut network = init_network(&node_ids); let view = ViewOverlay { leaders: overlay.leaders(&node_ids, 1, &mut rng).collect(), @@ -142,11 +142,24 @@ mod tests { overlay: SimulationOverlay::Tree(overlay), overlays: BTreeMap::from([(0, view.clone()), (1, view)]), })); - let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state); + let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state) + .into_iter() + .map(|n| { + Box::new(n) + as Box< + dyn Node + + std::marker::Send + + Sync, + > + }) + .collect(); let producer = StreamProducer::default(); - let mut runner: SimulationRunner = - SimulationRunner::new(network, nodes, producer, settings).unwrap(); + let mut runner: SimulationRunner = + SimulationRunner::<_, OutData, DummySettings, DummyState>::new( + network, nodes, producer, settings, + ) + .unwrap(); let mut nodes = runner.nodes.write(); runner.inner.step(&mut nodes, Duration::from_millis(100)); drop(nodes); @@ -166,7 +179,7 @@ mod tests { let mut rng = StepRng::new(1, 0); let node_ids: Vec = (0..settings.node_count).map(NodeId::from_index).collect(); - let overlay = TreeOverlay::new(settings.overlay_settings.clone().try_into().unwrap()); + let overlay = TreeOverlay::new(TreeSettings::default()); let mut network = init_network(&node_ids); let view = ViewOverlay { leaders: overlay.leaders(&node_ids, 1, &mut rng).collect(), @@ -182,7 +195,7 @@ mod tests { (43, view), ]), })); - let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state); + let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state.clone()); for node in nodes.iter() { // All nodes send one message to NodeId(1). @@ -191,7 +204,19 @@ mod tests { } network.collect_messages(); - let mut runner: SimulationRunner = + let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state) + .into_iter() + .map(|n| { + Box::new(n) + as Box< + dyn Node + + std::marker::Send + + Sync, + > + }) + .collect(); + + let mut runner: SimulationRunner = SimulationRunner::new(network, nodes, Default::default(), settings).unwrap(); let mut nodes = runner.nodes.write(); diff --git a/simulations/src/settings.rs b/simulations/src/settings.rs index 5aeed54c..030d9197 100644 --- a/simulations/src/settings.rs +++ b/simulations/src/settings.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use crate::network::NetworkSettings; -use crate::overlay::OverlaySettings; use crate::streaming::StreamSettings; use crate::warding::Ward; use serde::{Deserialize, Serialize}; @@ -23,6 +22,19 @@ pub enum RunnerSettings { }, } +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[serde(untagged)] +pub enum OverlaySettings { + #[default] + Flat, + Tree(TreeSettings), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TreeSettings { + pub number_of_committees: usize, +} + #[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct NodeSettings { #[serde(with = "humantime_serde")] diff --git a/simulations/src/streaming/io.rs b/simulations/src/streaming/io.rs index 9dc3c8f2..87db5cc8 100644 --- a/simulations/src/streaming/io.rs +++ b/simulations/src/streaming/io.rs @@ -118,7 +118,10 @@ mod tests { regions::{Region, RegionsData}, Network, NetworkBehaviourKey, }, - node::{dummy_streaming::DummyStreamingNode, Node, NodeId, NodeIdExt}, + node::{ + dummy_streaming::{DummyStreamingNode, DummyStreamingState}, + Node, NodeId, NodeIdExt, + }, output_processors::OutData, runner::SimulationRunner, warding::SimulationState, @@ -130,10 +133,10 @@ mod tests { states: HashMap, } - impl TryFrom<&SimulationState>> for IORecord { + impl TryFrom<&SimulationState> for IORecord { type Error = anyhow::Error; - fn try_from(value: &SimulationState>) -> Result { + fn try_from(value: &SimulationState) -> Result { let nodes = value.nodes.read(); Ok(Self { states: nodes @@ -152,7 +155,14 @@ mod tests { }; let nodes = (0..6) - .map(|idx| DummyStreamingNode::new(NodeId::from_index(idx), ())) + .map(|idx| { + Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ())) + as Box< + dyn Node + + std::marker::Send + + Sync, + > + }) .collect::>(); let network = Network::new(RegionsData { regions: (0..6) @@ -204,7 +214,7 @@ mod tests { }) .collect(), }); - let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> = + let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> = SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); simulation_runner .simulate() diff --git a/simulations/src/streaming/naive.rs b/simulations/src/streaming/naive.rs index dce47dab..d37d22c0 100644 --- a/simulations/src/streaming/naive.rs +++ b/simulations/src/streaming/naive.rs @@ -122,7 +122,10 @@ mod tests { regions::{Region, RegionsData}, Network, NetworkBehaviourKey, }, - node::{dummy_streaming::DummyStreamingNode, Node, NodeId, NodeIdExt}, + node::{ + dummy_streaming::{DummyStreamingNode, DummyStreamingState}, + Node, NodeId, NodeIdExt, + }, output_processors::OutData, runner::SimulationRunner, warding::SimulationState, @@ -134,10 +137,10 @@ mod tests { states: HashMap, } - impl TryFrom<&SimulationState>> for NaiveRecord { + impl TryFrom<&SimulationState> for NaiveRecord { type Error = anyhow::Error; - fn try_from(value: &SimulationState>) -> Result { + fn try_from(value: &SimulationState) -> Result { Ok(Self { states: value .nodes @@ -157,7 +160,14 @@ mod tests { }; let nodes = (0..6) - .map(|idx| DummyStreamingNode::new(NodeId::from_index(idx), ())) + .map(|idx| { + Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ())) + as Box< + dyn Node + + std::marker::Send + + Sync, + > + }) .collect::>(); let network = Network::new(RegionsData { regions: (0..6) @@ -209,7 +219,7 @@ mod tests { }) .collect(), }); - let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> = + let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> = SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); simulation_runner.simulate().unwrap(); } diff --git a/simulations/src/streaming/runtime_subscriber.rs b/simulations/src/streaming/runtime_subscriber.rs index 313e06cd..4d69c2c7 100644 --- a/simulations/src/streaming/runtime_subscriber.rs +++ b/simulations/src/streaming/runtime_subscriber.rs @@ -107,7 +107,10 @@ mod tests { regions::{Region, RegionsData}, Network, NetworkBehaviourKey, }, - node::{dummy_streaming::DummyStreamingNode, Node, NodeId, NodeIdExt}, + node::{ + dummy_streaming::{DummyStreamingNode, DummyStreamingState}, + Node, NodeId, NodeIdExt, + }, output_processors::OutData, runner::SimulationRunner, warding::SimulationState, @@ -119,10 +122,10 @@ mod tests { states: HashMap, } - impl TryFrom<&SimulationState>> for RuntimeRecord { + impl TryFrom<&SimulationState> for RuntimeRecord { type Error = anyhow::Error; - fn try_from(value: &SimulationState>) -> Result { + fn try_from(value: &SimulationState) -> Result { Ok(Self { states: value .nodes @@ -142,7 +145,14 @@ mod tests { }; let nodes = (0..6) - .map(|idx| DummyStreamingNode::new(NodeId::from_index(idx), ())) + .map(|idx| { + Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ())) + as Box< + dyn Node + + std::marker::Send + + Sync, + > + }) .collect::>(); let network = Network::new(RegionsData { regions: (0..6) @@ -194,7 +204,7 @@ mod tests { }) .collect(), }); - let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> = + let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> = SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); simulation_runner.simulate().unwrap(); } diff --git a/simulations/src/streaming/settings_subscriber.rs b/simulations/src/streaming/settings_subscriber.rs index 6b1bfcf5..f76a8c37 100644 --- a/simulations/src/streaming/settings_subscriber.rs +++ b/simulations/src/streaming/settings_subscriber.rs @@ -107,7 +107,10 @@ mod tests { regions::{Region, RegionsData}, Network, NetworkBehaviourKey, }, - node::{dummy_streaming::DummyStreamingNode, Node, NodeId, NodeIdExt}, + node::{ + dummy_streaming::{DummyStreamingNode, DummyStreamingState}, + Node, NodeId, NodeIdExt, + }, output_processors::OutData, runner::SimulationRunner, warding::SimulationState, @@ -119,10 +122,10 @@ mod tests { states: HashMap, } - impl TryFrom<&SimulationState>> for SettingsRecord { + impl TryFrom<&SimulationState> for SettingsRecord { type Error = anyhow::Error; - fn try_from(value: &SimulationState>) -> Result { + fn try_from(value: &SimulationState) -> Result { Ok(Self { states: value .nodes @@ -142,7 +145,14 @@ mod tests { }; let nodes = (0..6) - .map(|idx| DummyStreamingNode::new(NodeId::from_index(idx), ())) + .map(|idx| { + Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ())) + as Box< + dyn Node + + std::marker::Send + + Sync, + > + }) .collect::>(); let network = Network::new(RegionsData { regions: (0..6) @@ -194,7 +204,7 @@ mod tests { }) .collect(), }); - let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> = + let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> = SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); simulation_runner.simulate().unwrap(); } diff --git a/simulations/src/warding/minmax.rs b/simulations/src/warding/minmax.rs index 9c358dea..d02f0765 100644 --- a/simulations/src/warding/minmax.rs +++ b/simulations/src/warding/minmax.rs @@ -1,4 +1,3 @@ -use crate::node::Node; use crate::warding::{SimulationState, SimulationWard}; use serde::{Deserialize, Serialize}; @@ -10,8 +9,8 @@ pub struct MinMaxViewWard { max_gap: usize, } -impl SimulationWard for MinMaxViewWard { - type SimulationState = SimulationState; +impl SimulationWard for MinMaxViewWard { + type SimulationState = SimulationState; fn analyze(&mut self, state: &Self::SimulationState) -> bool { let mut min = usize::MAX; let mut max = 0; @@ -36,13 +35,13 @@ mod test { fn rebase_threshold() { let mut minmax = MinMaxViewWard { max_gap: 5 }; let state = SimulationState { - nodes: Arc::new(RwLock::new(vec![10])), + nodes: Arc::new(RwLock::new(vec![Box::new(10)])), }; // we only have one node, so always false assert!(!minmax.analyze(&state)); // push a new node with 10 - state.nodes.write().push(20); + state.nodes.write().push(Box::new(20)); // we now have two nodes and the max - min is 10 > max_gap 5, so true assert!(minmax.analyze(&state)); } diff --git a/simulations/src/warding/mod.rs b/simulations/src/warding/mod.rs index 8dec763a..54c3c26f 100644 --- a/simulations/src/warding/mod.rs +++ b/simulations/src/warding/mod.rs @@ -4,19 +4,19 @@ use std::sync::Arc; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; // internal -use crate::node::Node; +use crate::runner::BoxedNode; mod minmax; mod stalled; mod ttf; -pub struct SimulationState { - pub nodes: Arc>>, +pub struct SimulationState { + pub nodes: Arc>>>, } -impl SimulationState { +impl SimulationState { #[inline] - pub fn new(nodes: Vec) -> Self { + pub fn new(nodes: Vec>) -> Self { Self { nodes: Arc::new(RwLock::new(nodes)), } @@ -25,7 +25,7 @@ impl SimulationState { /// A ward is a computation over the `NetworkState`, it must return true if the state satisfies /// the warding conditions. It is used to stop the consensus simulation if such condition is reached. -pub trait SimulationWard { +pub trait SimulationWard { type SimulationState; fn analyze(&mut self, state: &Self::SimulationState) -> bool; } @@ -41,9 +41,9 @@ pub enum Ward { } impl Ward { - pub fn simulation_ward_mut( + pub fn simulation_ward_mut( &mut self, - ) -> &mut dyn SimulationWard> { + ) -> &mut dyn SimulationWard> { match self { Ward::MaxView(ward) => ward, Ward::MinMaxView(ward) => ward, @@ -52,8 +52,8 @@ impl Ward { } } -impl SimulationWard for Ward { - type SimulationState = SimulationState; +impl SimulationWard for Ward { + type SimulationState = SimulationState; fn analyze(&mut self, state: &Self::SimulationState) -> bool { self.simulation_ward_mut().analyze(state) } diff --git a/simulations/src/warding/stalled.rs b/simulations/src/warding/stalled.rs index 2da479b3..0a54c4de 100644 --- a/simulations/src/warding/stalled.rs +++ b/simulations/src/warding/stalled.rs @@ -1,4 +1,4 @@ -use crate::node::Node; +use crate::runner::BoxedNode; use crate::warding::{SimulationState, SimulationWard}; use serde::{Deserialize, Serialize}; @@ -31,8 +31,8 @@ impl StalledViewWard { } } -impl SimulationWard for StalledViewWard { - type SimulationState = SimulationState; +impl SimulationWard for StalledViewWard { + type SimulationState = SimulationState; fn analyze(&mut self, state: &Self::SimulationState) -> bool { let nodes = state.nodes.read(); self.update_state(checksum(nodes.as_slice())); @@ -41,7 +41,7 @@ impl SimulationWard for StalledViewWard { } #[inline] -fn checksum(nodes: &[N]) -> u32 { +fn checksum(nodes: &[BoxedNode]) -> u32 { let mut hash = crc32fast::Hasher::new(); for node in nodes.iter() { hash.update(&node.current_view().to_be_bytes()); @@ -65,7 +65,7 @@ mod test { threshold: 2, }; let state = SimulationState { - nodes: Arc::new(RwLock::new(vec![10])), + nodes: Arc::new(RwLock::new(vec![Box::new(10)])), }; // increase the criterion, 1 @@ -76,7 +76,7 @@ mod test { assert!(stalled.analyze(&state)); // push a new one, so the criterion is reset to 0 - state.nodes.write().push(20); + state.nodes.write().push(Box::new(20)); assert!(!stalled.analyze(&state)); // increase the criterion, 2 diff --git a/simulations/src/warding/ttf.rs b/simulations/src/warding/ttf.rs index a87c5b85..f0404a4a 100644 --- a/simulations/src/warding/ttf.rs +++ b/simulations/src/warding/ttf.rs @@ -1,4 +1,3 @@ -use crate::node::Node; use crate::warding::{SimulationState, SimulationWard}; use serde::{Deserialize, Serialize}; @@ -10,8 +9,8 @@ pub struct MaxViewWard { max_view: usize, } -impl SimulationWard for MaxViewWard { - type SimulationState = SimulationState; +impl SimulationWard for MaxViewWard { + type SimulationState = SimulationState; fn analyze(&mut self, state: &Self::SimulationState) -> bool { state .nodes @@ -34,11 +33,11 @@ mod test { let node = 11; let state = SimulationState { - nodes: Arc::new(RwLock::new(vec![node])), + nodes: Arc::new(RwLock::new(vec![Box::new(node)])), }; assert!(ttf.analyze(&state)); - state.nodes.write().push(9); + state.nodes.write().push(Box::new(9)); assert!(!ttf.analyze(&state)); } } diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 897bd44c..58179231 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -4,7 +4,7 @@ use std::process::{Child, Command, Stdio}; use std::time::Duration; // internal use crate::{get_available_port, Node, SpawnConfig, RNG}; -use consensus_engine::overlay::{RoundRobin, Settings}; +use consensus_engine::overlay::{FlatOverlaySettings, RoundRobin}; use consensus_engine::NodeId; use nomos_consensus::{CarnotInfo, CarnotSettings}; use nomos_http::backends::axum::AxumBackendSettings; @@ -208,7 +208,7 @@ fn create_node_config( consensus: CarnotSettings { private_key, fountain_settings: (), - overlay_settings: Settings { + overlay_settings: FlatOverlaySettings { nodes, leader: RoundRobin::new(), // By setting the leader_threshold to 1 we ensure that all nodes come