diff --git a/simulations/Cargo.toml b/simulations/Cargo.toml index f04c87df..1b941633 100644 --- a/simulations/Cargo.toml +++ b/simulations/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] clap = { version = "4", features = ["derive"] } +crc32fast = "1.3" fixed-slice-deque = "0.1.0-beta2" nomos-core = { path = "../nomos-core" } polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"] } diff --git a/simulations/src/warding/mod.rs b/simulations/src/warding/mod.rs index f0abe64b..d84b53aa 100644 --- a/simulations/src/warding/mod.rs +++ b/simulations/src/warding/mod.rs @@ -6,6 +6,7 @@ use serde::Deserialize; use crate::node::Node; mod minmax; +mod stalled; mod ttf; pub struct SimulationState { diff --git a/simulations/src/warding/stalled.rs b/simulations/src/warding/stalled.rs new file mode 100644 index 00000000..19c2d6fd --- /dev/null +++ b/simulations/src/warding/stalled.rs @@ -0,0 +1,89 @@ +use crate::node::Node; +use crate::warding::{SimulationState, SimulationWard}; +use serde::Deserialize; + +/// StalledView. Track stalled nodes (e.g incoming queue is empty, the node doesn't write to other queues) +#[derive(Debug, Deserialize, Clone)] +pub struct StalledViewWard { + // the hash checksum + consecutive_viewed_checkpoint: Option, + // use to check if the node is stalled + criterion: usize, + threshold: usize, +} + +impl StalledViewWard { + fn update_state(&mut self, cks: u32) { + match &mut self.consecutive_viewed_checkpoint { + Some(cp) => { + if cks == *cp { + self.criterion += 1; + } else { + *cp = cks; + // reset the criterion + self.criterion = 0; + } + } + None => { + self.consecutive_viewed_checkpoint = Some(cks); + } + } + } +} + +impl SimulationWard for StalledViewWard { + type SimulationState = SimulationState; + fn analyze(&mut self, state: &Self::SimulationState) -> bool { + let nodes = state + .nodes + .read() + .expect("simulations: StalledViewWard panic when requiring a read lock"); + self.update_state(checksum(nodes.as_slice())); + self.criterion >= self.threshold + } +} + +#[inline] +fn checksum(nodes: &[N]) -> u32 { + let mut hash = crc32fast::Hasher::new(); + for node in nodes.iter() { + hash.update(&node.current_view().to_be_bytes()); + // TODO: hash messages in the node + } + + hash.finalize() +} + +#[cfg(test)] +mod test { + use super::*; + use std::sync::{Arc, RwLock}; + + #[test] + fn rebase_threshold() { + let mut stalled = StalledViewWard { + consecutive_viewed_checkpoint: None, + criterion: 0, + threshold: 2, + }; + let state = SimulationState { + nodes: Arc::new(RwLock::new(vec![10])), + }; + + // increase the criterion, 1 + assert!(!stalled.analyze(&state)); + // increase the criterion, 2 + assert!(!stalled.analyze(&state)); + // increase the criterion, 3 > threshold 2, so true + assert!(stalled.analyze(&state)); + + // push a new one, so the criterion is reset to 0 + state.nodes.write().unwrap().push(20); + assert!(!stalled.analyze(&state)); + + // increase the criterion, 2 + assert!(!stalled.analyze(&state)); + // increase the criterion, 3 > threshold 2, so true + assert!(stalled.analyze(&state)); + } +}