diff --git a/simulations/Cargo.toml b/simulations/Cargo.toml index 0012d7da..ffe19436 100644 --- a/simulations/Cargo.toml +++ b/simulations/Cargo.toml @@ -11,6 +11,7 @@ arc-swap = "1.6" clap = { version = "4", features = ["derive"] } crc32fast = "1.3" crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] } +consensus-engine = { path = "../consensus-engine" } fixed-slice-deque = "0.1.0-beta2" futures = "0.3" nomos-core = { path = "../nomos-core" } diff --git a/simulations/src/node/carnot/event_builder.rs b/simulations/src/node/carnot/event_builder.rs index f41a04c8..7c60b9d2 100644 --- a/simulations/src/node/carnot/event_builder.rs +++ b/simulations/src/node/carnot/event_builder.rs @@ -1,4 +1,5 @@ use crate::node::carnot::messages::CarnotMessage; +use consensus_engine::View; use nomos_consensus::network::messages::{NewViewMsg, TimeoutMsg, VoteMsg}; use nomos_consensus::Event::TimeoutQc; use nomos_consensus::{Event, NodeId}; @@ -17,9 +18,9 @@ pub struct EventBuilderSettings { pub struct EventBuilder { blocks: HashMap>, - vote_message: HashMap>, - timeout_message: HashMap>, - new_view_message: HashMap>, + vote_message: Tally, + timeout_message: Tally, + new_view_message: Tally, config: EventBuilderSettings, } @@ -53,56 +54,30 @@ impl EventBuilder { let msg_view = msg.vote.view; let block_id = msg.vote.block; let qc = msg.qc.clone().expect("empty QC from vote message")?; - let entries = self - .vote_message - .entry(msg_view) - .and_modify(|entry| entry.push(msg)) - .or_default() - .len(); - - if entries >= self.config.votes_threshold { - let entry = self.vote_message.remove(&msg_view).unwrap(); + if let Some(votes) = self.vote_message.tally(msg_view, msg) { events.push(Event::Approve { qc, block: self .blocks .get(&block_id) .expect(format!("cannot find block id {:?}", block_id).as_str())?, - votes: entry.into_iter().collect(), + votes: votes.into_iter().collect(), }) } } CarnotMessage::Timeout(msg) => { let msg_view = msg.vote.view; - let entires = self - .timeout_message - .entry(msg.vote.view) - .and_modify(|entry| entry.push(msg)) - .or_default() - .len(); - - if entires >= self.config.timeout_threshold { - let entry = self.timeout_message.remove(&msg_view).unwrap(); - events.push(Event::RootTimeout { - timeouts: entry.into_iter().map(|msg| msg.vote).collect(), - }) + if let Some(timeouts) = self.timeout_message.tally(msg_view, msg) { + events.push(Event::RootTimeout { timeouts }) } } CarnotMessage::NewView(msg) => { let msg_view = msg.vote.view; let timeout_qc = msg.vote.timeout_qc.clone(); - let entries = self - .new_view_message - .entry(msg.view) - .and_modify(|entry| entry.push(msg)) - .or_default() - .len(); - - if entries >= self.config.votes_threshold { - let entry = self.new_view_message.remove(&msg_view).unwrap(); + if let Some(new_views) = self.new_view_message.tally(msg_view, msg) { events.push(Event::NewView { + new_views, timeout_qc, - new_views: entry.into_iter().map(|msg| msg.vote).collect(), }) } } @@ -112,3 +87,38 @@ impl EventBuilder { events } } + +struct Tally { + cache: HashMap>, + threshold: usize, +} + +impl Default for Tally { + fn default() -> Self { + Self::new(0) + } +} + +impl Tally { + fn new(threshold: usize) -> Self { + Self { + cache: Default::default(), + threshold, + } + } + + fn tally(&mut self, view: View, message: T) -> Option> { + let entries = self + .cache + .entry(view) + .and_modify(|entry| entry.push(message)) + .or_default() + .len(); + + if entries >= self.threshold { + Some(self.cache.remove(&view).unwrap()) + } else { + None + } + } +}