add Tally

This commit is contained in:
al8n 2023-05-17 00:16:45 +08:00
parent 5fad6aad04
commit 9ca5060ce7
2 changed files with 46 additions and 35 deletions

View File

@ -11,6 +11,7 @@ arc-swap = "1.6"
clap = { version = "4", features = ["derive"] }
crc32fast = "1.3"
crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] }
consensus-engine = { path = "../consensus-engine" }
fixed-slice-deque = "0.1.0-beta2"
futures = "0.3"
nomos-core = { path = "../nomos-core" }

View File

@ -1,4 +1,5 @@
use crate::node::carnot::messages::CarnotMessage;
use consensus_engine::View;
use nomos_consensus::network::messages::{NewViewMsg, TimeoutMsg, VoteMsg};
use nomos_consensus::Event::TimeoutQc;
use nomos_consensus::{Event, NodeId};
@ -17,9 +18,9 @@ pub struct EventBuilderSettings {
pub struct EventBuilder {
blocks: HashMap<BlockId, Block<CarnotTx>>,
vote_message: HashMap<View, Vec<VoteMsg>>,
timeout_message: HashMap<View, Vec<TimeoutMsg>>,
new_view_message: HashMap<View, Vec<NewViewMsg>>,
vote_message: Tally<VoteMsg>,
timeout_message: Tally<TimeoutMsg>,
new_view_message: Tally<NewViewMsg>,
config: EventBuilderSettings,
}
@ -53,56 +54,30 @@ impl EventBuilder {
let msg_view = msg.vote.view;
let block_id = msg.vote.block;
let qc = msg.qc.clone().expect("empty QC from vote message")?;
let entries = self
.vote_message
.entry(msg_view)
.and_modify(|entry| entry.push(msg))
.or_default()
.len();
if entries >= self.config.votes_threshold {
let entry = self.vote_message.remove(&msg_view).unwrap();
if let Some(votes) = self.vote_message.tally(msg_view, msg) {
events.push(Event::Approve {
qc,
block: self
.blocks
.get(&block_id)
.expect(format!("cannot find block id {:?}", block_id).as_str())?,
votes: entry.into_iter().collect(),
votes: votes.into_iter().collect(),
})
}
}
CarnotMessage::Timeout(msg) => {
let msg_view = msg.vote.view;
let entires = self
.timeout_message
.entry(msg.vote.view)
.and_modify(|entry| entry.push(msg))
.or_default()
.len();
if entires >= self.config.timeout_threshold {
let entry = self.timeout_message.remove(&msg_view).unwrap();
events.push(Event::RootTimeout {
timeouts: entry.into_iter().map(|msg| msg.vote).collect(),
})
if let Some(timeouts) = self.timeout_message.tally(msg_view, msg) {
events.push(Event::RootTimeout { timeouts })
}
}
CarnotMessage::NewView(msg) => {
let msg_view = msg.vote.view;
let timeout_qc = msg.vote.timeout_qc.clone();
let entries = self
.new_view_message
.entry(msg.view)
.and_modify(|entry| entry.push(msg))
.or_default()
.len();
if entries >= self.config.votes_threshold {
let entry = self.new_view_message.remove(&msg_view).unwrap();
if let Some(new_views) = self.new_view_message.tally(msg_view, msg) {
events.push(Event::NewView {
new_views,
timeout_qc,
new_views: entry.into_iter().map(|msg| msg.vote).collect(),
})
}
}
@ -112,3 +87,38 @@ impl EventBuilder {
events
}
}
struct Tally<T> {
cache: HashMap<View, Vec<T>>,
threshold: usize,
}
impl<T> Default for Tally<T> {
fn default() -> Self {
Self::new(0)
}
}
impl<T> Tally<T> {
fn new(threshold: usize) -> Self {
Self {
cache: Default::default(),
threshold,
}
}
fn tally(&mut self, view: View, message: T) -> Option<Vec<T>> {
let entries = self
.cache
.entry(view)
.and_modify(|entry| entry.push(message))
.or_default()
.len();
if entries >= self.threshold {
Some(self.cache.remove(&view).unwrap())
} else {
None
}
}
}