Tree settings for sim overlay (#257)

* Tree settings for sim overlay

* WIP dyn node simulation

* Allow generic settings and states for dispatched nodes

* Update tests
This commit is contained in:
gusto 2023-07-18 14:44:23 +03:00 committed by GitHub
parent fe9f2ba006
commit 94e384f609
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 291 additions and 176 deletions

View File

@ -396,7 +396,7 @@ impl<O: Overlay> Carnot<O> {
mod test {
use std::convert::Infallible;
use crate::overlay::{FlatOverlay, RoundRobin, Settings};
use crate::overlay::{FlatOverlay, FlatOverlaySettings, RoundRobin};
use super::*;
@ -413,7 +413,7 @@ mod test {
leader_id: *nodes.first().unwrap(),
},
},
FlatOverlay::new(Settings {
FlatOverlay::new(FlatOverlaySettings {
nodes,
leader: RoundRobin::default(),
leader_super_majority_threshold: None,

View File

@ -17,11 +17,11 @@ impl<L> Overlay for FlatOverlay<L>
where
L: LeaderSelection + Send + Sync + 'static,
{
type Settings = Settings<L>;
type Settings = FlatOverlaySettings<L>;
type LeaderSelection = L;
fn new(
Settings {
FlatOverlaySettings {
leader,
nodes,
leader_super_majority_threshold,
@ -135,7 +135,7 @@ impl LeaderSelection for RoundRobin {
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Settings<L> {
pub struct FlatOverlaySettings<L> {
pub nodes: Vec<NodeId>,
/// A fraction representing the threshold in the form `<num>/<den>'
/// Defaults to 2/3

View File

@ -1,7 +1,7 @@
use std::{collections::HashSet, panic};
use consensus_engine::{
overlay::{FlatOverlay, RoundRobin, Settings},
overlay::{FlatOverlay, FlatOverlaySettings, RoundRobin},
*,
};
use proptest_state_machine::{ReferenceStateMachine, StateMachineTest};
@ -28,7 +28,7 @@ impl ConsensusEngineTest {
leader_id: NodeId::new([0; 32]),
},
},
FlatOverlay::new(Settings {
FlatOverlay::new(FlatOverlaySettings {
nodes: vec![NodeId::new([0; 32])],
leader: RoundRobin::default(),
leader_super_majority_threshold: None,

View File

@ -17,7 +17,9 @@
"asia": 0.3
}
},
"overlay_settings": "Flat",
"overlay_settings": {
"number_of_committees": 3
},
"node_settings": {
"timeout": "1000ms"
},

View File

@ -1,13 +1,13 @@
// std
use anyhow::Ok;
use serde::Serialize;
use simulations::node::carnot::CarnotSettings;
use simulations::node::carnot::{CarnotSettings, CarnotState};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
// crates
use clap::Parser;
use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin};
use consensus_engine::overlay::RandomBeaconState;
use consensus_engine::{Block, View};
use crossbeam::channel;
use rand::rngs::SmallRng;
@ -17,18 +17,18 @@ use serde::de::DeserializeOwned;
use simulations::network::behaviour::create_behaviours;
use simulations::network::regions::{create_regions, RegionsData};
use simulations::network::{InMemoryNetworkInterface, Network};
use simulations::node::{Node, NodeId, NodeIdExt};
use simulations::node::{NodeId, NodeIdExt};
use simulations::output_processors::Record;
use simulations::runner::SimulationRunnerHandle;
use simulations::runner::{BoxedNode, SimulationRunnerHandle};
use simulations::streaming::{
io::IOSubscriber, naive::NaiveSubscriber, polars::PolarsSubscriber, StreamType,
};
// internal
use simulations::{
node::carnot::CarnotNode, output_processors::OutData, runner::SimulationRunner,
settings::SimulationSettings,
output_processors::OutData, runner::SimulationRunner, settings::SimulationSettings,
};
mod log;
mod overlay_node;
/// Main simulation wrapper
/// Pipes together the cli arguments with the execution
@ -73,7 +73,7 @@ impl SimulationApp {
let ids = node_ids.clone();
let mut network = Network::new(regions_data);
let nodes = node_ids
let nodes: Vec<BoxedNode<CarnotSettings, CarnotState>> = node_ids
.iter()
.copied()
.map(|node_id| {
@ -86,11 +86,7 @@ impl SimulationApp {
);
let nodes: Vec<NodeId> = ids.clone().into_iter().map(Into::into).collect();
let leader = nodes.first().copied().unwrap();
let overlay_settings = consensus_engine::overlay::Settings {
nodes: nodes.to_vec(),
leader: RoundRobin::new(),
leader_super_majority_threshold: None,
};
// FIXME: Actually use a proposer and a key to generate random beacon state
let genesis = nomos_core::block::Block::new(
View::new(0),
@ -101,39 +97,36 @@ impl SimulationApp {
entropy: Box::new([0; 32]),
},
);
CarnotNode::<FlatOverlay<RoundRobin>>::new(
overlay_node::to_overlay_node(
node_id,
CarnotSettings::new(
simulation_settings.node_settings.timeout,
simulation_settings.record_settings.clone(),
),
overlay_settings,
genesis,
nodes,
leader,
network_interface,
genesis,
&mut rng,
&simulation_settings,
)
})
.collect();
run(network, nodes, simulation_settings, stream_type)?;
run::<_, _, _>(network, nodes, simulation_settings, stream_type)?;
Ok(())
}
}
fn run<M, N: Node>(
fn run<M, S, T>(
network: Network<M>,
nodes: Vec<N>,
nodes: Vec<BoxedNode<S, T>>,
settings: SimulationSettings,
stream_type: Option<StreamType>,
) -> anyhow::Result<()>
where
M: Clone + Send + Sync + 'static,
N: Send + Sync + 'static,
N::Settings: Clone + Send,
N::State: Serialize,
S: 'static,
T: Serialize + 'static,
{
let stream_settings = settings.stream_settings.clone();
let runner =
SimulationRunner::<_, _, OutData>::new(network, nodes, Default::default(), settings)?;
SimulationRunner::<_, OutData, S, T>::new(network, nodes, Default::default(), settings)?;
let handle = match stream_type {
Some(StreamType::Naive) => {

View File

@ -0,0 +1,62 @@
use consensus_engine::{
overlay::{FlatOverlay, RoundRobin, TreeOverlay},
NodeId,
};
use rand::Rng;
use simulations::{
network::InMemoryNetworkInterface,
node::carnot::{messages::CarnotMessage, CarnotNode, CarnotSettings, CarnotState},
runner::BoxedNode,
settings::SimulationSettings,
};
pub fn to_overlay_node<R: Rng>(
node_id: NodeId,
nodes: Vec<NodeId>,
leader: NodeId,
network_interface: InMemoryNetworkInterface<CarnotMessage>,
genesis: nomos_core::block::Block<[u8; 32]>,
mut rng: R,
settings: &SimulationSettings,
) -> BoxedNode<CarnotSettings, CarnotState> {
match &settings.overlay_settings {
simulations::settings::OverlaySettings::Flat => {
let overlay_settings = consensus_engine::overlay::FlatOverlaySettings {
nodes: nodes.to_vec(),
leader: RoundRobin::new(),
leader_super_majority_threshold: None,
};
Box::new(CarnotNode::<FlatOverlay<RoundRobin>>::new(
node_id,
CarnotSettings::new(
settings.node_settings.timeout,
settings.record_settings.clone(),
),
overlay_settings,
genesis,
network_interface,
&mut rng,
))
}
simulations::settings::OverlaySettings::Tree(tree_settings) => {
let overlay_settings = consensus_engine::overlay::TreeOverlaySettings {
nodes,
current_leader: leader,
entropy: [0; 32],
number_of_committees: tree_settings.number_of_committees,
leader: RoundRobin::new(),
};
Box::new(CarnotNode::<TreeOverlay<RoundRobin>>::new(
node_id,
CarnotSettings::new(
settings.node_settings.timeout,
settings.record_settings.clone(),
),
overlay_settings,
genesis,
network_interface,
&mut rng,
))
}
}
}

View File

@ -2,7 +2,7 @@
mod event_builder;
mod message_cache;
mod messages;
pub mod messages;
mod tally;
mod timeout;

View File

@ -95,17 +95,20 @@ impl OutData {
}
}
impl<N> TryFrom<&SimulationState<N>> for OutData
where
N: crate::node::Node,
N::State: Serialize,
{
impl<S, T: Serialize> TryFrom<&SimulationState<S, T>> for OutData {
type Error = anyhow::Error;
fn try_from(state: &crate::warding::SimulationState<N>) -> Result<Self, Self::Error> {
serde_json::to_value(state.nodes.read().iter().map(N::state).collect::<Vec<_>>())
.map(OutData::new)
.map_err(From::from)
fn try_from(state: &SimulationState<S, T>) -> Result<Self, Self::Error> {
serde_json::to_value(
state
.nodes
.read()
.iter()
.map(|n| n.state())
.collect::<Vec<_>>(),
)
.map(OutData::new)
.map_err(From::from)
}
}

View File

@ -1,4 +1,4 @@
use crate::node::{Node, NodeId};
use crate::node::NodeId;
use crate::output_processors::Record;
use crate::runner::SimulationRunner;
use crate::warding::SimulationState;
@ -6,7 +6,6 @@ use crossbeam::channel::bounded;
use crossbeam::select;
use rand::prelude::SliceRandom;
use rayon::prelude::*;
use serde::Serialize;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
@ -14,26 +13,25 @@ use std::time::Duration;
use super::SimulationRunnerHandle;
/// Simulate with sending the network state to any subscriber
pub fn simulate<M, N: Node, R>(
runner: SimulationRunner<M, N, R>,
pub fn simulate<M, R, S, T>(
runner: SimulationRunner<M, R, S, T>,
chunk_size: usize,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
M: Clone + Send + Sync + 'static,
N: Send + Sync + 'static,
N::Settings: Clone + Send,
N::State: Serialize,
R: Record
+ for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error>
+ for<'a> TryFrom<&'a SimulationState<S, T>, Error = anyhow::Error>
+ Send
+ Sync
+ 'static,
S: 'static,
T: 'static,
{
let simulation_state = SimulationState::<N> {
let simulation_state = SimulationState {
nodes: Arc::clone(&runner.nodes),
};
let mut node_ids: Vec<NodeId> = runner.nodes.read().iter().map(N::id).collect();
let mut node_ids: Vec<NodeId> = runner.nodes.read().iter().map(|n| n.id()).collect();
let mut inner_runner = runner.inner;
let nodes = runner.nodes;

View File

@ -5,7 +5,7 @@ use crate::warding::SimulationState;
use crossbeam::channel::bounded;
use crossbeam::select;
use rand::prelude::IteratorRandom;
use serde::Serialize;
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;
@ -15,21 +15,20 @@ use super::SimulationRunnerHandle;
/// Simulate with sending the network state to any subscriber.
///
/// [Glauber dynamics simulation](https://en.wikipedia.org/wiki/Glauber_dynamics)
pub fn simulate<M, N: Node, R>(
runner: SimulationRunner<M, N, R>,
pub fn simulate<M, R, S, T>(
runner: SimulationRunner<M, R, S, T>,
update_rate: usize,
maximum_iterations: usize,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
M: Send + Sync + Clone + 'static,
N: Send + Sync + 'static,
N::Settings: Clone + Send,
N::State: Serialize,
R: Record
+ for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error>
+ for<'a> TryFrom<&'a SimulationState<S, T>, Error = anyhow::Error>
+ Send
+ Sync
+ 'static,
S: 'static,
T: 'static,
{
let simulation_state = SimulationState {
nodes: Arc::clone(&runner.nodes),
@ -60,7 +59,7 @@ where
{
let mut shared_nodes = nodes.write();
let node: &mut N = shared_nodes
let node: &mut dyn Node<Settings = S, State = T> = &mut **shared_nodes
.get_mut(node_id.index())
.expect("Node should be present");
node.step(elapsed);

View File

@ -35,10 +35,10 @@ use std::ops::Not;
use std::sync::Arc;
use std::time::Duration;
// crates
use fixed_slice_deque::FixedSliceDeque;
use rand::prelude::{IteratorRandom, SliceRandom};
use rand::rngs::SmallRng;
use serde::Serialize;
// internal
use crate::node::{Node, NodeId, NodeIdExt};
use crate::output_processors::Record;
@ -48,28 +48,27 @@ use crate::warding::SimulationState;
use super::SimulationRunnerHandle;
/// Simulate with sending the network state to any subscriber
pub fn simulate<M, N: Node, R>(
runner: SimulationRunner<M, N, R>,
pub fn simulate<M, R, S, T>(
runner: SimulationRunner<M, R, S, T>,
gap: usize,
distribution: Option<Vec<f32>>,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
M: Send + Sync + Clone + 'static,
N: Send + Sync + 'static,
N::Settings: Clone + Send,
N::State: Serialize,
R: Record
+ for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error>
+ for<'a> TryFrom<&'a SimulationState<S, T>, Error = anyhow::Error>
+ Send
+ Sync
+ 'static,
S: 'static,
T: 'static,
{
let distribution =
distribution.unwrap_or_else(|| std::iter::repeat(1.0f32).take(gap).collect());
let layers: Vec<usize> = (0..gap).collect();
let mut deque = build_node_ids_deque::<M, N, R>(gap, &runner);
let mut deque = build_node_ids_deque::<M, R, S, T>(gap, &runner);
let simulation_state = SimulationState {
nodes: Arc::clone(&runner.nodes),
@ -96,7 +95,7 @@ where
{
let mut shared_nodes = nodes.write();
let node: &mut N = shared_nodes
let node: &mut dyn Node<Settings = S, State = T> = &mut **shared_nodes
.get_mut(node_id.index())
.expect("Node should be present");
let prev_view = node.current_view();
@ -164,13 +163,10 @@ fn choose_random_layer_and_node_id(
(i, *node_id)
}
fn build_node_ids_deque<M, N, R>(
fn build_node_ids_deque<M, R, S, T>(
gap: usize,
runner: &SimulationRunner<M, N, R>,
) -> FixedSliceDeque<BTreeSet<NodeId>>
where
N: Node,
{
runner: &SimulationRunner<M, R, S, T>,
) -> FixedSliceDeque<BTreeSet<NodeId>> {
// add a +1 so we always have
let mut deque = FixedSliceDeque::new(gap + 1);
// push first layer

View File

@ -26,6 +26,8 @@ use crate::node::Node;
use crate::settings::{RunnerSettings, SimulationSettings};
use crate::warding::{SimulationState, SimulationWard, Ward};
pub type BoxedNode<S, T> = Box<dyn Node<Settings = S, State = T> + Send + Sync>;
pub struct SimulationRunnerHandle<R> {
producer: StreamProducer<R>,
stop_tx: Sender<()>,
@ -68,24 +70,14 @@ impl<M> SimulationRunnerInner<M>
where
M: Send + Sync + Clone,
{
fn check_wards<N>(&mut self, state: &SimulationState<N>) -> bool
where
N: Node + Send + Sync,
N::Settings: Clone + Send,
N::State: Serialize,
{
fn check_wards<S, T>(&mut self, state: &SimulationState<S, T>) -> bool {
self.wards
.par_iter_mut()
.map(|ward| ward.analyze(state))
.any(|x| x)
}
fn step<N>(&mut self, nodes: &mut [N], elapsed: Duration)
where
N: Node + Send + Sync,
N::Settings: Clone + Send,
N::State: Serialize,
{
fn step<S, T>(&mut self, nodes: &mut [BoxedNode<S, T>], elapsed: Duration) {
self.network.dispatch_after(elapsed);
nodes.par_iter_mut().for_each(|node| {
node.step(elapsed);
@ -96,31 +88,27 @@ where
/// Encapsulation solution for the simulations runner
/// Holds the network state, the simulating nodes and the simulation settings.
pub struct SimulationRunner<M, N, R>
where
N: Node,
{
pub struct SimulationRunner<M, R, S, T> {
inner: SimulationRunnerInner<M>,
nodes: Arc<RwLock<Vec<N>>>,
nodes: Arc<RwLock<Vec<BoxedNode<S, T>>>>,
runner_settings: RunnerSettings,
producer: StreamProducer<R>,
}
impl<M, N: Node, R> SimulationRunner<M, N, R>
impl<M, R, S, T> SimulationRunner<M, R, S, T>
where
M: Clone + Send + Sync + 'static,
N: Send + Sync + 'static,
N::Settings: Clone + Send,
N::State: Serialize,
R: Record
+ for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error>
+ for<'a> TryFrom<&'a SimulationState<S, T>, Error = anyhow::Error>
+ Send
+ Sync
+ 'static,
S: 'static,
T: Serialize + 'static,
{
pub fn new(
network: Network<M>,
nodes: Vec<N>,
nodes: Vec<BoxedNode<S, T>>,
producer: StreamProducer<R>,
mut settings: SimulationSettings,
) -> anyhow::Result<Self> {
@ -182,28 +170,27 @@ where
}
}
impl<M, N: Node, R> SimulationRunner<M, N, R>
impl<M, R, S, T> SimulationRunner<M, R, S, T>
where
M: Clone + Send + Sync + 'static,
N: Send + Sync + 'static,
N::Settings: Clone + Send,
N::State: Serialize,
R: Record
+ serde::Serialize
+ for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error>
+ for<'a> TryFrom<&'a SimulationState<S, T>, Error = anyhow::Error>
+ Send
+ Sync
+ 'static,
S: 'static,
T: Serialize + 'static,
{
pub fn simulate_and_subscribe<S>(
pub fn simulate_and_subscribe<B>(
self,
settings: S::Settings,
settings: B::Settings,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
S: Subscriber<Record = R> + Send + Sync + 'static,
B: Subscriber<Record = R> + Send + Sync + 'static,
{
let handle = self.simulate()?;
let mut data_subscriber_handle = handle.subscribe::<S>(settings)?;
let mut data_subscriber_handle = handle.subscribe::<B>(settings)?;
let mut runtime_subscriber_handle =
handle.subscribe::<RuntimeSubscriber<R>>(Default::default())?;
let mut settings_subscriber_handle =

View File

@ -1,26 +1,23 @@
use serde::Serialize;
use super::{SimulationRunner, SimulationRunnerHandle};
use crate::output_processors::Record;
use crate::warding::SimulationState;
use crate::{node::Node, output_processors::Record};
use crossbeam::channel::{bounded, select};
use std::sync::Arc;
use std::time::Duration;
/// Simulate with sending the network state to any subscriber
pub fn simulate<M, N: Node, R>(
runner: SimulationRunner<M, N, R>,
pub fn simulate<M, R, S, T>(
runner: SimulationRunner<M, R, S, T>,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
M: Send + Sync + Clone + 'static,
N: Send + Sync + 'static,
N::Settings: Clone + Send,
N::State: Serialize,
R: Record
+ for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error>
+ for<'a> TryFrom<&'a SimulationState<S, T>, Error = anyhow::Error>
+ Send
+ Sync
+ 'static,
S: 'static,
T: 'static,
{
let state = SimulationState {
nodes: Arc::clone(&runner.nodes),
@ -74,11 +71,14 @@ mod tests {
InMemoryNetworkInterface, Network, NetworkBehaviourKey,
},
node::{
dummy::{DummyMessage, DummyNode},
dummy::{DummyMessage, DummyNode, DummySettings, DummyState},
Node, NodeId, NodeIdExt, OverlayState, SharedState, ViewOverlay,
},
output_processors::OutData,
overlay::{tree::TreeOverlay, Overlay, SimulationOverlay},
overlay::{
tree::{TreeOverlay, TreeSettings},
Overlay, SimulationOverlay,
},
runner::SimulationRunner,
settings::SimulationSettings,
streaming::StreamProducer,
@ -131,7 +131,7 @@ mod tests {
let mut rng = StepRng::new(1, 0);
let node_ids: Vec<NodeId> = (0..settings.node_count).map(NodeId::from_index).collect();
let overlay = TreeOverlay::new(settings.overlay_settings.clone().try_into().unwrap());
let overlay = TreeOverlay::new(TreeSettings::default());
let mut network = init_network(&node_ids);
let view = ViewOverlay {
leaders: overlay.leaders(&node_ids, 1, &mut rng).collect(),
@ -142,11 +142,24 @@ mod tests {
overlay: SimulationOverlay::Tree(overlay),
overlays: BTreeMap::from([(0, view.clone()), (1, view)]),
}));
let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state);
let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state)
.into_iter()
.map(|n| {
Box::new(n)
as Box<
dyn Node<State = DummyState, Settings = DummySettings>
+ std::marker::Send
+ Sync,
>
})
.collect();
let producer = StreamProducer::default();
let mut runner: SimulationRunner<DummyMessage, DummyNode, OutData> =
SimulationRunner::new(network, nodes, producer, settings).unwrap();
let mut runner: SimulationRunner<DummyMessage, OutData, DummySettings, DummyState> =
SimulationRunner::<_, OutData, DummySettings, DummyState>::new(
network, nodes, producer, settings,
)
.unwrap();
let mut nodes = runner.nodes.write();
runner.inner.step(&mut nodes, Duration::from_millis(100));
drop(nodes);
@ -166,7 +179,7 @@ mod tests {
let mut rng = StepRng::new(1, 0);
let node_ids: Vec<NodeId> = (0..settings.node_count).map(NodeId::from_index).collect();
let overlay = TreeOverlay::new(settings.overlay_settings.clone().try_into().unwrap());
let overlay = TreeOverlay::new(TreeSettings::default());
let mut network = init_network(&node_ids);
let view = ViewOverlay {
leaders: overlay.leaders(&node_ids, 1, &mut rng).collect(),
@ -182,7 +195,7 @@ mod tests {
(43, view),
]),
}));
let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state);
let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state.clone());
for node in nodes.iter() {
// All nodes send one message to NodeId(1).
@ -191,7 +204,19 @@ mod tests {
}
network.collect_messages();
let mut runner: SimulationRunner<DummyMessage, DummyNode, OutData> =
let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state)
.into_iter()
.map(|n| {
Box::new(n)
as Box<
dyn Node<State = DummyState, Settings = DummySettings>
+ std::marker::Send
+ Sync,
>
})
.collect();
let mut runner: SimulationRunner<DummyMessage, OutData, DummySettings, DummyState> =
SimulationRunner::new(network, nodes, Default::default(), settings).unwrap();
let mut nodes = runner.nodes.write();

View File

@ -1,7 +1,6 @@
use std::collections::HashMap;
use crate::network::NetworkSettings;
use crate::overlay::OverlaySettings;
use crate::streaming::StreamSettings;
use crate::warding::Ward;
use serde::{Deserialize, Serialize};
@ -23,6 +22,19 @@ pub enum RunnerSettings {
},
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(untagged)]
pub enum OverlaySettings {
#[default]
Flat,
Tree(TreeSettings),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TreeSettings {
pub number_of_committees: usize,
}
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct NodeSettings {
#[serde(with = "humantime_serde")]

View File

@ -118,7 +118,10 @@ mod tests {
regions::{Region, RegionsData},
Network, NetworkBehaviourKey,
},
node::{dummy_streaming::DummyStreamingNode, Node, NodeId, NodeIdExt},
node::{
dummy_streaming::{DummyStreamingNode, DummyStreamingState},
Node, NodeId, NodeIdExt,
},
output_processors::OutData,
runner::SimulationRunner,
warding::SimulationState,
@ -130,10 +133,10 @@ mod tests {
states: HashMap<NodeId, usize>,
}
impl TryFrom<&SimulationState<DummyStreamingNode<()>>> for IORecord {
impl<S, T: Serialize> TryFrom<&SimulationState<S, T>> for IORecord {
type Error = anyhow::Error;
fn try_from(value: &SimulationState<DummyStreamingNode<()>>) -> Result<Self, Self::Error> {
fn try_from(value: &SimulationState<S, T>) -> Result<Self, Self::Error> {
let nodes = value.nodes.read();
Ok(Self {
states: nodes
@ -152,7 +155,14 @@ mod tests {
};
let nodes = (0..6)
.map(|idx| DummyStreamingNode::new(NodeId::from_index(idx), ()))
.map(|idx| {
Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ()))
as Box<
dyn Node<State = DummyStreamingState, Settings = ()>
+ std::marker::Send
+ Sync,
>
})
.collect::<Vec<_>>();
let network = Network::new(RegionsData {
regions: (0..6)
@ -204,7 +214,7 @@ mod tests {
})
.collect(),
});
let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> =
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner
.simulate()

View File

@ -122,7 +122,10 @@ mod tests {
regions::{Region, RegionsData},
Network, NetworkBehaviourKey,
},
node::{dummy_streaming::DummyStreamingNode, Node, NodeId, NodeIdExt},
node::{
dummy_streaming::{DummyStreamingNode, DummyStreamingState},
Node, NodeId, NodeIdExt,
},
output_processors::OutData,
runner::SimulationRunner,
warding::SimulationState,
@ -134,10 +137,10 @@ mod tests {
states: HashMap<NodeId, usize>,
}
impl TryFrom<&SimulationState<DummyStreamingNode<()>>> for NaiveRecord {
impl<S, T: Serialize> TryFrom<&SimulationState<S, T>> for NaiveRecord {
type Error = anyhow::Error;
fn try_from(value: &SimulationState<DummyStreamingNode<()>>) -> Result<Self, Self::Error> {
fn try_from(value: &SimulationState<S, T>) -> Result<Self, Self::Error> {
Ok(Self {
states: value
.nodes
@ -157,7 +160,14 @@ mod tests {
};
let nodes = (0..6)
.map(|idx| DummyStreamingNode::new(NodeId::from_index(idx), ()))
.map(|idx| {
Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ()))
as Box<
dyn Node<State = DummyStreamingState, Settings = ()>
+ std::marker::Send
+ Sync,
>
})
.collect::<Vec<_>>();
let network = Network::new(RegionsData {
regions: (0..6)
@ -209,7 +219,7 @@ mod tests {
})
.collect(),
});
let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> =
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner.simulate().unwrap();
}

View File

@ -107,7 +107,10 @@ mod tests {
regions::{Region, RegionsData},
Network, NetworkBehaviourKey,
},
node::{dummy_streaming::DummyStreamingNode, Node, NodeId, NodeIdExt},
node::{
dummy_streaming::{DummyStreamingNode, DummyStreamingState},
Node, NodeId, NodeIdExt,
},
output_processors::OutData,
runner::SimulationRunner,
warding::SimulationState,
@ -119,10 +122,10 @@ mod tests {
states: HashMap<NodeId, usize>,
}
impl TryFrom<&SimulationState<DummyStreamingNode<()>>> for RuntimeRecord {
impl<S, T: Serialize> TryFrom<&SimulationState<S, T>> for RuntimeRecord {
type Error = anyhow::Error;
fn try_from(value: &SimulationState<DummyStreamingNode<()>>) -> Result<Self, Self::Error> {
fn try_from(value: &SimulationState<S, T>) -> Result<Self, Self::Error> {
Ok(Self {
states: value
.nodes
@ -142,7 +145,14 @@ mod tests {
};
let nodes = (0..6)
.map(|idx| DummyStreamingNode::new(NodeId::from_index(idx), ()))
.map(|idx| {
Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ()))
as Box<
dyn Node<State = DummyStreamingState, Settings = ()>
+ std::marker::Send
+ Sync,
>
})
.collect::<Vec<_>>();
let network = Network::new(RegionsData {
regions: (0..6)
@ -194,7 +204,7 @@ mod tests {
})
.collect(),
});
let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> =
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner.simulate().unwrap();
}

View File

@ -107,7 +107,10 @@ mod tests {
regions::{Region, RegionsData},
Network, NetworkBehaviourKey,
},
node::{dummy_streaming::DummyStreamingNode, Node, NodeId, NodeIdExt},
node::{
dummy_streaming::{DummyStreamingNode, DummyStreamingState},
Node, NodeId, NodeIdExt,
},
output_processors::OutData,
runner::SimulationRunner,
warding::SimulationState,
@ -119,10 +122,10 @@ mod tests {
states: HashMap<NodeId, usize>,
}
impl TryFrom<&SimulationState<DummyStreamingNode<()>>> for SettingsRecord {
impl<S, T: Serialize> TryFrom<&SimulationState<S, T>> for SettingsRecord {
type Error = anyhow::Error;
fn try_from(value: &SimulationState<DummyStreamingNode<()>>) -> Result<Self, Self::Error> {
fn try_from(value: &SimulationState<S, T>) -> Result<Self, Self::Error> {
Ok(Self {
states: value
.nodes
@ -142,7 +145,14 @@ mod tests {
};
let nodes = (0..6)
.map(|idx| DummyStreamingNode::new(NodeId::from_index(idx), ()))
.map(|idx| {
Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ()))
as Box<
dyn Node<State = DummyStreamingState, Settings = ()>
+ std::marker::Send
+ Sync,
>
})
.collect::<Vec<_>>();
let network = Network::new(RegionsData {
regions: (0..6)
@ -194,7 +204,7 @@ mod tests {
})
.collect(),
});
let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> =
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner.simulate().unwrap();
}

View File

@ -1,4 +1,3 @@
use crate::node::Node;
use crate::warding::{SimulationState, SimulationWard};
use serde::{Deserialize, Serialize};
@ -10,8 +9,8 @@ pub struct MinMaxViewWard {
max_gap: usize,
}
impl<N: Node> SimulationWard<N> for MinMaxViewWard {
type SimulationState = SimulationState<N>;
impl<S, T> SimulationWard<S, T> for MinMaxViewWard {
type SimulationState = SimulationState<S, T>;
fn analyze(&mut self, state: &Self::SimulationState) -> bool {
let mut min = usize::MAX;
let mut max = 0;
@ -36,13 +35,13 @@ mod test {
fn rebase_threshold() {
let mut minmax = MinMaxViewWard { max_gap: 5 };
let state = SimulationState {
nodes: Arc::new(RwLock::new(vec![10])),
nodes: Arc::new(RwLock::new(vec![Box::new(10)])),
};
// we only have one node, so always false
assert!(!minmax.analyze(&state));
// push a new node with 10
state.nodes.write().push(20);
state.nodes.write().push(Box::new(20));
// we now have two nodes and the max - min is 10 > max_gap 5, so true
assert!(minmax.analyze(&state));
}

View File

@ -4,19 +4,19 @@ use std::sync::Arc;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
// internal
use crate::node::Node;
use crate::runner::BoxedNode;
mod minmax;
mod stalled;
mod ttf;
pub struct SimulationState<N> {
pub nodes: Arc<RwLock<Vec<N>>>,
pub struct SimulationState<S, T> {
pub nodes: Arc<RwLock<Vec<BoxedNode<S, T>>>>,
}
impl<N> SimulationState<N> {
impl<S, T> SimulationState<S, T> {
#[inline]
pub fn new(nodes: Vec<N>) -> Self {
pub fn new(nodes: Vec<BoxedNode<S, T>>) -> Self {
Self {
nodes: Arc::new(RwLock::new(nodes)),
}
@ -25,7 +25,7 @@ impl<N> SimulationState<N> {
/// A ward is a computation over the `NetworkState`, it must return true if the state satisfies
/// the warding conditions. It is used to stop the consensus simulation if such condition is reached.
pub trait SimulationWard<N> {
pub trait SimulationWard<S, T> {
type SimulationState;
fn analyze(&mut self, state: &Self::SimulationState) -> bool;
}
@ -41,9 +41,9 @@ pub enum Ward {
}
impl Ward {
pub fn simulation_ward_mut<N: Node>(
pub fn simulation_ward_mut<S, T>(
&mut self,
) -> &mut dyn SimulationWard<N, SimulationState = SimulationState<N>> {
) -> &mut dyn SimulationWard<S, T, SimulationState = SimulationState<S, T>> {
match self {
Ward::MaxView(ward) => ward,
Ward::MinMaxView(ward) => ward,
@ -52,8 +52,8 @@ impl Ward {
}
}
impl<N: Node> SimulationWard<N> for Ward {
type SimulationState = SimulationState<N>;
impl<S, T> SimulationWard<S, T> for Ward {
type SimulationState = SimulationState<S, T>;
fn analyze(&mut self, state: &Self::SimulationState) -> bool {
self.simulation_ward_mut().analyze(state)
}

View File

@ -1,4 +1,4 @@
use crate::node::Node;
use crate::runner::BoxedNode;
use crate::warding::{SimulationState, SimulationWard};
use serde::{Deserialize, Serialize};
@ -31,8 +31,8 @@ impl StalledViewWard {
}
}
impl<N: Node> SimulationWard<N> for StalledViewWard {
type SimulationState = SimulationState<N>;
impl<S, T> SimulationWard<S, T> for StalledViewWard {
type SimulationState = SimulationState<S, T>;
fn analyze(&mut self, state: &Self::SimulationState) -> bool {
let nodes = state.nodes.read();
self.update_state(checksum(nodes.as_slice()));
@ -41,7 +41,7 @@ impl<N: Node> SimulationWard<N> for StalledViewWard {
}
#[inline]
fn checksum<N: Node>(nodes: &[N]) -> u32 {
fn checksum<S, T>(nodes: &[BoxedNode<S, T>]) -> u32 {
let mut hash = crc32fast::Hasher::new();
for node in nodes.iter() {
hash.update(&node.current_view().to_be_bytes());
@ -65,7 +65,7 @@ mod test {
threshold: 2,
};
let state = SimulationState {
nodes: Arc::new(RwLock::new(vec![10])),
nodes: Arc::new(RwLock::new(vec![Box::new(10)])),
};
// increase the criterion, 1
@ -76,7 +76,7 @@ mod test {
assert!(stalled.analyze(&state));
// push a new one, so the criterion is reset to 0
state.nodes.write().push(20);
state.nodes.write().push(Box::new(20));
assert!(!stalled.analyze(&state));
// increase the criterion, 2

View File

@ -1,4 +1,3 @@
use crate::node::Node;
use crate::warding::{SimulationState, SimulationWard};
use serde::{Deserialize, Serialize};
@ -10,8 +9,8 @@ pub struct MaxViewWard {
max_view: usize,
}
impl<N: Node> SimulationWard<N> for MaxViewWard {
type SimulationState = SimulationState<N>;
impl<S, T> SimulationWard<S, T> for MaxViewWard {
type SimulationState = SimulationState<S, T>;
fn analyze(&mut self, state: &Self::SimulationState) -> bool {
state
.nodes
@ -34,11 +33,11 @@ mod test {
let node = 11;
let state = SimulationState {
nodes: Arc::new(RwLock::new(vec![node])),
nodes: Arc::new(RwLock::new(vec![Box::new(node)])),
};
assert!(ttf.analyze(&state));
state.nodes.write().push(9);
state.nodes.write().push(Box::new(9));
assert!(!ttf.analyze(&state));
}
}

View File

@ -4,7 +4,7 @@ use std::process::{Child, Command, Stdio};
use std::time::Duration;
// internal
use crate::{get_available_port, Node, SpawnConfig, RNG};
use consensus_engine::overlay::{RoundRobin, Settings};
use consensus_engine::overlay::{FlatOverlaySettings, RoundRobin};
use consensus_engine::NodeId;
use nomos_consensus::{CarnotInfo, CarnotSettings};
use nomos_http::backends::axum::AxumBackendSettings;
@ -208,7 +208,7 @@ fn create_node_config(
consensus: CarnotSettings {
private_key,
fountain_settings: (),
overlay_settings: Settings {
overlay_settings: FlatOverlaySettings {
nodes,
leader: RoundRobin::new(),
// By setting the leader_threshold to 1 we ensure that all nodes come