Simulation happy path (#161)
* finish subscriber manager * optimize subscribe on SimulationRunnerHandle * fix comment * replace std locks to parking_lot locks * move producer initialization out simulate fn * WIP * optimize run fn * update condition * fix CI * collect run times * Add happy-path consensus engine * tmp * Fit types from spec (#124) * Match types to spec * Remove Output import * Consensus engine rework (#126) * rework * fix test * clippy happy --------- Co-authored-by: Giacomo Pasini <Zeegomo@users.noreply.github.com> * Adapt carnot network adapter interfaces and implementations * Fix errors * Update network with engine types * Fit types yet again * Remove leadership and old overlay Create carnot event builder Added some adjustments * Add view to vote * Fix serde derive in consensus-engine * Add serde feature for engine in core * Use view in tally * Move carnot tally to consensus service * Add new view msg * Fit engine types in adapter * Missing serde feature in consensus service * Implement carnot event builder * Implement even builder run main tasks * Fill up view resolver * Fix errors on network adapter implementations * Clippy happy * Extract event handling to independent methods in View * Fix test * Refactor carnot event builder (#135) * refactor * format * Discriminate proposal messages (#136) * Derive block id from wire format (#139) * Derive block id from wire format * Derive id on block creation * Use compile time hash size * Add leader role (#138) * add leadership stub * fix gather_new_views * fmt * actually build qc * remove redundant fields * add flat overlay (#143) * add flat overlay * fix * sort imports * fix tests * Fix waku update * rewrite data collection add different kind of subscribers * fix fmt * Unhappy tally (#137) * Refactor tally module * Implement tally for new view messages * Assess pr comments * Fix rebase * simplify tally --------- Co-authored-by: Giacomo Pasini <g.pasini98@gmail.com> * fix gather_new_views * working node * fix unhappy path * remove leftover * Kickstart event building in sim app * finish event builder * fix comment * add Tally * gather enough new views then construct ProposalBlock event * Revert "gather enough new views then construct ProposalBlock event" This reverts commit 87da2bdd0c5d7ba19c110e128749ee356934ccbd. * WIP: CarnotNode * WIP * finish event handle * dump state * WIP * finish message sending * fix some compile errors * make project compile * update * fix fmt and clippy * optimize json ser/deser and add a config * update Cargo.toml * Implement leader proposing (#154) * Implement leader proposing * fix fmt --------- Co-authored-by: al8n <scygliu1@gmail.com> * fix ser/deser bugs * fix subscriber bugs * Fix proposing genesis * Fix genesis retrieval in consensus-engine * Bring back general block proposal event * Fix leaf voting * fix init node bugs * add more tracing * fix empty qc * fix data race * fix all panics * cleanup * propose new blocks * fix comment * do not approve for the same block * no panics * fix some comments * use serde_with * Bring back genesis on 0 * Fix genesis retrieval Replace output enum Vote for genesis proposal * Genesis methods * fix StardardQc::genesis() * fix genesis block bug * fix PR comment * fix PR comment * fix PR comment * fix PR comment * fix PR comment * Fix tally Fix proposing * Remove public block building Added raw method * Missing fmt * clippy happy * fix io stream downcast * optional stream-type arg, by default we do not run any subscriber * fmt * cleanup * Remove from header block constructor * Fix duplicated approve (#180) * fix duplicated approve * Success tally just on threshold * Integrate random beacon on happy path * Fix missing updating beacon * Replicate consensus output * Prune older non relevant messages from cache * Remove view info just again * Refactor Block deps * Reverse wrong parent committee call in Consensus engine * Remove useless event builder settings * Remove blocks store from event builder * Remove unnecessary carnot seed * Remove duplicated proposals check --------- Co-authored-by: Giacomo Pasini <g.pasini98@gmail.com> Co-authored-by: Daniel Sanchez <sanchez.quiros.daniel@gmail.com> Co-authored-by: Giacomo Pasini <Zeegomo@users.noreply.github.com>
This commit is contained in:
parent
75025e8cf0
commit
27d9f72035
@ -32,6 +32,11 @@ impl<O: Overlay> Carnot<O> {
|
||||
pub fn current_view(&self) -> View {
|
||||
self.current_view
|
||||
}
|
||||
|
||||
pub fn highest_voted_view(&self) -> View {
|
||||
self.highest_voted_view
|
||||
}
|
||||
|
||||
/// Upon reception of a block
|
||||
///
|
||||
/// Preconditions:
|
||||
@ -71,7 +76,6 @@ impl<O: Overlay> Carnot<O> {
|
||||
return Err(());
|
||||
}
|
||||
let mut new_state = self.clone();
|
||||
|
||||
if new_state.block_is_safe(block.clone()) {
|
||||
new_state.safe_blocks.insert(block.id, block.clone());
|
||||
new_state.update_high_qc(block.parent_qc);
|
||||
@ -361,6 +365,10 @@ impl<O: Overlay> Carnot<O> {
|
||||
self.overlay.is_member_of_root_committee(self.id)
|
||||
}
|
||||
|
||||
pub fn overlay(&self) -> &O {
|
||||
&self.overlay
|
||||
}
|
||||
|
||||
/// A way to allow for overlay extendability without compromising the engine
|
||||
/// generality.
|
||||
pub fn update_overlay<F, E>(&self, f: F) -> Result<Self, E>
|
||||
|
@ -6,7 +6,6 @@ use serde::{Deserialize, Serialize};
|
||||
/// Flat overlay with a single committee and round robin leader selection.
|
||||
pub struct FlatOverlay<L: LeaderSelection> {
|
||||
nodes: Vec<NodeId>,
|
||||
|
||||
leader: L,
|
||||
}
|
||||
|
||||
|
@ -82,6 +82,15 @@ impl Block {
|
||||
pub fn parent(&self) -> BlockId {
|
||||
self.parent_qc.block()
|
||||
}
|
||||
|
||||
pub fn genesis() -> Self {
|
||||
Self {
|
||||
id: [0; 32],
|
||||
view: 0,
|
||||
parent_qc: Qc::Standard(StandardQc::genesis()),
|
||||
leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Possible output events.
|
||||
|
@ -33,7 +33,7 @@ impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> {
|
||||
) -> Self {
|
||||
let transactions = txs.collect();
|
||||
let header = consensus_engine::Block {
|
||||
id: [view as u8; 32],
|
||||
id: [0; 32],
|
||||
view,
|
||||
parent_qc,
|
||||
leader_proof: LeaderProof::LeaderId {
|
||||
@ -46,7 +46,7 @@ impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> {
|
||||
transactions,
|
||||
beacon,
|
||||
};
|
||||
let id = id_from_wire_content(&s.as_bytes());
|
||||
let id = block_id_from_wire_content(&s);
|
||||
s.header.id = id;
|
||||
s
|
||||
}
|
||||
@ -66,9 +66,12 @@ impl<TxId: Clone + Eq + Hash> Block<TxId> {
|
||||
}
|
||||
}
|
||||
|
||||
fn id_from_wire_content(bytes: &[u8]) -> consensus_engine::BlockId {
|
||||
pub fn block_id_from_wire_content<Tx: Clone + Eq + Hash + Serialize + DeserializeOwned>(
|
||||
block: &Block<Tx>,
|
||||
) -> consensus_engine::BlockId {
|
||||
use blake2::digest::{consts::U32, Digest};
|
||||
use blake2::Blake2b;
|
||||
let bytes = block.as_bytes();
|
||||
let mut hasher = Blake2b::<U32>::new();
|
||||
hasher.update(bytes);
|
||||
hasher.finalize().into()
|
||||
@ -82,7 +85,7 @@ impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> {
|
||||
|
||||
pub fn from_bytes(bytes: &[u8]) -> Self {
|
||||
let mut result: Self = wire::deserialize(bytes).unwrap();
|
||||
result.header.id = id_from_wire_content(bytes);
|
||||
result.header.id = block_id_from_wire_content(&result);
|
||||
result
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
mod leader_selection;
|
||||
pub mod leader_selection;
|
||||
pub mod network;
|
||||
mod tally;
|
||||
mod task_manager;
|
||||
@ -298,7 +298,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // TODO: remove this when using broadcasting events
|
||||
#[derive(Debug)]
|
||||
enum Output<Tx: Clone + Eq + Hash> {
|
||||
Send(consensus_engine::Send),
|
||||
BroadcastTimeoutQc { timeout_qc: TimeoutQc },
|
||||
@ -716,7 +716,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum Event<Tx: Clone + Hash + Eq> {
|
||||
enum Event<Tx: Clone + Hash + Eq> {
|
||||
Proposal {
|
||||
block: Block<Tx>,
|
||||
stream: Pin<Box<dyn Stream<Item = Block<Tx>> + Send>>,
|
||||
|
@ -6,7 +6,7 @@ use crate::NodeId;
|
||||
use consensus_engine::{BlockId, NewView, Qc, Timeout, TimeoutQc, View, Vote};
|
||||
use nomos_core::wire;
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
|
||||
pub struct ProposalChunkMsg {
|
||||
pub chunk: Box<[u8]>,
|
||||
pub proposal: BlockId,
|
||||
@ -39,7 +39,7 @@ impl VoteMsg {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Hash, PartialEq, Eq, Clone)]
|
||||
pub struct NewViewMsg {
|
||||
pub voter: NodeId,
|
||||
pub vote: NewView,
|
||||
@ -54,7 +54,7 @@ impl NewViewMsg {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Hash, PartialEq, Eq, Clone)]
|
||||
pub struct TimeoutMsg {
|
||||
pub voter: NodeId,
|
||||
pub vote: Timeout,
|
||||
@ -69,7 +69,7 @@ impl TimeoutMsg {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
|
||||
pub struct TimeoutQcMsg {
|
||||
pub source: NodeId,
|
||||
pub qc: TimeoutQc,
|
||||
|
@ -3,16 +3,27 @@ name = "simulations"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[[bin]]
|
||||
name = "simulation"
|
||||
path = "src/bin/app.rs"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1"
|
||||
arc-swap = "1.6"
|
||||
bls-signatures = "0.14"
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
crc32fast = "1.3"
|
||||
crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] }
|
||||
consensus-engine = { path = "../consensus-engine" }
|
||||
fixed-slice-deque = "0.1.0-beta2"
|
||||
futures = "0.3"
|
||||
humantime-serde = "1"
|
||||
nomos-core = { path = "../nomos-core" }
|
||||
nomos-consensus = { path = "../nomos-services/consensus" }
|
||||
once_cell = "1.17"
|
||||
parking_lot = "0.12"
|
||||
polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"] }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
@ -22,6 +33,8 @@ serde = { version = "1.0", features = ["derive", "rc"] }
|
||||
serde_with = "2.3"
|
||||
serde_json = "1.0"
|
||||
thiserror = "1"
|
||||
tracing = { version = "0.1", default-features = false, features = ["log", "attributes"] }
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "tracing-log"]}
|
||||
|
||||
[target.'cfg(target_arch = "wasm32")'.dependencies]
|
||||
getrandom = { version = "0.2", features = ["js"] }
|
||||
|
60
simulations/config/carnot.json
Normal file
60
simulations/config/carnot.json
Normal file
@ -0,0 +1,60 @@
|
||||
{
|
||||
"network_settings": {
|
||||
"network_behaviors": {
|
||||
"north america:north america": {
|
||||
"secs": 1,
|
||||
"nanos": 0
|
||||
},
|
||||
"north america:europe": {
|
||||
"secs": 1,
|
||||
"nanos": 0
|
||||
},
|
||||
"north america:asia": {
|
||||
"secs": 1,
|
||||
"nanos": 0
|
||||
},
|
||||
"europe:europe": {
|
||||
"secs": 1,
|
||||
"nanos": 0
|
||||
},
|
||||
"europe:asia": {
|
||||
"secs": 1,
|
||||
"nanos": 0
|
||||
},
|
||||
"europe:north america": {
|
||||
"secs": 1,
|
||||
"nanos": 0
|
||||
},
|
||||
"asia:north america": {
|
||||
"secs": 1,
|
||||
"nanos": 0
|
||||
},
|
||||
"asia:europe": {
|
||||
"secs": 1,
|
||||
"nanos": 0
|
||||
},
|
||||
"asia:asia": {
|
||||
"secs": 1,
|
||||
"nanos": 0
|
||||
}
|
||||
},
|
||||
"regions": {
|
||||
"north america": 0.4,
|
||||
"europe": 0.4,
|
||||
"asia": 0.3
|
||||
}
|
||||
},
|
||||
"overlay_settings": "Flat",
|
||||
"node_settings": {
|
||||
"seed": 0,
|
||||
"timeout": "1000ms"
|
||||
},
|
||||
"runner_settings": "Sync",
|
||||
"stream_settings": {
|
||||
"format": "json"
|
||||
},
|
||||
"node_count": 3,
|
||||
"views_count": 3,
|
||||
"leaders_count": 1,
|
||||
"seed": 0
|
||||
}
|
@ -1,9 +1,7 @@
|
||||
// std
|
||||
use anyhow::Ok;
|
||||
use serde::Serialize;
|
||||
use simulations::streaming::io::IOSubscriber;
|
||||
use simulations::streaming::naive::NaiveSubscriber;
|
||||
use simulations::streaming::polars::PolarsSubscriber;
|
||||
use simulations::node::carnot::CarnotSettings;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fs::File;
|
||||
use std::path::{Path, PathBuf};
|
||||
@ -11,6 +9,8 @@ use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
// crates
|
||||
use clap::Parser;
|
||||
use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin};
|
||||
use consensus_engine::Block;
|
||||
use crossbeam::channel;
|
||||
use parking_lot::RwLock;
|
||||
use rand::rngs::SmallRng;
|
||||
@ -22,12 +22,15 @@ use simulations::network::regions::{create_regions, RegionsData};
|
||||
use simulations::network::{InMemoryNetworkInterface, Network};
|
||||
use simulations::node::dummy::DummyNode;
|
||||
use simulations::node::{Node, NodeId, OverlayState, ViewOverlay};
|
||||
use simulations::overlay::{create_overlay, Overlay, SimulationOverlay};
|
||||
use simulations::streaming::StreamType;
|
||||
use simulations::overlay::{create_overlay, SimulationOverlay};
|
||||
use simulations::streaming::{
|
||||
io::IOSubscriber, naive::NaiveSubscriber, polars::PolarsSubscriber,
|
||||
runtime_subscriber::RuntimeSubscriber, settings_subscriber::SettingsSubscriber, StreamType,
|
||||
};
|
||||
// internal
|
||||
use simulations::{
|
||||
node::carnot::CarnotNode, output_processors::OutData, runner::SimulationRunner,
|
||||
settings::SimulationSettings,
|
||||
settings::SimulationSettings, util::node_id,
|
||||
};
|
||||
|
||||
/// Main simulation wrapper
|
||||
@ -38,7 +41,7 @@ pub struct SimulationApp {
|
||||
#[clap(long, short)]
|
||||
input_settings: PathBuf,
|
||||
#[clap(long)]
|
||||
stream_type: StreamType,
|
||||
stream_type: Option<StreamType>,
|
||||
}
|
||||
|
||||
impl SimulationApp {
|
||||
@ -56,9 +59,7 @@ impl SimulationApp {
|
||||
.as_secs()
|
||||
});
|
||||
let mut rng = SmallRng::seed_from_u64(seed);
|
||||
let mut node_ids: Vec<NodeId> = (0..simulation_settings.node_count)
|
||||
.map(Into::into)
|
||||
.collect();
|
||||
let mut node_ids: Vec<NodeId> = (0..simulation_settings.node_count).map(node_id).collect();
|
||||
node_ids.shuffle(&mut rng);
|
||||
|
||||
let regions = create_regions(&node_ids, &mut rng, &simulation_settings.network_settings);
|
||||
@ -79,17 +80,52 @@ impl SimulationApp {
|
||||
overlays,
|
||||
}));
|
||||
|
||||
let mut network = Network::new(regions_data);
|
||||
|
||||
match &simulation_settings.node_settings {
|
||||
simulations::settings::NodeSettings::Carnot => {
|
||||
simulations::settings::NodeSettings::Carnot { timeout } => {
|
||||
let ids = node_ids.clone();
|
||||
let mut network = Network::new(regions_data);
|
||||
let nodes = node_ids
|
||||
.iter()
|
||||
.map(|node_id| CarnotNode::new(*node_id))
|
||||
.copied()
|
||||
.map(|node_id| {
|
||||
let (node_message_sender, node_message_receiver) = channel::unbounded();
|
||||
let network_message_receiver =
|
||||
network.connect(node_id, node_message_receiver);
|
||||
let network_interface = InMemoryNetworkInterface::new(
|
||||
node_id,
|
||||
node_message_sender,
|
||||
network_message_receiver,
|
||||
);
|
||||
let nodes: Vec<NodeId> = ids.clone().into_iter().map(Into::into).collect();
|
||||
let leader = nodes.first().copied().unwrap();
|
||||
let overlay_settings = consensus_engine::overlay::Settings {
|
||||
nodes: nodes.to_vec(),
|
||||
leader: RoundRobin::new(),
|
||||
};
|
||||
// FIXME: Actually use a proposer and a key to generate random beacon state
|
||||
let genesis = nomos_core::block::Block::new(
|
||||
0,
|
||||
Block::genesis().parent_qc,
|
||||
[].into_iter(),
|
||||
leader,
|
||||
RandomBeaconState::Sad {
|
||||
entropy: Box::new([0; 32]),
|
||||
},
|
||||
);
|
||||
CarnotNode::<FlatOverlay<RoundRobin>>::new(
|
||||
node_id,
|
||||
CarnotSettings::new(nodes, *timeout),
|
||||
overlay_settings,
|
||||
genesis,
|
||||
network_interface,
|
||||
&mut rng,
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
run(network, nodes, simulation_settings, stream_type)?;
|
||||
}
|
||||
simulations::settings::NodeSettings::Dummy => {
|
||||
let mut network = Network::new(regions_data);
|
||||
let nodes = node_ids
|
||||
.iter()
|
||||
.map(|node_id| {
|
||||
@ -115,7 +151,7 @@ fn run<M, N: Node>(
|
||||
network: Network<M>,
|
||||
nodes: Vec<N>,
|
||||
settings: SimulationSettings,
|
||||
stream_type: StreamType,
|
||||
stream_type: Option<StreamType>,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
M: Clone + Send + Sync + 'static,
|
||||
@ -125,30 +161,47 @@ where
|
||||
{
|
||||
let stream_settings = settings.stream_settings.clone();
|
||||
let runner =
|
||||
SimulationRunner::<_, _, OutData>::new(network, nodes, Default::default(), settings);
|
||||
SimulationRunner::<_, _, OutData>::new(network, nodes, Default::default(), settings)?;
|
||||
macro_rules! bail {
|
||||
($settings: ident, $sub: ident) => {
|
||||
let handle = runner.simulate()?;
|
||||
let mut sub_handle = handle.subscribe::<$sub<OutData>>($settings)?;
|
||||
std::thread::spawn(move || {
|
||||
sub_handle.run();
|
||||
let mut data_subscriber_handle = handle.subscribe::<$sub<OutData>>($settings)?;
|
||||
let mut runtime_subscriber_handle =
|
||||
handle.subscribe::<RuntimeSubscriber<OutData>>(Default::default())?;
|
||||
let mut settings_subscriber_handle =
|
||||
handle.subscribe::<SettingsSubscriber<OutData>>(Default::default())?;
|
||||
std::thread::scope(|s| {
|
||||
s.spawn(move || {
|
||||
data_subscriber_handle.run();
|
||||
});
|
||||
|
||||
s.spawn(move || {
|
||||
runtime_subscriber_handle.run();
|
||||
});
|
||||
|
||||
s.spawn(move || {
|
||||
settings_subscriber_handle.run();
|
||||
});
|
||||
});
|
||||
handle.join()?;
|
||||
};
|
||||
}
|
||||
match stream_type {
|
||||
StreamType::Naive => {
|
||||
Some(StreamType::Naive) => {
|
||||
let settings = stream_settings.unwrap_naive();
|
||||
bail!(settings, NaiveSubscriber);
|
||||
}
|
||||
StreamType::IO => {
|
||||
Some(StreamType::IO) => {
|
||||
let settings = stream_settings.unwrap_io();
|
||||
bail!(settings, IOSubscriber);
|
||||
}
|
||||
StreamType::Polars => {
|
||||
Some(StreamType::Polars) => {
|
||||
let settings = stream_settings.unwrap_polars();
|
||||
bail!(settings, PolarsSubscriber);
|
||||
}
|
||||
None => {
|
||||
runner.simulate()?.join()?;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
@ -162,27 +215,34 @@ fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<T> {
|
||||
// Helper method to pregenerate views.
|
||||
// TODO: Remove once shared overlay can generate new views on demand.
|
||||
fn generate_overlays<R: Rng>(
|
||||
node_ids: &[NodeId],
|
||||
overlay: &SimulationOverlay,
|
||||
overlay_count: usize,
|
||||
leader_count: usize,
|
||||
rng: &mut R,
|
||||
_node_ids: &[NodeId],
|
||||
_overlay: &SimulationOverlay,
|
||||
_overlay_count: usize,
|
||||
_leader_count: usize,
|
||||
_rng: &mut R,
|
||||
) -> BTreeMap<usize, ViewOverlay> {
|
||||
(0..overlay_count)
|
||||
.map(|view_id| {
|
||||
(
|
||||
view_id,
|
||||
ViewOverlay {
|
||||
leaders: overlay.leaders(node_ids, leader_count, rng).collect(),
|
||||
layout: overlay.layout(node_ids, rng),
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
// TODO: This call needs to be removed
|
||||
Default::default()
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let filter = std::env::var("SIMULATION_LOG").unwrap_or_else(|_| "info".to_owned());
|
||||
let subscriber = tracing_subscriber::fmt::fmt()
|
||||
.without_time()
|
||||
.with_line_number(true)
|
||||
.with_env_filter(filter)
|
||||
.with_file(false)
|
||||
.with_target(true)
|
||||
.with_ansi(true)
|
||||
.finish();
|
||||
tracing::subscriber::set_global_default(subscriber)
|
||||
.expect("config_tracing is only called once");
|
||||
|
||||
let app: SimulationApp = SimulationApp::parse();
|
||||
app.run()?;
|
||||
|
||||
if let Err(e) = app.run() {
|
||||
tracing::error!("Error: {}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -6,3 +6,7 @@ pub mod runner;
|
||||
pub mod settings;
|
||||
pub mod streaming;
|
||||
pub mod warding;
|
||||
|
||||
pub mod util;
|
||||
static START_TIME: once_cell::sync::Lazy<std::time::Instant> =
|
||||
once_cell::sync::Lazy::new(std::time::Instant::now);
|
||||
|
@ -4,7 +4,7 @@ use std::{collections::HashMap, time::Duration};
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::{regions::Region, NetworkSettings};
|
||||
use super::{NetworkBehaviourKey, NetworkSettings};
|
||||
// internal
|
||||
|
||||
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
|
||||
@ -31,10 +31,10 @@ impl NetworkBehaviour {
|
||||
// network behaviors for pairs of NodeIds.
|
||||
pub fn create_behaviours(
|
||||
network_settings: &NetworkSettings,
|
||||
) -> HashMap<(Region, Region), NetworkBehaviour> {
|
||||
) -> HashMap<NetworkBehaviourKey, NetworkBehaviour> {
|
||||
network_settings
|
||||
.network_behaviors
|
||||
.iter()
|
||||
.map(|((a, b), d)| ((*a, *b), NetworkBehaviour::new(*d, 0.0)))
|
||||
.map(|(k, d)| (*k, NetworkBehaviour::new(*d, 0.0)))
|
||||
.collect()
|
||||
}
|
||||
|
@ -2,13 +2,14 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
ops::Add,
|
||||
str::FromStr,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
// crates
|
||||
use crossbeam::channel::{self, Receiver, Sender};
|
||||
use rand::{rngs::ThreadRng, Rng};
|
||||
use rayon::prelude::*;
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
// internal
|
||||
use crate::node::NodeId;
|
||||
|
||||
@ -17,9 +18,45 @@ pub mod regions;
|
||||
|
||||
type NetworkTime = Instant;
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Default)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub struct NetworkBehaviourKey {
|
||||
pub from: regions::Region,
|
||||
pub to: regions::Region,
|
||||
}
|
||||
|
||||
impl NetworkBehaviourKey {
|
||||
pub fn new(from: regions::Region, to: regions::Region) -> Self {
|
||||
Self { from, to }
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for NetworkBehaviourKey {
|
||||
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
let s = format!("{}:{}", self.from, self.to);
|
||||
serializer.serialize_str(&s)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for NetworkBehaviourKey {
|
||||
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
|
||||
let s = String::deserialize(deserializer)?;
|
||||
let mut split = s.split(':');
|
||||
let from = split.next().ok_or(serde::de::Error::custom(
|
||||
"NetworkBehaviourKey should be in the form of `from_region:to_region`",
|
||||
))?;
|
||||
let to = split.next().ok_or(serde::de::Error::custom(
|
||||
"NetworkBehaviourKey should be in the form of `from_region:to_region`",
|
||||
))?;
|
||||
Ok(Self::new(
|
||||
regions::Region::from_str(from).map_err(serde::de::Error::custom)?,
|
||||
regions::Region::from_str(to).map_err(serde::de::Error::custom)?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
|
||||
pub struct NetworkSettings {
|
||||
pub network_behaviors: HashMap<(regions::Region, regions::Region), Duration>,
|
||||
pub network_behaviors: HashMap<NetworkBehaviourKey, Duration>,
|
||||
/// Represents node distribution in the simulated regions.
|
||||
/// The sum of distributions should be 1.
|
||||
pub regions: HashMap<regions::Region, f32>,
|
||||
@ -193,7 +230,7 @@ mod tests {
|
||||
regions::{Region, RegionsData},
|
||||
Network, NetworkInterface, NetworkMessage,
|
||||
};
|
||||
use crate::node::NodeId;
|
||||
use crate::{network::NetworkBehaviourKey, node::NodeId, util::node_id};
|
||||
use crossbeam::channel::{self, Receiver, Sender};
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
|
||||
@ -232,12 +269,12 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn send_receive_messages() {
|
||||
let node_a = 0.into();
|
||||
let node_b = 1.into();
|
||||
let node_a = node_id(0);
|
||||
let node_b = node_id(1);
|
||||
|
||||
let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]);
|
||||
let behaviour = HashMap::from([(
|
||||
(Region::Europe, Region::Europe),
|
||||
NetworkBehaviourKey::new(Region::Europe, Region::Europe),
|
||||
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
|
||||
)]);
|
||||
let regions_data = RegionsData::new(regions, behaviour);
|
||||
@ -281,9 +318,9 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn regions_send_receive_messages() {
|
||||
let node_a = 0.into();
|
||||
let node_b = 1.into();
|
||||
let node_c = 2.into();
|
||||
let node_a = node_id(0);
|
||||
let node_b = node_id(1);
|
||||
let node_c = node_id(2);
|
||||
|
||||
let regions = HashMap::from([
|
||||
(Region::Asia, vec![node_a, node_b]),
|
||||
@ -291,15 +328,15 @@ mod tests {
|
||||
]);
|
||||
let behaviour = HashMap::from([
|
||||
(
|
||||
(Region::Asia, Region::Asia),
|
||||
NetworkBehaviourKey::new(Region::Asia, Region::Asia),
|
||||
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
|
||||
),
|
||||
(
|
||||
(Region::Asia, Region::Europe),
|
||||
NetworkBehaviourKey::new(Region::Asia, Region::Europe),
|
||||
NetworkBehaviour::new(Duration::from_millis(500), 0.0),
|
||||
),
|
||||
(
|
||||
(Region::Europe, Region::Europe),
|
||||
NetworkBehaviourKey::new(Region::Europe, Region::Europe),
|
||||
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
|
||||
),
|
||||
]);
|
||||
|
@ -1,14 +1,14 @@
|
||||
// std
|
||||
use rand::{seq::SliceRandom, Rng};
|
||||
use std::collections::HashMap;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
// crates
|
||||
use serde::{Deserialize, Serialize};
|
||||
// internal
|
||||
use crate::{network::behaviour::NetworkBehaviour, node::NodeId};
|
||||
|
||||
use super::NetworkSettings;
|
||||
use super::{NetworkBehaviourKey, NetworkSettings};
|
||||
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
|
||||
pub enum Region {
|
||||
NorthAmerica,
|
||||
Europe,
|
||||
@ -18,18 +18,74 @@ pub enum Region {
|
||||
Australia,
|
||||
}
|
||||
|
||||
impl core::fmt::Display for Region {
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
let s = match self {
|
||||
Self::NorthAmerica => "NorthAmerica",
|
||||
Self::Europe => "Europe",
|
||||
Self::Asia => "Asia",
|
||||
Self::Africa => "Africa",
|
||||
Self::SouthAmerica => "SouthAmerica",
|
||||
Self::Australia => "Australia",
|
||||
};
|
||||
write!(f, "{s}")
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for Region {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s
|
||||
.trim()
|
||||
.to_lowercase()
|
||||
.replace(['-', '_', ' '], "")
|
||||
.as_str()
|
||||
{
|
||||
"northamerica" | "na" => Ok(Self::NorthAmerica),
|
||||
"europe" | "eu" => Ok(Self::Europe),
|
||||
"asia" | "as" => Ok(Self::Asia),
|
||||
"africa" | "af" => Ok(Self::Africa),
|
||||
"southamerica" | "sa" => Ok(Self::SouthAmerica),
|
||||
"australia" | "au" => Ok(Self::Australia),
|
||||
_ => Err(format!("Unknown region: {s}")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for Region {
|
||||
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
let s = match self {
|
||||
Self::NorthAmerica => "North America",
|
||||
Self::Europe => "Europe",
|
||||
Self::Asia => "Asia",
|
||||
Self::Africa => "Africa",
|
||||
Self::SouthAmerica => "South America",
|
||||
Self::Australia => "Australia",
|
||||
};
|
||||
serializer.serialize_str(s)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for Region {
|
||||
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
|
||||
let s = String::deserialize(deserializer)?;
|
||||
Self::from_str(&s).map_err(serde::de::Error::custom)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct RegionsData {
|
||||
pub regions: HashMap<Region, Vec<NodeId>>,
|
||||
#[serde(skip)]
|
||||
pub node_region: HashMap<NodeId, Region>,
|
||||
pub region_network_behaviour: HashMap<(Region, Region), NetworkBehaviour>,
|
||||
pub region_network_behaviour: HashMap<NetworkBehaviourKey, NetworkBehaviour>,
|
||||
}
|
||||
|
||||
impl RegionsData {
|
||||
pub fn new(
|
||||
regions: HashMap<Region, Vec<NodeId>>,
|
||||
region_network_behaviour: HashMap<(Region, Region), NetworkBehaviour>,
|
||||
region_network_behaviour: HashMap<NetworkBehaviourKey, NetworkBehaviour>,
|
||||
) -> Self {
|
||||
let node_region = regions
|
||||
.iter()
|
||||
@ -49,9 +105,11 @@ impl RegionsData {
|
||||
pub fn network_behaviour(&self, node_a: NodeId, node_b: NodeId) -> &NetworkBehaviour {
|
||||
let region_a = self.node_region[&node_a];
|
||||
let region_b = self.node_region[&node_b];
|
||||
let k = NetworkBehaviourKey::new(region_a, region_b);
|
||||
let k_rev = NetworkBehaviourKey::new(region_b, region_a);
|
||||
self.region_network_behaviour
|
||||
.get(&(region_a, region_b))
|
||||
.or(self.region_network_behaviour.get(&(region_b, region_a)))
|
||||
.get(&k)
|
||||
.or(self.region_network_behaviour.get(&k_rev))
|
||||
.expect("Network behaviour not found for the given regions")
|
||||
}
|
||||
|
||||
@ -106,6 +164,7 @@ mod tests {
|
||||
NetworkSettings,
|
||||
},
|
||||
node::NodeId,
|
||||
util::node_id,
|
||||
};
|
||||
|
||||
#[test]
|
||||
@ -144,9 +203,7 @@ mod tests {
|
||||
let mut rng = StepRng::new(1, 0);
|
||||
|
||||
for tcase in test_cases.iter() {
|
||||
let nodes = (0..tcase.node_count)
|
||||
.map(Into::into)
|
||||
.collect::<Vec<NodeId>>();
|
||||
let nodes = (0..tcase.node_count).map(node_id).collect::<Vec<NodeId>>();
|
||||
|
||||
let available_regions = vec![
|
||||
Region::NorthAmerica,
|
||||
|
226
simulations/src/node/carnot/event_builder.rs
Normal file
226
simulations/src/node/carnot/event_builder.rs
Normal file
@ -0,0 +1,226 @@
|
||||
use crate::node::carnot::messages::CarnotMessage;
|
||||
use crate::util::parse_idx;
|
||||
use consensus_engine::{Carnot, NewView, Overlay, Qc, StandardQc, Timeout, TimeoutQc, View, Vote};
|
||||
use nomos_consensus::network::messages::{NewViewMsg, TimeoutMsg, VoteMsg};
|
||||
use nomos_consensus::NodeId;
|
||||
use nomos_core::block::Block;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::hash::Hash;
|
||||
|
||||
pub type CarnotTx = [u8; 32];
|
||||
|
||||
pub(crate) struct EventBuilder {
|
||||
id: NodeId,
|
||||
leader_vote_message: Tally<VoteMsg>,
|
||||
vote_message: Tally<VoteMsg>,
|
||||
timeout_message: Tally<TimeoutMsg>,
|
||||
new_view_message: Tally<NewViewMsg>,
|
||||
pub(crate) current_view: View,
|
||||
}
|
||||
|
||||
impl EventBuilder {
|
||||
pub fn new(id: NodeId) -> Self {
|
||||
Self {
|
||||
vote_message: Default::default(),
|
||||
leader_vote_message: Default::default(),
|
||||
timeout_message: Default::default(),
|
||||
new_view_message: Default::default(),
|
||||
current_view: View::default(),
|
||||
id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn step<O: Overlay>(
|
||||
&mut self,
|
||||
messages: Vec<CarnotMessage>,
|
||||
engine: &Carnot<O>,
|
||||
) -> Vec<Event<CarnotTx>> {
|
||||
let mut events = Vec::new();
|
||||
// only run when the engine is in the genesis view
|
||||
if engine.highest_voted_view() == -1
|
||||
&& engine.overlay().is_member_of_leaf_committee(self.id)
|
||||
{
|
||||
tracing::info!(node = parse_idx(&self.id), "voting genesis",);
|
||||
let genesis = engine.genesis_block();
|
||||
events.push(Event::Approve {
|
||||
qc: Qc::Standard(StandardQc {
|
||||
view: genesis.view,
|
||||
id: genesis.id,
|
||||
}),
|
||||
block: genesis,
|
||||
votes: HashSet::new(),
|
||||
})
|
||||
}
|
||||
|
||||
for message in messages {
|
||||
match message {
|
||||
CarnotMessage::Proposal(msg) => {
|
||||
let block = Block::from_bytes(&msg.chunk);
|
||||
tracing::info!(
|
||||
node=parse_idx(&self.id),
|
||||
current_view = engine.current_view(),
|
||||
block_view=block.header().view,
|
||||
block=?block.header().id,
|
||||
parent_block=?block.header().parent(),
|
||||
"receive proposal message",
|
||||
);
|
||||
events.push(Event::Proposal { block })
|
||||
}
|
||||
CarnotMessage::TimeoutQc(msg) => {
|
||||
events.push(Event::TimeoutQc { timeout_qc: msg.qc });
|
||||
}
|
||||
CarnotMessage::Vote(msg) => {
|
||||
let msg_view = msg.vote.view;
|
||||
let block_id = msg.vote.block;
|
||||
let voter = msg.voter;
|
||||
let is_next_view_leader = engine.is_next_leader();
|
||||
let is_message_from_root_committee =
|
||||
engine.overlay().is_member_of_root_committee(voter);
|
||||
|
||||
let tally = if is_message_from_root_committee {
|
||||
&mut self.leader_vote_message
|
||||
} else {
|
||||
&mut self.vote_message
|
||||
};
|
||||
|
||||
let Some(qc) = msg.qc.clone() else {
|
||||
tracing::warn!(node=?parse_idx(&self.id), current_view = engine.current_view(), "received vote without QC");
|
||||
continue;
|
||||
};
|
||||
|
||||
// if the message comes from the root committee, then use the leader threshold, otherwise use the leaf threshold
|
||||
let threshold = if is_message_from_root_committee {
|
||||
engine.leader_super_majority_threshold()
|
||||
} else {
|
||||
engine.super_majority_threshold()
|
||||
};
|
||||
|
||||
if let Some(votes) = tally.tally_by(msg_view, msg, threshold) {
|
||||
if let Some(block) = engine
|
||||
.blocks_in_view(msg_view)
|
||||
.iter()
|
||||
.find(|block| block.id == block_id)
|
||||
.cloned()
|
||||
{
|
||||
tracing::info!(
|
||||
node=parse_idx(&self.id),
|
||||
votes=votes.len(),
|
||||
current_view = engine.current_view(),
|
||||
block_view=block.view,
|
||||
block=?block.id,
|
||||
"approve block",
|
||||
);
|
||||
|
||||
if is_next_view_leader && is_message_from_root_committee {
|
||||
events.push(Event::ProposeBlock {
|
||||
qc: Qc::Standard(StandardQc {
|
||||
view: block.view,
|
||||
id: block.id,
|
||||
}),
|
||||
});
|
||||
} else {
|
||||
events.push(Event::Approve {
|
||||
qc,
|
||||
block,
|
||||
votes: votes.into_iter().map(|v| v.vote).collect(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
CarnotMessage::Timeout(msg) => {
|
||||
let msg_view = msg.vote.view;
|
||||
if let Some(timeouts) = self.timeout_message.tally(msg_view, msg) {
|
||||
events.push(Event::RootTimeout {
|
||||
timeouts: timeouts.into_iter().map(|v| v.vote).collect(),
|
||||
})
|
||||
}
|
||||
}
|
||||
CarnotMessage::NewView(msg) => {
|
||||
let msg_view = msg.vote.view;
|
||||
let timeout_qc = msg.vote.timeout_qc.clone();
|
||||
self.current_view = core::cmp::max(self.current_view, msg_view);
|
||||
// if we are the leader, then use the leader threshold, otherwise use the leaf threshold
|
||||
let threshold = if engine.is_next_leader() {
|
||||
engine.leader_super_majority_threshold()
|
||||
} else {
|
||||
engine.super_majority_threshold()
|
||||
};
|
||||
|
||||
if let Some(new_views) =
|
||||
self.new_view_message.tally_by(msg_view, msg, threshold)
|
||||
{
|
||||
events.push(Event::NewView {
|
||||
new_views: new_views.into_iter().map(|v| v.vote).collect(),
|
||||
timeout_qc,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
events
|
||||
}
|
||||
}
|
||||
|
||||
struct Tally<T: core::hash::Hash + Eq> {
|
||||
cache: HashMap<View, HashSet<T>>,
|
||||
threshold: usize,
|
||||
}
|
||||
|
||||
impl<T: core::hash::Hash + Eq> Default for Tally<T> {
|
||||
fn default() -> Self {
|
||||
Self::new(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: core::hash::Hash + Eq> Tally<T> {
|
||||
fn new(threshold: usize) -> Self {
|
||||
Self {
|
||||
cache: Default::default(),
|
||||
threshold,
|
||||
}
|
||||
}
|
||||
|
||||
fn tally(&mut self, view: View, message: T) -> Option<HashSet<T>> {
|
||||
self.tally_by(view, message, self.threshold)
|
||||
}
|
||||
|
||||
fn tally_by(&mut self, view: View, message: T, threshold: usize) -> Option<HashSet<T>> {
|
||||
let entries = self.cache.entry(view).or_default();
|
||||
entries.insert(message);
|
||||
let entries = entries.len();
|
||||
if entries == threshold {
|
||||
Some(self.cache.remove(&view).unwrap())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum Event<Tx: Clone + Hash + Eq> {
|
||||
Proposal {
|
||||
block: Block<Tx>,
|
||||
},
|
||||
#[allow(dead_code)]
|
||||
Approve {
|
||||
qc: Qc,
|
||||
block: consensus_engine::Block,
|
||||
votes: HashSet<Vote>,
|
||||
},
|
||||
ProposeBlock {
|
||||
qc: Qc,
|
||||
},
|
||||
LocalTimeout,
|
||||
NewView {
|
||||
timeout_qc: TimeoutQc,
|
||||
new_views: HashSet<NewView>,
|
||||
},
|
||||
TimeoutQc {
|
||||
timeout_qc: TimeoutQc,
|
||||
},
|
||||
RootTimeout {
|
||||
timeouts: HashSet<Timeout>,
|
||||
},
|
||||
None,
|
||||
}
|
30
simulations/src/node/carnot/message_cache.rs
Normal file
30
simulations/src/node/carnot/message_cache.rs
Normal file
@ -0,0 +1,30 @@
|
||||
use crate::node::carnot::messages::CarnotMessage;
|
||||
use consensus_engine::View;
|
||||
use polars::export::ahash::HashMap;
|
||||
|
||||
pub(crate) struct MessageCache {
|
||||
cache: HashMap<View, Vec<CarnotMessage>>,
|
||||
}
|
||||
|
||||
impl MessageCache {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
cache: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update<I: IntoIterator<Item = CarnotMessage>>(&mut self, messages: I) {
|
||||
for message in messages {
|
||||
let entry = self.cache.entry(message.view()).or_default();
|
||||
entry.push(message);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn prune(&mut self, view: View) {
|
||||
self.cache.retain(|v, _| v > &view);
|
||||
}
|
||||
|
||||
pub fn retrieve(&mut self, view: View) -> Vec<CarnotMessage> {
|
||||
self.cache.remove(&view).unwrap_or_default()
|
||||
}
|
||||
}
|
25
simulations/src/node/carnot/messages.rs
Normal file
25
simulations/src/node/carnot/messages.rs
Normal file
@ -0,0 +1,25 @@
|
||||
use consensus_engine::View;
|
||||
use nomos_consensus::network::messages::{
|
||||
NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg,
|
||||
};
|
||||
|
||||
#[derive(Eq, PartialEq, Hash, Clone)]
|
||||
pub enum CarnotMessage {
|
||||
Proposal(ProposalChunkMsg),
|
||||
Vote(VoteMsg),
|
||||
TimeoutQc(TimeoutQcMsg),
|
||||
Timeout(TimeoutMsg),
|
||||
NewView(NewViewMsg),
|
||||
}
|
||||
|
||||
impl CarnotMessage {
|
||||
pub fn view(&self) -> View {
|
||||
match self {
|
||||
CarnotMessage::Proposal(msg) => msg.view,
|
||||
CarnotMessage::Vote(msg) => msg.vote.view,
|
||||
CarnotMessage::TimeoutQc(msg) => msg.qc.view,
|
||||
CarnotMessage::Timeout(msg) => msg.vote.view,
|
||||
CarnotMessage::NewView(msg) => msg.vote.view,
|
||||
}
|
||||
}
|
||||
}
|
@ -1,33 +1,211 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
mod event_builder;
|
||||
mod message_cache;
|
||||
mod messages;
|
||||
|
||||
// std
|
||||
use std::hash::Hash;
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
// crates
|
||||
use bls_signatures::PrivateKey;
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
// internal
|
||||
use self::messages::CarnotMessage;
|
||||
use super::{Node, NodeId};
|
||||
use crate::network::{InMemoryNetworkInterface, NetworkInterface, NetworkMessage};
|
||||
use crate::node::carnot::event_builder::{CarnotTx, Event};
|
||||
use crate::node::carnot::message_cache::MessageCache;
|
||||
use crate::util::parse_idx;
|
||||
use consensus_engine::overlay::RandomBeaconState;
|
||||
use consensus_engine::{
|
||||
Block, BlockId, Carnot, Committee, Overlay, Payload, Qc, StandardQc, TimeoutQc, View, Vote,
|
||||
};
|
||||
use nomos_consensus::network::messages::ProposalChunkMsg;
|
||||
use nomos_consensus::{
|
||||
leader_selection::UpdateableLeaderSelection,
|
||||
network::messages::{NewViewMsg, TimeoutMsg, VoteMsg},
|
||||
};
|
||||
|
||||
#[derive(Default, Serialize)]
|
||||
pub struct CarnotState {}
|
||||
|
||||
#[derive(Clone, Default, Deserialize)]
|
||||
pub struct CarnotSettings {}
|
||||
|
||||
#[allow(dead_code)] // TODO: remove when handling settings
|
||||
pub struct CarnotNode {
|
||||
id: NodeId,
|
||||
state: CarnotState,
|
||||
settings: CarnotSettings,
|
||||
#[derive(Serialize)]
|
||||
pub struct CarnotState {
|
||||
current_view: View,
|
||||
highest_voted_view: View,
|
||||
local_high_qc: StandardQc,
|
||||
#[serde(serialize_with = "serialize_blocks")]
|
||||
safe_blocks: HashMap<BlockId, Block>,
|
||||
last_view_timeout_qc: Option<TimeoutQc>,
|
||||
latest_committed_block: Block,
|
||||
latest_committed_view: View,
|
||||
root_committe: Committee,
|
||||
parent_committe: Committee,
|
||||
child_committees: Vec<Committee>,
|
||||
committed_blocks: Vec<BlockId>,
|
||||
}
|
||||
|
||||
impl CarnotNode {
|
||||
pub fn new(id: NodeId) -> Self {
|
||||
/// Have to implement this manually because of the `serde_json` will panic if the key of map
|
||||
/// is not a string.
|
||||
fn serialize_blocks<S>(blocks: &HashMap<BlockId, Block>, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let mut ser = serializer.serialize_map(Some(blocks.len()))?;
|
||||
for (k, v) in blocks {
|
||||
ser.serialize_entry(&format!("{k:?}"), v)?;
|
||||
}
|
||||
ser.end()
|
||||
}
|
||||
|
||||
impl<O: Overlay> From<&Carnot<O>> for CarnotState {
|
||||
fn from(value: &Carnot<O>) -> Self {
|
||||
let current_view = value.current_view();
|
||||
Self {
|
||||
id,
|
||||
state: Default::default(),
|
||||
settings: Default::default(),
|
||||
current_view,
|
||||
local_high_qc: value.high_qc(),
|
||||
parent_committe: value.parent_committee(),
|
||||
root_committe: value.root_committee(),
|
||||
child_committees: value.child_committees(),
|
||||
latest_committed_block: value.latest_committed_block(),
|
||||
latest_committed_view: value.latest_committed_view(),
|
||||
safe_blocks: value
|
||||
.blocks_in_view(current_view)
|
||||
.into_iter()
|
||||
.map(|b| (b.id, b))
|
||||
.collect(),
|
||||
last_view_timeout_qc: value.last_view_timeout_qc(),
|
||||
committed_blocks: value.committed_blocks(),
|
||||
highest_voted_view: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Node for CarnotNode {
|
||||
#[derive(Clone, Default, Deserialize)]
|
||||
pub struct CarnotSettings {
|
||||
nodes: Vec<consensus_engine::NodeId>,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl CarnotSettings {
|
||||
pub fn new(nodes: Vec<consensus_engine::NodeId>, timeout: Duration) -> Self {
|
||||
Self { nodes, timeout }
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // TODO: remove when handling settings
|
||||
pub struct CarnotNode<O: Overlay> {
|
||||
id: consensus_engine::NodeId,
|
||||
state: CarnotState,
|
||||
settings: CarnotSettings,
|
||||
network_interface: InMemoryNetworkInterface<CarnotMessage>,
|
||||
message_cache: MessageCache,
|
||||
event_builder: event_builder::EventBuilder,
|
||||
engine: Carnot<O>,
|
||||
random_beacon_pk: PrivateKey,
|
||||
}
|
||||
|
||||
impl<O: Overlay> CarnotNode<O> {
|
||||
pub fn new<R: Rng>(
|
||||
id: consensus_engine::NodeId,
|
||||
settings: CarnotSettings,
|
||||
overlay_settings: O::Settings,
|
||||
genesis: nomos_core::block::Block<CarnotTx>,
|
||||
network_interface: InMemoryNetworkInterface<CarnotMessage>,
|
||||
rng: &mut R,
|
||||
) -> Self {
|
||||
let overlay = O::new(overlay_settings);
|
||||
let engine = Carnot::from_genesis(id, genesis.header().clone(), overlay);
|
||||
let state = CarnotState::from(&engine);
|
||||
// pk is generated in an insecure way, but for simulation purpouses using a rng like smallrng is more useful
|
||||
let mut pk_buff = [0; 32];
|
||||
rng.fill_bytes(&mut pk_buff);
|
||||
let random_beacon_pk = PrivateKey::new(pk_buff);
|
||||
Self {
|
||||
id,
|
||||
state,
|
||||
settings,
|
||||
network_interface,
|
||||
message_cache: MessageCache::new(),
|
||||
event_builder: event_builder::EventBuilder::new(id),
|
||||
engine,
|
||||
random_beacon_pk,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn send_message(&self, message: NetworkMessage<CarnotMessage>) {
|
||||
self.network_interface
|
||||
.send_message(self.id, message.payload);
|
||||
}
|
||||
|
||||
fn handle_output(&self, output: Output<CarnotTx>) {
|
||||
match output {
|
||||
Output::Send(consensus_engine::Send {
|
||||
to,
|
||||
payload: Payload::Vote(vote),
|
||||
}) => {
|
||||
for node in to {
|
||||
self.network_interface.send_message(
|
||||
node,
|
||||
CarnotMessage::Vote(VoteMsg {
|
||||
voter: self.id,
|
||||
vote: vote.clone(),
|
||||
qc: Some(Qc::Standard(StandardQc {
|
||||
view: vote.view,
|
||||
id: vote.block,
|
||||
})),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
Output::Send(consensus_engine::Send {
|
||||
to,
|
||||
payload: Payload::NewView(new_view),
|
||||
}) => {
|
||||
for node in to {
|
||||
self.network_interface.send_message(
|
||||
node,
|
||||
CarnotMessage::NewView(NewViewMsg {
|
||||
voter: node,
|
||||
vote: new_view.clone(),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
Output::Send(consensus_engine::Send {
|
||||
to,
|
||||
payload: Payload::Timeout(timeout),
|
||||
}) => {
|
||||
for node in to {
|
||||
self.network_interface.send_message(
|
||||
node,
|
||||
CarnotMessage::Timeout(TimeoutMsg {
|
||||
voter: node,
|
||||
vote: timeout.clone(),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
Output::BroadcastTimeoutQc { .. } => {
|
||||
unimplemented!()
|
||||
}
|
||||
Output::BroadcastProposal { proposal } => {
|
||||
for node in &self.settings.nodes {
|
||||
self.network_interface.send_message(
|
||||
*node,
|
||||
CarnotMessage::Proposal(ProposalChunkMsg {
|
||||
chunk: proposal.as_bytes().to_vec().into(),
|
||||
proposal: proposal.header().id,
|
||||
view: proposal.header().view,
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for CarnotNode<O> {
|
||||
type Settings = CarnotSettings;
|
||||
type State = CarnotState;
|
||||
|
||||
@ -36,7 +214,7 @@ impl Node for CarnotNode {
|
||||
}
|
||||
|
||||
fn current_view(&self) -> usize {
|
||||
todo!()
|
||||
self.event_builder.current_view as usize
|
||||
}
|
||||
|
||||
fn state(&self) -> &CarnotState {
|
||||
@ -44,6 +222,143 @@ impl Node for CarnotNode {
|
||||
}
|
||||
|
||||
fn step(&mut self) {
|
||||
todo!()
|
||||
// split messages per view, we just one to process the current engine processing view or proposals or timeoutqcs
|
||||
let (mut current_view_messages, other_view_messages): (Vec<_>, Vec<_>) = self
|
||||
.network_interface
|
||||
.receive_messages()
|
||||
.into_iter()
|
||||
.map(|m| m.payload)
|
||||
.partition(|m| {
|
||||
m.view() == self.engine.current_view()
|
||||
|| matches!(m, CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_))
|
||||
});
|
||||
self.message_cache.prune(self.engine.current_view() - 1);
|
||||
self.message_cache.update(other_view_messages);
|
||||
current_view_messages.append(&mut self.message_cache.retrieve(self.engine.current_view()));
|
||||
|
||||
let events = self.event_builder.step(current_view_messages, &self.engine);
|
||||
|
||||
for event in events {
|
||||
let mut output: Vec<Output<CarnotTx>> = vec![];
|
||||
match event {
|
||||
Event::Proposal { block } => {
|
||||
let current_view = self.engine.current_view();
|
||||
tracing::info!(
|
||||
node=parse_idx(&self.id),
|
||||
last_committed_view=self.engine.latest_committed_view(),
|
||||
current_view = current_view,
|
||||
block_view = block.header().view,
|
||||
block = ?block.header().id,
|
||||
parent_block=?block.header().parent(),
|
||||
"receive block proposal",
|
||||
);
|
||||
match self.engine.receive_block(block.header().clone()) {
|
||||
Ok(mut new) => {
|
||||
if self.engine.current_view() != new.current_view() {
|
||||
new = new
|
||||
.update_overlay(|overlay| {
|
||||
overlay.update_leader_selection(|leader_selection| {
|
||||
leader_selection.on_new_block_received(block.clone())
|
||||
})
|
||||
})
|
||||
.unwrap_or(new);
|
||||
self.engine = new;
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::error!(node = parse_idx(&self.id), current_view = self.engine.current_view(), block_view = block.header().view, block = ?block.header().id, "receive block proposal, but is invalid");
|
||||
}
|
||||
}
|
||||
|
||||
if self.engine.overlay().is_member_of_leaf_committee(self.id) {
|
||||
output.push(Output::Send(consensus_engine::Send {
|
||||
to: self.engine.parent_committee(),
|
||||
payload: Payload::Vote(Vote {
|
||||
view: self.engine.current_view(),
|
||||
block: block.header().id,
|
||||
}),
|
||||
}))
|
||||
}
|
||||
}
|
||||
// This branch means we already get enough votes for this block
|
||||
// So we can just call approve_block
|
||||
Event::Approve { block, .. } => {
|
||||
tracing::info!(
|
||||
node = parse_idx(&self.id),
|
||||
current_view = self.engine.current_view(),
|
||||
block_view = block.view,
|
||||
block = ?block.id,
|
||||
parent_block=?block.parent(),
|
||||
"receive approve message"
|
||||
);
|
||||
let (new, out) = self.engine.approve_block(block);
|
||||
tracing::info!(vote=?out, node=parse_idx(&self.id));
|
||||
output = vec![Output::Send(out)];
|
||||
self.engine = new;
|
||||
}
|
||||
Event::ProposeBlock { qc } => {
|
||||
output = vec![Output::BroadcastProposal {
|
||||
proposal: nomos_core::block::Block::new(
|
||||
qc.view() + 1,
|
||||
qc.clone(),
|
||||
[].into_iter(),
|
||||
self.id,
|
||||
RandomBeaconState::generate_happy(
|
||||
qc.view() + 1,
|
||||
&self.random_beacon_pk,
|
||||
),
|
||||
),
|
||||
}]
|
||||
}
|
||||
// This branch means we already get enough new view msgs for this qc
|
||||
// So we can just call approve_new_view
|
||||
Event::NewView {
|
||||
timeout_qc: _,
|
||||
new_views: _,
|
||||
} => {
|
||||
// let (new, out) = self.engine.approve_new_view(timeout_qc, new_views);
|
||||
// output = Some(out);
|
||||
// self.engine = new;
|
||||
// let next_view = timeout_qc.view + 2;
|
||||
// if self.engine.is_leader_for_view(next_view) {
|
||||
// self.gather_new_views(&[self.id].into_iter().collect(), timeout_qc);
|
||||
// }
|
||||
tracing::error!("unimplemented new view branch");
|
||||
unimplemented!()
|
||||
}
|
||||
Event::TimeoutQc { timeout_qc } => {
|
||||
self.engine = self.engine.receive_timeout_qc(timeout_qc);
|
||||
}
|
||||
Event::RootTimeout { timeouts } => {
|
||||
println!("root timeouts: {timeouts:?}");
|
||||
}
|
||||
Event::LocalTimeout => {
|
||||
tracing::error!("unimplemented local timeout branch");
|
||||
unreachable!("local timeout will never be constructed")
|
||||
}
|
||||
Event::None => {
|
||||
tracing::error!("unimplemented none branch");
|
||||
unreachable!("none event will never be constructed")
|
||||
}
|
||||
}
|
||||
|
||||
for output_event in output {
|
||||
self.handle_output(output_event);
|
||||
}
|
||||
}
|
||||
|
||||
// update state
|
||||
self.state = CarnotState::from(&self.engine);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Output<Tx: Clone + Eq + Hash> {
|
||||
Send(consensus_engine::Send),
|
||||
BroadcastTimeoutQc {
|
||||
timeout_qc: TimeoutQc,
|
||||
},
|
||||
BroadcastProposal {
|
||||
proposal: nomos_core::block::Block<Tx>,
|
||||
},
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ use crate::{
|
||||
node::{Node, NodeId},
|
||||
};
|
||||
|
||||
use super::{OverlayGetter, OverlayState, SharedState, ViewOverlay};
|
||||
use super::{CommitteeId, OverlayGetter, OverlayState, SharedState, ViewOverlay};
|
||||
|
||||
#[derive(Debug, Default, Serialize)]
|
||||
pub struct DummyState {
|
||||
@ -112,7 +112,7 @@ impl LocalView {
|
||||
let current_roots = view
|
||||
.layout
|
||||
.committees
|
||||
.get(&0.into())
|
||||
.get(&CommitteeId(0))
|
||||
.map(|c| c.nodes.clone());
|
||||
|
||||
Self {
|
||||
@ -435,7 +435,7 @@ mod tests {
|
||||
network::{
|
||||
behaviour::NetworkBehaviour,
|
||||
regions::{Region, RegionsData},
|
||||
InMemoryNetworkInterface, Network,
|
||||
InMemoryNetworkInterface, Network, NetworkBehaviourKey,
|
||||
},
|
||||
node::{
|
||||
dummy::{get_child_nodes, get_parent_nodes, get_roles, DummyRole},
|
||||
@ -445,6 +445,7 @@ mod tests {
|
||||
tree::{TreeOverlay, TreeSettings},
|
||||
Overlay,
|
||||
},
|
||||
util::node_id,
|
||||
};
|
||||
|
||||
use super::{DummyMessage, DummyNode, Intent, Vote};
|
||||
@ -452,7 +453,7 @@ mod tests {
|
||||
fn init_network(node_ids: &[NodeId]) -> Network<DummyMessage> {
|
||||
let regions = HashMap::from([(Region::Europe, node_ids.to_vec())]);
|
||||
let behaviour = HashMap::from([(
|
||||
(Region::Europe, Region::Europe),
|
||||
NetworkBehaviourKey::new(Region::Europe, Region::Europe),
|
||||
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
|
||||
)]);
|
||||
let regions_data = RegionsData::new(regions, behaviour);
|
||||
@ -516,7 +517,7 @@ mod tests {
|
||||
.for_each(|leader_id| {
|
||||
for _ in 0..committee_size {
|
||||
nodes
|
||||
.get(&0.into())
|
||||
.get(&node_id(0))
|
||||
.unwrap()
|
||||
.send_message(*leader_id, DummyMessage::Vote(initial_vote.clone()));
|
||||
}
|
||||
@ -538,7 +539,7 @@ mod tests {
|
||||
let mut network = init_network(&node_ids);
|
||||
|
||||
let view = ViewOverlay {
|
||||
leaders: vec![0.into(), 1.into(), 2.into()],
|
||||
leaders: vec![node_id(0), node_id(1), node_id(2)],
|
||||
layout: overlay.layout(&node_ids, &mut rng),
|
||||
};
|
||||
let overlay_state = Arc::new(RwLock::new(OverlayState {
|
||||
@ -556,9 +557,9 @@ mod tests {
|
||||
let initial_vote = Vote::new(1, Intent::FromRootToLeader);
|
||||
|
||||
// Using any node as the sender for initial proposal to leader nodes.
|
||||
nodes[&0.into()].send_message(0.into(), DummyMessage::Vote(initial_vote.clone()));
|
||||
nodes[&0.into()].send_message(1.into(), DummyMessage::Vote(initial_vote.clone()));
|
||||
nodes[&0.into()].send_message(2.into(), DummyMessage::Vote(initial_vote));
|
||||
nodes[&node_id(0)].send_message(node_id(0), DummyMessage::Vote(initial_vote.clone()));
|
||||
nodes[&node_id(0)].send_message(node_id(1), DummyMessage::Vote(initial_vote.clone()));
|
||||
nodes[&node_id(0)].send_message(node_id(2), DummyMessage::Vote(initial_vote));
|
||||
network.collect_messages();
|
||||
|
||||
for (_, node) in nodes.iter() {
|
||||
@ -586,15 +587,15 @@ mod tests {
|
||||
}
|
||||
|
||||
// Root and Internal haven't sent their votes yet.
|
||||
assert!(!nodes[&0.into()].state().view_state[&1].vote_sent); // Root
|
||||
assert!(!nodes[&1.into()].state().view_state[&1].vote_sent); // Internal
|
||||
assert!(!nodes[&2.into()].state().view_state[&1].vote_sent); // Internal
|
||||
assert!(!nodes[&node_id(0)].state().view_state[&1].vote_sent); // Root
|
||||
assert!(!nodes[&node_id(1)].state().view_state[&1].vote_sent); // Internal
|
||||
assert!(!nodes[&node_id(2)].state().view_state[&1].vote_sent); // Internal
|
||||
|
||||
// Leaves should have thier vote sent.
|
||||
assert!(nodes[&3.into()].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&4.into()].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&5.into()].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&6.into()].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(3)].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(4)].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(5)].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(6)].state().view_state[&1].vote_sent); // Leaf
|
||||
|
||||
// 3. Internal nodes send vote to root node.
|
||||
network.dispatch_after(Duration::from_millis(100));
|
||||
@ -604,15 +605,15 @@ mod tests {
|
||||
network.collect_messages();
|
||||
|
||||
// Root hasn't sent its votes yet.
|
||||
assert!(!nodes[&0.into()].state().view_state[&1].vote_sent); // Root
|
||||
assert!(!nodes[&node_id(0)].state().view_state[&1].vote_sent); // Root
|
||||
|
||||
// Internal and leaves should have thier vote sent.
|
||||
assert!(nodes[&1.into()].state().view_state[&1].vote_sent); // Internal
|
||||
assert!(nodes[&2.into()].state().view_state[&1].vote_sent); // Internal
|
||||
assert!(nodes[&3.into()].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&4.into()].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&5.into()].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&6.into()].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(1)].state().view_state[&1].vote_sent); // Internal
|
||||
assert!(nodes[&node_id(2)].state().view_state[&1].vote_sent); // Internal
|
||||
assert!(nodes[&node_id(3)].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(4)].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(5)].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(6)].state().view_state[&1].vote_sent); // Leaf
|
||||
|
||||
// 4. Root node send vote to next view leader nodes.
|
||||
network.dispatch_after(Duration::from_millis(100));
|
||||
@ -622,13 +623,13 @@ mod tests {
|
||||
network.collect_messages();
|
||||
|
||||
// Root has sent its votes.
|
||||
assert!(nodes[&0.into()].state().view_state[&1].vote_sent); // Root
|
||||
assert!(nodes[&1.into()].state().view_state[&1].vote_sent); // Internal
|
||||
assert!(nodes[&2.into()].state().view_state[&1].vote_sent); // Internal
|
||||
assert!(nodes[&3.into()].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&4.into()].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&5.into()].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&6.into()].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(0)].state().view_state[&1].vote_sent); // Root
|
||||
assert!(nodes[&node_id(1)].state().view_state[&1].vote_sent); // Internal
|
||||
assert!(nodes[&node_id(2)].state().view_state[&1].vote_sent); // Internal
|
||||
assert!(nodes[&node_id(3)].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(4)].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(5)].state().view_state[&1].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(6)].state().view_state[&1].vote_sent); // Leaf
|
||||
|
||||
// 5. Leaders receive vote and broadcast new Proposal(Block) to all nodes.
|
||||
network.dispatch_after(Duration::from_millis(100));
|
||||
@ -656,15 +657,15 @@ mod tests {
|
||||
}
|
||||
|
||||
// Root and Internal haven't sent their votes yet.
|
||||
assert!(!nodes[&0.into()].state().view_state[&2].vote_sent); // Root
|
||||
assert!(!nodes[&1.into()].state().view_state[&2].vote_sent); // Internal
|
||||
assert!(!nodes[&2.into()].state().view_state[&2].vote_sent); // Internal
|
||||
assert!(!nodes[&node_id(0)].state().view_state[&2].vote_sent); // Root
|
||||
assert!(!nodes[&node_id(1)].state().view_state[&2].vote_sent); // Internal
|
||||
assert!(!nodes[&node_id(2)].state().view_state[&2].vote_sent); // Internal
|
||||
|
||||
// Leaves should have thier vote sent.
|
||||
assert!(nodes[&3.into()].state().view_state[&2].vote_sent); // Leaf
|
||||
assert!(nodes[&4.into()].state().view_state[&2].vote_sent); // Leaf
|
||||
assert!(nodes[&5.into()].state().view_state[&2].vote_sent); // Leaf
|
||||
assert!(nodes[&6.into()].state().view_state[&2].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(3)].state().view_state[&2].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(4)].state().view_state[&2].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(5)].state().view_state[&2].vote_sent); // Leaf
|
||||
assert!(nodes[&node_id(6)].state().view_state[&2].vote_sent); // Leaf
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -685,7 +686,7 @@ mod tests {
|
||||
}));
|
||||
|
||||
// There are more nodes in the network than in a tree overlay.
|
||||
let node_ids: Vec<NodeId> = (0..100).map(Into::into).collect();
|
||||
let node_ids: Vec<NodeId> = (0..100).map(node_id).collect();
|
||||
let mut network = init_network(&node_ids);
|
||||
|
||||
let overlays = generate_overlays(&node_ids, &overlay, 4, 3, &mut rng);
|
||||
@ -735,7 +736,7 @@ mod tests {
|
||||
}));
|
||||
|
||||
// There are more nodes in the network than in a tree overlay.
|
||||
let node_ids: Vec<NodeId> = (0..10000).map(Into::into).collect();
|
||||
let node_ids: Vec<NodeId> = (0..10000).map(node_id).collect();
|
||||
let mut network = init_network(&node_ids);
|
||||
|
||||
let overlays = generate_overlays(&node_ids, &overlay, 4, 100, &mut rng);
|
||||
@ -785,7 +786,7 @@ mod tests {
|
||||
}));
|
||||
|
||||
// There are more nodes in the network than in a tree overlay.
|
||||
let node_ids: Vec<NodeId> = (0..100000).map(Into::into).collect();
|
||||
let node_ids: Vec<NodeId> = (0..100000).map(node_id).collect();
|
||||
let mut network = init_network(&node_ids);
|
||||
|
||||
let overlays = generate_overlays(&node_ids, &overlay, 4, 1000, &mut rng);
|
||||
@ -824,42 +825,42 @@ mod tests {
|
||||
(
|
||||
0,
|
||||
None,
|
||||
Some(BTreeSet::from([1.into(), 2.into()])),
|
||||
Some(BTreeSet::from([node_id(1), node_id(2)])),
|
||||
vec![DummyRole::Root],
|
||||
),
|
||||
(
|
||||
1,
|
||||
Some(BTreeSet::from([0.into()])),
|
||||
Some(BTreeSet::from([3.into(), 4.into()])),
|
||||
Some(BTreeSet::from([node_id(0)])),
|
||||
Some(BTreeSet::from([node_id(3), node_id(4)])),
|
||||
vec![DummyRole::Internal],
|
||||
),
|
||||
(
|
||||
2,
|
||||
Some(BTreeSet::from([0.into()])),
|
||||
Some(BTreeSet::from([5.into(), 6.into()])),
|
||||
Some(BTreeSet::from([node_id(0)])),
|
||||
Some(BTreeSet::from([node_id(5), node_id(6)])),
|
||||
vec![DummyRole::Internal],
|
||||
),
|
||||
(
|
||||
3,
|
||||
Some(BTreeSet::from([1.into()])),
|
||||
Some(BTreeSet::from([node_id(1)])),
|
||||
None,
|
||||
vec![DummyRole::Leaf],
|
||||
),
|
||||
(
|
||||
4,
|
||||
Some(BTreeSet::from([1.into()])),
|
||||
Some(BTreeSet::from([node_id(1)])),
|
||||
None,
|
||||
vec![DummyRole::Leaf],
|
||||
),
|
||||
(
|
||||
5,
|
||||
Some(BTreeSet::from([2.into()])),
|
||||
Some(BTreeSet::from([node_id(2)])),
|
||||
None,
|
||||
vec![DummyRole::Leaf],
|
||||
),
|
||||
(
|
||||
6,
|
||||
Some(BTreeSet::from([2.into()])),
|
||||
Some(BTreeSet::from([node_id(2)])),
|
||||
None,
|
||||
vec![DummyRole::Leader, DummyRole::Leaf],
|
||||
),
|
||||
@ -871,12 +872,12 @@ mod tests {
|
||||
committee_size: 1,
|
||||
});
|
||||
let node_ids: Vec<NodeId> = overlay.nodes();
|
||||
let leaders = vec![6.into()];
|
||||
let leaders = vec![node_id(6)];
|
||||
let layout = overlay.layout(&node_ids, &mut rng);
|
||||
let view = ViewOverlay { leaders, layout };
|
||||
|
||||
for (node_id, expected_parents, expected_children, expected_roles) in test_cases {
|
||||
let node_id = node_id.into();
|
||||
for (nid, expected_parents, expected_children, expected_roles) in test_cases {
|
||||
let node_id = node_id(nid);
|
||||
let parents = get_parent_nodes(node_id, &view);
|
||||
let children = get_child_nodes(node_id, &view);
|
||||
let role = get_roles(node_id, &view, &parents, &children);
|
||||
|
@ -17,27 +17,7 @@ use serde::{Deserialize, Serialize};
|
||||
// internal
|
||||
use crate::overlay::{Layout, OverlaySettings, SimulationOverlay};
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
pub struct NodeId(usize);
|
||||
|
||||
impl NodeId {
|
||||
#[inline]
|
||||
pub const fn new(id: usize) -> Self {
|
||||
Self(id)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub const fn inner(&self) -> usize {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<usize> for NodeId {
|
||||
fn from(id: usize) -> Self {
|
||||
Self(id)
|
||||
}
|
||||
}
|
||||
pub use consensus_engine::NodeId;
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
@ -183,7 +163,7 @@ impl Node for usize {
|
||||
type State = Self;
|
||||
|
||||
fn id(&self) -> NodeId {
|
||||
(*self).into()
|
||||
crate::util::node_id(*self)
|
||||
}
|
||||
|
||||
fn current_view(&self) -> usize {
|
||||
|
@ -1,16 +1,97 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::settings::SimulationSettings;
|
||||
use crate::warding::SimulationState;
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub enum RecordType {
|
||||
Meta,
|
||||
Settings,
|
||||
Data,
|
||||
}
|
||||
|
||||
pub trait Record: From<Runtime> + From<SimulationSettings> + Send + Sync + 'static {
|
||||
fn record_type(&self) -> RecordType;
|
||||
|
||||
fn is_settings(&self) -> bool {
|
||||
self.record_type() == RecordType::Settings
|
||||
}
|
||||
|
||||
fn is_meta(&self) -> bool {
|
||||
self.record_type() == RecordType::Meta
|
||||
}
|
||||
|
||||
fn is_data(&self) -> bool {
|
||||
self.record_type() == RecordType::Data
|
||||
}
|
||||
}
|
||||
|
||||
pub type SerializedNodeState = serde_json::Value;
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct OutData(SerializedNodeState);
|
||||
pub struct Runtime {
|
||||
start: DateTime<Utc>,
|
||||
end: DateTime<Utc>,
|
||||
elapsed: Duration,
|
||||
}
|
||||
|
||||
impl Runtime {
|
||||
pub(crate) fn load() -> anyhow::Result<Self> {
|
||||
let elapsed = crate::START_TIME.elapsed();
|
||||
let end = Utc::now();
|
||||
Ok(Self {
|
||||
start: end
|
||||
.checked_sub_signed(chrono::Duration::from_std(elapsed)?)
|
||||
.unwrap(),
|
||||
end,
|
||||
elapsed,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum OutData {
|
||||
Runtime(Runtime),
|
||||
Settings(Box<SimulationSettings>),
|
||||
Data(SerializedNodeState),
|
||||
}
|
||||
|
||||
impl From<Runtime> for OutData {
|
||||
fn from(runtime: Runtime) -> Self {
|
||||
Self::Runtime(runtime)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SimulationSettings> for OutData {
|
||||
fn from(settings: SimulationSettings) -> Self {
|
||||
Self::Settings(Box::new(settings))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SerializedNodeState> for OutData {
|
||||
fn from(state: SerializedNodeState) -> Self {
|
||||
Self::Data(state)
|
||||
}
|
||||
}
|
||||
|
||||
impl Record for OutData {
|
||||
fn record_type(&self) -> RecordType {
|
||||
match self {
|
||||
Self::Runtime(_) => RecordType::Meta,
|
||||
Self::Settings(_) => RecordType::Settings,
|
||||
Self::Data(_) => RecordType::Data,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl OutData {
|
||||
#[inline]
|
||||
pub const fn new(state: SerializedNodeState) -> Self {
|
||||
Self(state)
|
||||
Self::Data(state)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@ use rand::Rng;
|
||||
use super::Overlay;
|
||||
use crate::node::NodeId;
|
||||
use crate::overlay::{Committee, Layout};
|
||||
use crate::util::node_id;
|
||||
|
||||
pub struct FlatOverlay;
|
||||
impl FlatOverlay {
|
||||
@ -22,7 +23,7 @@ impl Default for FlatOverlay {
|
||||
|
||||
impl Overlay for FlatOverlay {
|
||||
fn nodes(&self) -> Vec<NodeId> {
|
||||
(0..10).map(NodeId::from).collect()
|
||||
(0..10).map(node_id).collect()
|
||||
}
|
||||
|
||||
fn leaders<R: Rng>(
|
||||
|
@ -5,7 +5,7 @@ pub mod tree;
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
// crates
|
||||
use rand::Rng;
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
// internal
|
||||
use crate::node::{CommitteeId, NodeId};
|
||||
|
||||
@ -97,7 +97,7 @@ pub enum SimulationOverlay {
|
||||
Tree(tree::TreeOverlay),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum OverlaySettings {
|
||||
Flat,
|
||||
Tree(TreeSettings),
|
||||
|
@ -2,18 +2,21 @@
|
||||
use std::collections::HashMap;
|
||||
// crates
|
||||
use rand::seq::IteratorRandom;
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
// internal
|
||||
use super::{Committee, Layout, Overlay};
|
||||
use crate::node::{CommitteeId, NodeId};
|
||||
use crate::{
|
||||
node::{CommitteeId, NodeId},
|
||||
util::node_id,
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize)]
|
||||
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
|
||||
pub enum TreeType {
|
||||
#[default]
|
||||
FullBinaryTree,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct TreeSettings {
|
||||
pub tree_type: TreeType,
|
||||
pub committee_size: usize,
|
||||
@ -105,7 +108,7 @@ impl TreeOverlay {
|
||||
impl Overlay for TreeOverlay {
|
||||
fn nodes(&self) -> Vec<NodeId> {
|
||||
let properties = get_tree_properties(&self.settings);
|
||||
(0..properties.node_count).map(From::from).collect()
|
||||
(0..properties.node_count).map(node_id).collect()
|
||||
}
|
||||
|
||||
fn leaders<R: rand::Rng>(
|
||||
@ -151,6 +154,8 @@ fn get_layer(id: usize) -> CommitteeId {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::util::node_id;
|
||||
|
||||
use super::*;
|
||||
use rand::rngs::mock::StepRng;
|
||||
|
||||
@ -224,13 +229,13 @@ mod tests {
|
||||
|
||||
let root_nodes = &layout.committees[&CommitteeId::new(0)].nodes;
|
||||
assert_eq!(root_nodes.len(), 10);
|
||||
assert_eq!(root_nodes.first(), Some(&NodeId::new(0)));
|
||||
assert_eq!(root_nodes.last(), Some(&NodeId::new(9)));
|
||||
assert_eq!(root_nodes.first(), Some(&node_id(0)));
|
||||
assert_eq!(root_nodes.last(), Some(&node_id(9)));
|
||||
|
||||
let last_nodes = &layout.committees[&CommitteeId::new(1022)].nodes;
|
||||
assert_eq!(last_nodes.len(), 10);
|
||||
assert_eq!(last_nodes.first(), Some(&NodeId::new(10220)));
|
||||
assert_eq!(last_nodes.last(), Some(&NodeId::new(10229)));
|
||||
assert_eq!(last_nodes.first(), Some(&node_id(10220)));
|
||||
assert_eq!(last_nodes.last(), Some(&node_id(10229)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -1,4 +1,5 @@
|
||||
use crate::node::{Node, NodeId};
|
||||
use crate::output_processors::Record;
|
||||
use crate::runner::SimulationRunner;
|
||||
use crate::warding::SimulationState;
|
||||
use crossbeam::channel::bounded;
|
||||
@ -21,7 +22,11 @@ where
|
||||
N: Send + Sync + 'static,
|
||||
N::Settings: Clone + Send,
|
||||
N::State: Serialize,
|
||||
R: for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error> + Send + Sync + 'static,
|
||||
R: Record
|
||||
+ for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
{
|
||||
let simulation_state = SimulationState::<N> {
|
||||
nodes: Arc::clone(&runner.nodes),
|
||||
|
@ -1,5 +1,7 @@
|
||||
use crate::node::{Node, NodeId};
|
||||
use crate::output_processors::Record;
|
||||
use crate::runner::SimulationRunner;
|
||||
use crate::util::{node_id, parse_idx};
|
||||
use crate::warding::SimulationState;
|
||||
use crossbeam::channel::bounded;
|
||||
use crossbeam::select;
|
||||
@ -23,7 +25,11 @@ where
|
||||
N: Send + Sync + 'static,
|
||||
N::Settings: Clone + Send,
|
||||
N::State: Serialize,
|
||||
R: for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error> + Send + Sync + 'static,
|
||||
R: Record
|
||||
+ for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
{
|
||||
let simulation_state = SimulationState {
|
||||
nodes: Arc::clone(&runner.nodes),
|
||||
@ -31,7 +37,7 @@ where
|
||||
|
||||
let inner_runner = runner.inner.clone();
|
||||
let nodes = runner.nodes;
|
||||
let nodes_remaining: BTreeSet<NodeId> = (0..nodes.read().len()).map(From::from).collect();
|
||||
let nodes_remaining: BTreeSet<NodeId> = (0..nodes.read().len()).map(node_id).collect();
|
||||
let iterations: Vec<_> = (0..maximum_iterations).collect();
|
||||
let (stop_tx, stop_rx) = bounded(1);
|
||||
let p = runner.producer.clone();
|
||||
@ -56,7 +62,7 @@ where
|
||||
{
|
||||
let mut shared_nodes = nodes.write();
|
||||
let node: &mut N = shared_nodes
|
||||
.get_mut(node_id.inner())
|
||||
.get_mut(parse_idx(&node_id))
|
||||
.expect("Node should be present");
|
||||
node.step();
|
||||
}
|
||||
|
@ -40,7 +40,9 @@ use rand::rngs::SmallRng;
|
||||
use serde::Serialize;
|
||||
// internal
|
||||
use crate::node::{Node, NodeId};
|
||||
use crate::output_processors::Record;
|
||||
use crate::runner::SimulationRunner;
|
||||
use crate::util::parse_idx;
|
||||
use crate::warding::SimulationState;
|
||||
|
||||
use super::SimulationRunnerHandle;
|
||||
@ -56,7 +58,11 @@ where
|
||||
N: Send + Sync + 'static,
|
||||
N::Settings: Clone + Send,
|
||||
N::State: Serialize,
|
||||
R: for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error> + Send + Sync + 'static,
|
||||
R: Record
|
||||
+ for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
{
|
||||
let distribution =
|
||||
distribution.unwrap_or_else(|| std::iter::repeat(1.0f32).take(gap).collect());
|
||||
@ -91,7 +97,7 @@ where
|
||||
{
|
||||
let mut shared_nodes = nodes.write();
|
||||
let node: &mut N = shared_nodes
|
||||
.get_mut(node_id.inner())
|
||||
.get_mut(parse_idx(&node_id))
|
||||
.expect("Node should be present");
|
||||
let prev_view = node.current_view();
|
||||
node.step();
|
||||
|
@ -7,6 +7,7 @@ mod sync_runner;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::output_processors::Record;
|
||||
// crates
|
||||
use crate::streaming::{StreamProducer, Subscriber, SubscriberHandle};
|
||||
use crossbeam::channel::Sender;
|
||||
@ -28,7 +29,7 @@ pub struct SimulationRunnerHandle<R> {
|
||||
handle: std::thread::JoinHandle<anyhow::Result<()>>,
|
||||
}
|
||||
|
||||
impl<R: Send + Sync + 'static> SimulationRunnerHandle<R> {
|
||||
impl<R: Record> SimulationRunnerHandle<R> {
|
||||
pub fn stop_after(self, duration: Duration) -> anyhow::Result<()> {
|
||||
std::thread::sleep(duration);
|
||||
self.stop()
|
||||
@ -76,7 +77,7 @@ where
|
||||
.any(|x| x)
|
||||
}
|
||||
|
||||
fn step<N>(&mut self, nodes: &mut Vec<N>)
|
||||
fn step<N>(&mut self, nodes: &mut [N])
|
||||
where
|
||||
N: Node + Send + Sync,
|
||||
N::Settings: Clone + Send,
|
||||
@ -108,19 +109,28 @@ where
|
||||
N: Send + Sync + 'static,
|
||||
N::Settings: Clone + Send,
|
||||
N::State: Serialize,
|
||||
R: for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error> + Send + Sync + 'static,
|
||||
R: Record
|
||||
+ for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
{
|
||||
pub fn new(
|
||||
network: Network<M>,
|
||||
nodes: Vec<N>,
|
||||
producer: StreamProducer<R>,
|
||||
settings: SimulationSettings,
|
||||
) -> Self {
|
||||
mut settings: SimulationSettings,
|
||||
) -> anyhow::Result<Self> {
|
||||
let seed = settings
|
||||
.seed
|
||||
.unwrap_or_else(|| rand::thread_rng().next_u64());
|
||||
|
||||
println!("Seed: {seed}");
|
||||
settings
|
||||
.seed
|
||||
.get_or_insert_with(|| rand::thread_rng().next_u64());
|
||||
|
||||
// Store the settings to the producer so that we can collect them later
|
||||
producer.send(R::from(settings.clone()))?;
|
||||
|
||||
let rng = SmallRng::seed_from_u64(seed);
|
||||
let nodes = Arc::new(RwLock::new(nodes));
|
||||
@ -136,7 +146,7 @@ where
|
||||
leaders_count: _,
|
||||
network_settings: _,
|
||||
} = settings;
|
||||
Self {
|
||||
Ok(Self {
|
||||
runner_settings,
|
||||
inner: Arc::new(RwLock::new(SimulationRunnerInner {
|
||||
network,
|
||||
@ -145,10 +155,13 @@ where
|
||||
})),
|
||||
nodes,
|
||||
producer,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn simulate(self) -> anyhow::Result<SimulationRunnerHandle<R>> {
|
||||
// init the start time
|
||||
let _ = *crate::START_TIME;
|
||||
|
||||
match self.runner_settings.clone() {
|
||||
RunnerSettings::Sync => sync_runner::simulate(self),
|
||||
RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks),
|
||||
|
@ -1,8 +1,8 @@
|
||||
use serde::Serialize;
|
||||
|
||||
use super::{SimulationRunner, SimulationRunnerHandle};
|
||||
use crate::node::Node;
|
||||
use crate::warding::SimulationState;
|
||||
use crate::{node::Node, output_processors::Record};
|
||||
use crossbeam::channel::{bounded, select};
|
||||
use std::sync::Arc;
|
||||
|
||||
@ -15,7 +15,11 @@ where
|
||||
N: Send + Sync + 'static,
|
||||
N::Settings: Clone + Send,
|
||||
N::State: Serialize,
|
||||
R: for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error> + Send + Sync + 'static,
|
||||
R: Record
|
||||
+ for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
{
|
||||
let state = SimulationState {
|
||||
nodes: Arc::clone(&runner.nodes),
|
||||
@ -67,7 +71,7 @@ mod tests {
|
||||
network::{
|
||||
behaviour::NetworkBehaviour,
|
||||
regions::{Region, RegionsData},
|
||||
InMemoryNetworkInterface, Network,
|
||||
InMemoryNetworkInterface, Network, NetworkBehaviourKey,
|
||||
},
|
||||
node::{
|
||||
dummy::{DummyMessage, DummyNode},
|
||||
@ -78,6 +82,7 @@ mod tests {
|
||||
runner::SimulationRunner,
|
||||
settings::SimulationSettings,
|
||||
streaming::StreamProducer,
|
||||
util::node_id,
|
||||
};
|
||||
use crossbeam::channel;
|
||||
use parking_lot::RwLock;
|
||||
@ -91,7 +96,7 @@ mod tests {
|
||||
fn init_network(node_ids: &[NodeId]) -> Network<DummyMessage> {
|
||||
let regions = HashMap::from([(Region::Europe, node_ids.to_vec())]);
|
||||
let behaviour = HashMap::from([(
|
||||
(Region::Europe, Region::Europe),
|
||||
NetworkBehaviourKey::new(Region::Europe, Region::Europe),
|
||||
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
|
||||
)]);
|
||||
let regions_data = RegionsData::new(regions, behaviour);
|
||||
@ -126,7 +131,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let mut rng = StepRng::new(1, 0);
|
||||
let node_ids: Vec<NodeId> = (0..settings.node_count).map(Into::into).collect();
|
||||
let node_ids: Vec<NodeId> = (0..settings.node_count).map(node_id).collect();
|
||||
let overlay = TreeOverlay::new(settings.overlay_settings.clone().try_into().unwrap());
|
||||
let mut network = init_network(&node_ids);
|
||||
let view = ViewOverlay {
|
||||
@ -142,7 +147,7 @@ mod tests {
|
||||
|
||||
let producer = StreamProducer::default();
|
||||
let runner: SimulationRunner<DummyMessage, DummyNode, OutData> =
|
||||
SimulationRunner::new(network, nodes, producer, settings);
|
||||
SimulationRunner::new(network, nodes, producer, settings).unwrap();
|
||||
let mut nodes = runner.nodes.write();
|
||||
runner.inner.write().step(&mut nodes);
|
||||
drop(nodes);
|
||||
@ -161,7 +166,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let mut rng = StepRng::new(1, 0);
|
||||
let node_ids: Vec<NodeId> = (0..settings.node_count).map(Into::into).collect();
|
||||
let node_ids: Vec<NodeId> = (0..settings.node_count).map(node_id).collect();
|
||||
let overlay = TreeOverlay::new(settings.overlay_settings.clone().try_into().unwrap());
|
||||
let mut network = init_network(&node_ids);
|
||||
let view = ViewOverlay {
|
||||
@ -188,7 +193,7 @@ mod tests {
|
||||
network.collect_messages();
|
||||
|
||||
let runner: SimulationRunner<DummyMessage, DummyNode, OutData> =
|
||||
SimulationRunner::new(network, nodes, Default::default(), settings);
|
||||
SimulationRunner::new(network, nodes, Default::default(), settings).unwrap();
|
||||
|
||||
let mut nodes = runner.nodes.write();
|
||||
runner.inner.write().step(&mut nodes);
|
||||
|
@ -2,9 +2,9 @@ use crate::network::NetworkSettings;
|
||||
use crate::overlay::OverlaySettings;
|
||||
use crate::streaming::StreamSettings;
|
||||
use crate::warding::Ward;
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Default)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
|
||||
pub enum RunnerSettings {
|
||||
#[default]
|
||||
Sync,
|
||||
@ -21,20 +21,25 @@ pub enum RunnerSettings {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Default)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
|
||||
#[serde(untagged)]
|
||||
pub enum NodeSettings {
|
||||
Carnot,
|
||||
Carnot {
|
||||
#[serde(with = "humantime_serde")]
|
||||
timeout: std::time::Duration,
|
||||
},
|
||||
#[default]
|
||||
Dummy,
|
||||
}
|
||||
|
||||
#[derive(Default, Deserialize)]
|
||||
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
|
||||
pub struct SimulationSettings {
|
||||
#[serde(default)]
|
||||
pub wards: Vec<Ward>,
|
||||
pub network_settings: NetworkSettings,
|
||||
pub overlay_settings: OverlaySettings,
|
||||
pub node_settings: NodeSettings,
|
||||
#[serde(default)]
|
||||
pub runner_settings: RunnerSettings,
|
||||
pub stream_settings: StreamSettings,
|
||||
pub node_count: usize,
|
||||
|
@ -1,15 +1,18 @@
|
||||
use std::{any::Any, io::stdout, sync::Arc};
|
||||
|
||||
use super::{Receivers, StreamSettings, Subscriber};
|
||||
use crate::output_processors::{RecordType, Runtime};
|
||||
use parking_lot::Mutex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Default, Deserialize)]
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct IOStreamSettings {
|
||||
#[serde(rename = "type")]
|
||||
pub writer_type: WriteType,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Deserialize)]
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum WriteType {
|
||||
#[default]
|
||||
Stdout,
|
||||
@ -23,8 +26,7 @@ impl<W: std::io::Write + Send + Sync + 'static> ToWriter<W> for WriteType {
|
||||
fn to_writer(&self) -> anyhow::Result<W> {
|
||||
match self {
|
||||
WriteType::Stdout => {
|
||||
let stdout = Box::new(stdout());
|
||||
let boxed_any = Box::new(stdout) as Box<dyn Any + Send + Sync>;
|
||||
let boxed_any = Box::new(stdout()) as Box<dyn Any + Send + Sync>;
|
||||
Ok(boxed_any.downcast::<W>().map(|boxed| *boxed).map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, "Writer type mismatch")
|
||||
})?)
|
||||
@ -53,7 +55,7 @@ pub struct IOSubscriber<R, W = std::io::Stdout> {
|
||||
impl<W, R> Subscriber for IOSubscriber<R, W>
|
||||
where
|
||||
W: std::io::Write + Send + Sync + 'static,
|
||||
R: Serialize + Send + Sync + 'static,
|
||||
R: crate::output_processors::Record + Serialize,
|
||||
{
|
||||
type Record = R;
|
||||
type Settings = IOStreamSettings;
|
||||
@ -83,6 +85,8 @@ where
|
||||
loop {
|
||||
crossbeam::select! {
|
||||
recv(self.recvs.stop_rx) -> _ => {
|
||||
// collect the run time meta
|
||||
self.sink(Arc::new(R::from(Runtime::load()?)))?;
|
||||
break;
|
||||
}
|
||||
recv(self.recvs.recv) -> msg => {
|
||||
@ -98,6 +102,10 @@ where
|
||||
serde_json::to_writer(&mut *self.writer.lock(), &state)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn subscribe_data_type() -> RecordType {
|
||||
RecordType::Data
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -108,11 +116,12 @@ mod tests {
|
||||
network::{
|
||||
behaviour::NetworkBehaviour,
|
||||
regions::{Region, RegionsData},
|
||||
Network,
|
||||
Network, NetworkBehaviourKey,
|
||||
},
|
||||
node::{dummy_streaming::DummyStreamingNode, Node, NodeId},
|
||||
output_processors::OutData,
|
||||
runner::SimulationRunner,
|
||||
util::node_id,
|
||||
warding::SimulationState,
|
||||
};
|
||||
|
||||
@ -144,7 +153,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let nodes = (0..6)
|
||||
.map(|idx| DummyStreamingNode::new(NodeId::from(idx), ()))
|
||||
.map(|idx| DummyStreamingNode::new(node_id(idx), ()))
|
||||
.collect::<Vec<_>>();
|
||||
let network = Network::new(RegionsData {
|
||||
regions: (0..6)
|
||||
@ -158,7 +167,7 @@ mod tests {
|
||||
5 => Region::Australia,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(region, vec![idx.into()])
|
||||
(region, vec![node_id(idx)])
|
||||
})
|
||||
.collect(),
|
||||
node_region: (0..6)
|
||||
@ -172,7 +181,7 @@ mod tests {
|
||||
5 => Region::Australia,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(idx.into(), region)
|
||||
(node_id(idx), region)
|
||||
})
|
||||
.collect(),
|
||||
region_network_behaviour: (0..6)
|
||||
@ -187,7 +196,7 @@ mod tests {
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(
|
||||
(region, region),
|
||||
NetworkBehaviourKey::new(region, region),
|
||||
NetworkBehaviour {
|
||||
delay: Duration::from_millis(100),
|
||||
drop: 0.0,
|
||||
@ -197,7 +206,7 @@ mod tests {
|
||||
.collect(),
|
||||
});
|
||||
let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> =
|
||||
SimulationRunner::new(network, nodes, Default::default(), simulation_settings);
|
||||
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
|
||||
simulation_runner
|
||||
.simulate()
|
||||
.unwrap()
|
||||
|
@ -7,9 +7,19 @@ use std::{
|
||||
use crossbeam::channel::{bounded, unbounded, Receiver, Sender};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::output_processors::{Record, RecordType, Runtime};
|
||||
|
||||
pub mod io;
|
||||
pub mod naive;
|
||||
pub mod polars;
|
||||
pub mod runtime_subscriber;
|
||||
pub mod settings_subscriber;
|
||||
|
||||
pub enum SubscriberType {
|
||||
Meta,
|
||||
Settings,
|
||||
Data,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Receivers<R> {
|
||||
@ -30,6 +40,7 @@ impl FromStr for StreamType {
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s.trim().to_ascii_lowercase().as_str() {
|
||||
"io" => Ok(Self::IO),
|
||||
"naive" => Ok(Self::Naive),
|
||||
"polars" => Ok(Self::Polars),
|
||||
tag => Err(format!(
|
||||
@ -49,7 +60,8 @@ impl<'de> serde::Deserialize<'de> for StreamType {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase", untagged)]
|
||||
pub enum StreamSettings {
|
||||
Naive(naive::NaiveSettings),
|
||||
IO(io::IOStreamSettings),
|
||||
@ -122,7 +134,7 @@ where
|
||||
match handle.join() {
|
||||
Ok(rst) => rst?,
|
||||
Err(_) => {
|
||||
eprintln!("Error joining subscriber thread");
|
||||
tracing::error!("Error joining subscriber thread");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -137,6 +149,7 @@ where
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Senders<R> {
|
||||
record_ty: RecordType,
|
||||
record_sender: Sender<Arc<R>>,
|
||||
stop_sender: Sender<()>,
|
||||
}
|
||||
@ -188,7 +201,7 @@ impl<R> StreamProducer<R> {
|
||||
|
||||
impl<R> StreamProducer<R>
|
||||
where
|
||||
R: Send + Sync + 'static,
|
||||
R: Record + Send + Sync + 'static,
|
||||
{
|
||||
pub fn send(&self, record: R) -> anyhow::Result<()> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
@ -197,11 +210,18 @@ where
|
||||
Ok(())
|
||||
} else {
|
||||
let record = Arc::new(record);
|
||||
// cache record for new subscriber
|
||||
inner.record_cache.push(record.clone());
|
||||
|
||||
// if a send fails, then it means the corresponding subscriber is dropped,
|
||||
// we just remove the sender from the list of senders.
|
||||
inner
|
||||
.senders
|
||||
.retain(|tx| tx.record_sender.send(record.clone()).is_ok());
|
||||
inner.senders.retain(|tx| {
|
||||
if tx.record_ty != record.record_type() {
|
||||
true
|
||||
} else {
|
||||
tx.record_sender.send(Arc::clone(&record)).is_ok()
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -213,9 +233,18 @@ where
|
||||
let (tx, rx) = unbounded();
|
||||
let (stop_tx, stop_rx) = bounded(1);
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
||||
// send all previous records to the new subscriber
|
||||
for record in inner.record_cache.iter() {
|
||||
if S::subscribe_data_type() == record.record_type() {
|
||||
tx.send(Arc::clone(record))?;
|
||||
}
|
||||
}
|
||||
|
||||
inner.senders.push(Senders {
|
||||
record_sender: tx,
|
||||
stop_sender: stop_tx.clone(),
|
||||
record_ty: S::subscribe_data_type(),
|
||||
});
|
||||
Ok(SubscriberHandle {
|
||||
handle: None,
|
||||
@ -225,10 +254,22 @@ where
|
||||
}
|
||||
|
||||
pub fn stop(self) -> anyhow::Result<()> {
|
||||
let meta_record = Arc::new(R::from(Runtime::load()?));
|
||||
let inner = self.inner.lock().unwrap();
|
||||
|
||||
// send runtime record to runtime subscribers
|
||||
inner.senders.iter().for_each(|tx| {
|
||||
if tx.record_ty == meta_record.record_type() {
|
||||
if let Err(e) = tx.record_sender.send(Arc::clone(&meta_record)) {
|
||||
tracing::error!("Error sending meta record: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// send stop signal to all subscribers
|
||||
inner.senders.iter().for_each(|tx| {
|
||||
if let Err(e) = tx.stop_sender.send(()) {
|
||||
eprintln!("Error stopping subscriber: {e}");
|
||||
tracing::error!("Error stopping subscriber: {e}");
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
@ -237,7 +278,7 @@ where
|
||||
|
||||
pub trait Subscriber {
|
||||
type Settings;
|
||||
type Record: Serialize + Send + Sync + 'static;
|
||||
type Record: crate::output_processors::Record + Serialize;
|
||||
|
||||
fn new(
|
||||
record_recv: Receiver<Arc<Self::Record>>,
|
||||
@ -260,4 +301,6 @@ pub trait Subscriber {
|
||||
}
|
||||
|
||||
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()>;
|
||||
|
||||
fn subscribe_data_type() -> RecordType;
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
use super::{Receivers, StreamSettings, Subscriber};
|
||||
use crate::output_processors::{RecordType, Runtime};
|
||||
use parking_lot::Mutex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
@ -41,7 +42,7 @@ pub struct NaiveSubscriber<R> {
|
||||
|
||||
impl<R> Subscriber for NaiveSubscriber<R>
|
||||
where
|
||||
R: Serialize + Send + Sync + 'static,
|
||||
R: crate::output_processors::Record + Serialize,
|
||||
{
|
||||
type Record = R;
|
||||
|
||||
@ -70,7 +71,11 @@ where
|
||||
)),
|
||||
recvs: Arc::new(recvs),
|
||||
};
|
||||
eprintln!("Subscribed to {}", settings.path.display());
|
||||
tracing::info!(
|
||||
target = "simulation",
|
||||
"subscribed to {}",
|
||||
settings.path.display()
|
||||
);
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
@ -82,6 +87,8 @@ where
|
||||
loop {
|
||||
crossbeam::select! {
|
||||
recv(self.recvs.stop_rx) -> _ => {
|
||||
// collect the run time meta
|
||||
self.sink(Arc::new(R::from(Runtime::load()?)))?;
|
||||
break;
|
||||
}
|
||||
recv(self.recvs.recv) -> msg => {
|
||||
@ -99,6 +106,10 @@ where
|
||||
file.write_all(b",\n")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn subscribe_data_type() -> RecordType {
|
||||
RecordType::Data
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -109,11 +120,12 @@ mod tests {
|
||||
network::{
|
||||
behaviour::NetworkBehaviour,
|
||||
regions::{Region, RegionsData},
|
||||
Network,
|
||||
Network, NetworkBehaviourKey,
|
||||
},
|
||||
node::{dummy_streaming::DummyStreamingNode, Node, NodeId},
|
||||
output_processors::OutData,
|
||||
runner::SimulationRunner,
|
||||
util::node_id,
|
||||
warding::SimulationState,
|
||||
};
|
||||
|
||||
@ -146,7 +158,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let nodes = (0..6)
|
||||
.map(|idx| DummyStreamingNode::new(NodeId::from(idx), ()))
|
||||
.map(|idx| DummyStreamingNode::new(node_id(idx), ()))
|
||||
.collect::<Vec<_>>();
|
||||
let network = Network::new(RegionsData {
|
||||
regions: (0..6)
|
||||
@ -160,7 +172,7 @@ mod tests {
|
||||
5 => Region::Australia,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(region, vec![idx.into()])
|
||||
(region, vec![node_id(idx)])
|
||||
})
|
||||
.collect(),
|
||||
node_region: (0..6)
|
||||
@ -174,7 +186,7 @@ mod tests {
|
||||
5 => Region::Australia,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(idx.into(), region)
|
||||
(node_id(idx), region)
|
||||
})
|
||||
.collect(),
|
||||
region_network_behaviour: (0..6)
|
||||
@ -189,7 +201,7 @@ mod tests {
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(
|
||||
(region, region),
|
||||
NetworkBehaviourKey::new(region, region),
|
||||
NetworkBehaviour {
|
||||
delay: Duration::from_millis(100),
|
||||
drop: 0.0,
|
||||
@ -199,7 +211,7 @@ mod tests {
|
||||
.collect(),
|
||||
});
|
||||
let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> =
|
||||
SimulationRunner::new(network, nodes, Default::default(), simulation_settings);
|
||||
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
|
||||
simulation_runner.simulate().unwrap();
|
||||
}
|
||||
}
|
||||
|
@ -1,3 +1,5 @@
|
||||
use super::{Receivers, StreamSettings};
|
||||
use crate::output_processors::{RecordType, Runtime};
|
||||
use parking_lot::Mutex;
|
||||
use polars::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -8,8 +10,6 @@ use std::{
|
||||
str::FromStr,
|
||||
};
|
||||
|
||||
use super::{Receivers, StreamSettings};
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize)]
|
||||
pub enum PolarsFormat {
|
||||
Json,
|
||||
@ -45,7 +45,8 @@ impl<'de> Deserialize<'de> for PolarsFormat {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PolarsSettings {
|
||||
pub format: PolarsFormat,
|
||||
pub path: PathBuf,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub path: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl TryFrom<StreamSettings> for PolarsSettings {
|
||||
@ -90,7 +91,7 @@ where
|
||||
|
||||
impl<R> super::Subscriber for PolarsSubscriber<R>
|
||||
where
|
||||
R: Serialize + Send + Sync + 'static,
|
||||
R: crate::output_processors::Record + Serialize,
|
||||
{
|
||||
type Record = R;
|
||||
type Settings = PolarsSettings;
|
||||
@ -110,9 +111,22 @@ where
|
||||
let this = PolarsSubscriber {
|
||||
data: Arc::new(Mutex::new(Vec::new())),
|
||||
recvs: Arc::new(recvs),
|
||||
path: settings.path.clone(),
|
||||
path: settings.path.clone().unwrap_or_else(|| {
|
||||
let mut p = std::env::temp_dir().join("polars");
|
||||
match settings.format {
|
||||
PolarsFormat::Json => p.set_extension("json"),
|
||||
PolarsFormat::Csv => p.set_extension("csv"),
|
||||
PolarsFormat::Parquet => p.set_extension("parquet"),
|
||||
};
|
||||
p
|
||||
}),
|
||||
format: settings.format,
|
||||
};
|
||||
tracing::info!(
|
||||
target = "simulation",
|
||||
"subscribed to {}",
|
||||
this.path.display()
|
||||
);
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
@ -124,6 +138,8 @@ where
|
||||
loop {
|
||||
crossbeam::select! {
|
||||
recv(self.recvs.stop_rx) -> _ => {
|
||||
// collect the run time meta
|
||||
self.sink(Arc::new(R::from(Runtime::load()?)))?;
|
||||
return self.persist();
|
||||
}
|
||||
recv(self.recvs.recv) -> msg => {
|
||||
@ -137,6 +153,10 @@ where
|
||||
self.data.lock().push(state);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn subscribe_data_type() -> RecordType {
|
||||
RecordType::Data
|
||||
}
|
||||
}
|
||||
|
||||
fn dump_dataframe_to_json(data: &mut DataFrame, out_path: &Path) -> anyhow::Result<()> {
|
||||
|
202
simulations/src/streaming/runtime_subscriber.rs
Normal file
202
simulations/src/streaming/runtime_subscriber.rs
Normal file
@ -0,0 +1,202 @@
|
||||
use super::{Receivers, Subscriber};
|
||||
use crate::output_processors::{RecordType, Runtime};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
fs::{File, OpenOptions},
|
||||
io::Write,
|
||||
path::PathBuf,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct RuntimeSettings {
|
||||
pub path: PathBuf,
|
||||
}
|
||||
|
||||
impl Default for RuntimeSettings {
|
||||
fn default() -> Self {
|
||||
let mut tmp = std::env::temp_dir();
|
||||
tmp.push("simulation");
|
||||
tmp.set_extension("runtime");
|
||||
Self { path: tmp }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RuntimeSubscriber<R> {
|
||||
file: Arc<Mutex<File>>,
|
||||
recvs: Arc<Receivers<R>>,
|
||||
}
|
||||
|
||||
impl<R> Subscriber for RuntimeSubscriber<R>
|
||||
where
|
||||
R: crate::output_processors::Record + Serialize,
|
||||
{
|
||||
type Record = R;
|
||||
|
||||
type Settings = RuntimeSettings;
|
||||
|
||||
fn new(
|
||||
record_recv: crossbeam::channel::Receiver<Arc<Self::Record>>,
|
||||
stop_recv: crossbeam::channel::Receiver<()>,
|
||||
settings: Self::Settings,
|
||||
) -> anyhow::Result<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
let mut opts = OpenOptions::new();
|
||||
let recvs = Receivers {
|
||||
stop_rx: stop_recv,
|
||||
recv: record_recv,
|
||||
};
|
||||
let this = RuntimeSubscriber {
|
||||
file: Arc::new(Mutex::new(
|
||||
opts.truncate(true)
|
||||
.create(true)
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(&settings.path)?,
|
||||
)),
|
||||
recvs: Arc::new(recvs),
|
||||
};
|
||||
tracing::info!(
|
||||
taget = "simulation",
|
||||
"subscribed to {}",
|
||||
settings.path.display()
|
||||
);
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
fn next(&self) -> Option<anyhow::Result<Arc<Self::Record>>> {
|
||||
Some(self.recvs.recv.recv().map_err(From::from))
|
||||
}
|
||||
|
||||
fn run(self) -> anyhow::Result<()> {
|
||||
crossbeam::select! {
|
||||
recv(self.recvs.stop_rx) -> _ => {
|
||||
// collect the run time meta
|
||||
self.sink(Arc::new(R::from(Runtime::load()?)))?;
|
||||
}
|
||||
recv(self.recvs.recv) -> msg => {
|
||||
self.sink(msg?)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {
|
||||
let mut file = self.file.lock().expect("failed to lock file");
|
||||
serde_json::to_writer(&mut *file, &state)?;
|
||||
file.write_all(b",\n")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn subscribe_data_type() -> RecordType {
|
||||
RecordType::Data
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
|
||||
use crate::{
|
||||
network::{
|
||||
behaviour::NetworkBehaviour,
|
||||
regions::{Region, RegionsData},
|
||||
Network, NetworkBehaviourKey,
|
||||
},
|
||||
node::{dummy_streaming::DummyStreamingNode, Node, NodeId},
|
||||
output_processors::OutData,
|
||||
runner::SimulationRunner,
|
||||
util::node_id,
|
||||
warding::SimulationState,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
struct RuntimeRecord {
|
||||
states: HashMap<NodeId, usize>,
|
||||
}
|
||||
|
||||
impl TryFrom<&SimulationState<DummyStreamingNode<()>>> for RuntimeRecord {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: &SimulationState<DummyStreamingNode<()>>) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
states: value
|
||||
.nodes
|
||||
.read()
|
||||
.iter()
|
||||
.map(|node| (node.id(), node.current_view()))
|
||||
.collect(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_streaming() {
|
||||
let simulation_settings = crate::settings::SimulationSettings {
|
||||
seed: Some(1),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let nodes = (0..6)
|
||||
.map(|idx| DummyStreamingNode::new(node_id(idx), ()))
|
||||
.collect::<Vec<_>>();
|
||||
let network = Network::new(RegionsData {
|
||||
regions: (0..6)
|
||||
.map(|idx| {
|
||||
let region = match idx % 6 {
|
||||
0 => Region::Europe,
|
||||
1 => Region::NorthAmerica,
|
||||
2 => Region::SouthAmerica,
|
||||
3 => Region::Asia,
|
||||
4 => Region::Africa,
|
||||
5 => Region::Australia,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(region, vec![node_id(idx)])
|
||||
})
|
||||
.collect(),
|
||||
node_region: (0..6)
|
||||
.map(|idx| {
|
||||
let region = match idx % 6 {
|
||||
0 => Region::Europe,
|
||||
1 => Region::NorthAmerica,
|
||||
2 => Region::SouthAmerica,
|
||||
3 => Region::Asia,
|
||||
4 => Region::Africa,
|
||||
5 => Region::Australia,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(node_id(idx), region)
|
||||
})
|
||||
.collect(),
|
||||
region_network_behaviour: (0..6)
|
||||
.map(|idx| {
|
||||
let region = match idx % 6 {
|
||||
0 => Region::Europe,
|
||||
1 => Region::NorthAmerica,
|
||||
2 => Region::SouthAmerica,
|
||||
3 => Region::Asia,
|
||||
4 => Region::Africa,
|
||||
5 => Region::Australia,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(
|
||||
NetworkBehaviourKey::new(region, region),
|
||||
NetworkBehaviour {
|
||||
delay: Duration::from_millis(100),
|
||||
drop: 0.0,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
});
|
||||
let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> =
|
||||
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
|
||||
simulation_runner.simulate().unwrap();
|
||||
}
|
||||
}
|
202
simulations/src/streaming/settings_subscriber.rs
Normal file
202
simulations/src/streaming/settings_subscriber.rs
Normal file
@ -0,0 +1,202 @@
|
||||
use super::{Receivers, Subscriber};
|
||||
use crate::output_processors::{RecordType, Runtime};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
fs::{File, OpenOptions},
|
||||
io::Write,
|
||||
path::PathBuf,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SettingsSubscriberSettings {
|
||||
pub path: PathBuf,
|
||||
}
|
||||
|
||||
impl Default for SettingsSubscriberSettings {
|
||||
fn default() -> Self {
|
||||
let mut tmp = std::env::temp_dir();
|
||||
tmp.push("simulation");
|
||||
tmp.set_extension("conf");
|
||||
Self { path: tmp }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SettingsSubscriber<R> {
|
||||
file: Arc<Mutex<File>>,
|
||||
recvs: Arc<Receivers<R>>,
|
||||
}
|
||||
|
||||
impl<R> Subscriber for SettingsSubscriber<R>
|
||||
where
|
||||
R: crate::output_processors::Record + Serialize,
|
||||
{
|
||||
type Record = R;
|
||||
|
||||
type Settings = SettingsSubscriberSettings;
|
||||
|
||||
fn new(
|
||||
record_recv: crossbeam::channel::Receiver<Arc<Self::Record>>,
|
||||
stop_recv: crossbeam::channel::Receiver<()>,
|
||||
settings: Self::Settings,
|
||||
) -> anyhow::Result<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
let mut opts = OpenOptions::new();
|
||||
let recvs = Receivers {
|
||||
stop_rx: stop_recv,
|
||||
recv: record_recv,
|
||||
};
|
||||
let this = SettingsSubscriber {
|
||||
file: Arc::new(Mutex::new(
|
||||
opts.truncate(true)
|
||||
.create(true)
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(&settings.path)?,
|
||||
)),
|
||||
recvs: Arc::new(recvs),
|
||||
};
|
||||
tracing::info!(
|
||||
target = "simulation",
|
||||
"subscribed to {}",
|
||||
settings.path.display()
|
||||
);
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
fn next(&self) -> Option<anyhow::Result<Arc<Self::Record>>> {
|
||||
Some(self.recvs.recv.recv().map_err(From::from))
|
||||
}
|
||||
|
||||
fn run(self) -> anyhow::Result<()> {
|
||||
crossbeam::select! {
|
||||
recv(self.recvs.stop_rx) -> _ => {
|
||||
// collect the run time meta
|
||||
self.sink(Arc::new(R::from(Runtime::load()?)))?;
|
||||
}
|
||||
recv(self.recvs.recv) -> msg => {
|
||||
self.sink(msg?)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {
|
||||
let mut file = self.file.lock().expect("failed to lock file");
|
||||
serde_json::to_writer(&mut *file, &state)?;
|
||||
file.write_all(b",\n")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn subscribe_data_type() -> RecordType {
|
||||
RecordType::Data
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
|
||||
use crate::{
|
||||
network::{
|
||||
behaviour::NetworkBehaviour,
|
||||
regions::{Region, RegionsData},
|
||||
Network, NetworkBehaviourKey,
|
||||
},
|
||||
node::{dummy_streaming::DummyStreamingNode, Node, NodeId},
|
||||
output_processors::OutData,
|
||||
runner::SimulationRunner,
|
||||
util::node_id,
|
||||
warding::SimulationState,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
struct SettingsRecord {
|
||||
states: HashMap<NodeId, usize>,
|
||||
}
|
||||
|
||||
impl TryFrom<&SimulationState<DummyStreamingNode<()>>> for SettingsRecord {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: &SimulationState<DummyStreamingNode<()>>) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
states: value
|
||||
.nodes
|
||||
.read()
|
||||
.iter()
|
||||
.map(|node| (node.id(), node.current_view()))
|
||||
.collect(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_streaming() {
|
||||
let simulation_settings = crate::settings::SimulationSettings {
|
||||
seed: Some(1),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let nodes = (0..6)
|
||||
.map(|idx| DummyStreamingNode::new(node_id(idx), ()))
|
||||
.collect::<Vec<_>>();
|
||||
let network = Network::new(RegionsData {
|
||||
regions: (0..6)
|
||||
.map(|idx| {
|
||||
let region = match idx % 6 {
|
||||
0 => Region::Europe,
|
||||
1 => Region::NorthAmerica,
|
||||
2 => Region::SouthAmerica,
|
||||
3 => Region::Asia,
|
||||
4 => Region::Africa,
|
||||
5 => Region::Australia,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(region, vec![node_id(idx)])
|
||||
})
|
||||
.collect(),
|
||||
node_region: (0..6)
|
||||
.map(|idx| {
|
||||
let region = match idx % 6 {
|
||||
0 => Region::Europe,
|
||||
1 => Region::NorthAmerica,
|
||||
2 => Region::SouthAmerica,
|
||||
3 => Region::Asia,
|
||||
4 => Region::Africa,
|
||||
5 => Region::Australia,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(node_id(idx), region)
|
||||
})
|
||||
.collect(),
|
||||
region_network_behaviour: (0..6)
|
||||
.map(|idx| {
|
||||
let region = match idx % 6 {
|
||||
0 => Region::Europe,
|
||||
1 => Region::NorthAmerica,
|
||||
2 => Region::SouthAmerica,
|
||||
3 => Region::Asia,
|
||||
4 => Region::Africa,
|
||||
5 => Region::Australia,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(
|
||||
NetworkBehaviourKey::new(region, region),
|
||||
NetworkBehaviour {
|
||||
delay: Duration::from_millis(100),
|
||||
drop: 0.0,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
});
|
||||
let simulation_runner: SimulationRunner<(), DummyStreamingNode<()>, OutData> =
|
||||
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
|
||||
simulation_runner.simulate().unwrap();
|
||||
}
|
||||
}
|
20
simulations/src/util.rs
Normal file
20
simulations/src/util.rs
Normal file
@ -0,0 +1,20 @@
|
||||
/// Create a random node id.
|
||||
///
|
||||
/// The format is:
|
||||
///
|
||||
/// [0..4]: node index in big endian
|
||||
/// [4..32]: zeros
|
||||
pub fn node_id(id: usize) -> consensus_engine::NodeId {
|
||||
let mut bytes = [0; 32];
|
||||
bytes[..4].copy_from_slice((id as u32).to_be_bytes().as_ref());
|
||||
bytes
|
||||
}
|
||||
|
||||
/// Parse the original index from NodeId
|
||||
pub(crate) fn parse_idx(id: &consensus_engine::NodeId) -> usize {
|
||||
let mut bytes = [0; 4];
|
||||
bytes.copy_from_slice(&id[..4]);
|
||||
u32::from_be_bytes(bytes) as usize
|
||||
}
|
||||
|
||||
pub(crate) mod millis_duration {}
|
@ -1,10 +1,11 @@
|
||||
use crate::node::Node;
|
||||
use crate::warding::{SimulationState, SimulationWard};
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// MinMaxView. It monitors the gap between a min view and max view, triggers when surpassing
|
||||
/// the max view - min view is larger than a gap.
|
||||
#[derive(Debug, Deserialize, Copy, Clone)]
|
||||
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
||||
#[serde(transparent)]
|
||||
pub struct MinMaxViewWard {
|
||||
max_gap: usize,
|
||||
}
|
||||
|
@ -2,7 +2,7 @@
|
||||
use std::sync::Arc;
|
||||
// crates
|
||||
use parking_lot::RwLock;
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
// internal
|
||||
use crate::node::Node;
|
||||
|
||||
@ -32,7 +32,7 @@ pub trait SimulationWard<N> {
|
||||
|
||||
/// Ward dispatcher
|
||||
/// Enum to avoid Boxing (Box<dyn SimulationWard>) wards.
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Ward {
|
||||
MaxView(ttf::MaxViewWard),
|
||||
|
@ -1,9 +1,9 @@
|
||||
use crate::node::Node;
|
||||
use crate::warding::{SimulationState, SimulationWard};
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// StalledView. Track stalled nodes (e.g incoming queue is empty, the node doesn't write to other queues)
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct StalledViewWard {
|
||||
// the hash checksum
|
||||
consecutive_viewed_checkpoint: Option<u32>,
|
||||
|
@ -1,10 +1,11 @@
|
||||
use crate::node::Node;
|
||||
use crate::warding::{SimulationState, SimulationWard};
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Time to finality ward. It monitors the amount of rounds of the simulations, triggers when surpassing
|
||||
/// the set threshold.
|
||||
#[derive(Debug, Deserialize, Copy, Clone)]
|
||||
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
||||
#[serde(transparent)]
|
||||
pub struct MaxViewWard {
|
||||
max_view: usize,
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user