finish event builder

This commit is contained in:
al8n 2023-05-16 16:31:00 +08:00
parent 45cb4ebaec
commit 2bfb7acf80
4 changed files with 101 additions and 25 deletions

View File

@ -12,12 +12,20 @@ use serde::{Deserialize, Serialize};
pub type TxHash = [u8; 32];
/// A block
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct Block<TxId: Clone + Eq + Hash> {
header: consensus_engine::Block,
transactions: IndexSet<TxId>,
}
impl<TxId: Clone + Eq + Hash> core::hash::Hash for Block<TxId> {
fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
self.header.hash(state);
self.transactions.len().hash(state);
self.transactions.iter().for_each(|tx| tx.hash(state));
}
}
/// Identifier of a block
pub type BlockId = [u8; 32];

View File

@ -1,63 +1,125 @@
use crate::node::carnot::messages::CarnotMessage;
use nomos_consensus::network::messages::VoteMsg;
use nomos_consensus::Event;
use nomos_consensus::network::messages::{NewViewMsg, TimeoutMsg, VoteMsg};
use nomos_consensus::Event::TimeoutQc;
use nomos_core::block::Block;
use nomos_consensus::{Event, NodeId};
use nomos_core::block::{Block, BlockId};
use polars::chunked_array::object::hashbrown::HashSet;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub type CarnotTx = [u8; 32];
#[derive(Default)]
pub struct EventBuilderConfiguration {
#[derive(thiserror::Error, Debug)]
pub enum EventBuilderError {
#[error("block cannot be found: {0:?}")]
BlockNotFound(BlockId),
#[error("voter {voter:?} on view {view} has empty qc")]
EmptyQc { voter: NodeId, view: i64 },
}
#[derive(Default, Copy, Clone, Serialize, Deserialize)]
pub struct EventBuilderSettings {
pub votes_threshold: usize,
pub timeout_threshold: usize,
}
pub struct EventBuilder {
blocks: HashMap<BlockId, Block<CarnotTx>>,
vote_message: HashMap<View, Vec<VoteMsg>>,
config: EventBuilderConfiguration,
timeout_message: HashMap<View, Vec<TimeoutMsg>>,
new_view_message: HashMap<View, Vec<NewViewMsg>>,
config: EventBuilderSettings,
}
impl EventBuilder {
pub fn new() -> Self {
Self {
vote_message: HashMap::new(),
timeout_message: HashMap::new(),
config: Default::default(),
blocks: Default::default(),
new_view_message: Default::default(),
}
}
pub fn step(&mut self, messages: &[CarnotMessage]) -> Vec<Event<CarnotTx>> {
pub fn step(&mut self, messages: Vec<CarnotMessage>) -> Result<Vec<Event<CarnotTx>>> {
let mut events = Vec::new();
for message in messages {
match message {
CarnotMessage::Proposal(msg) => {
let block = Block::from_bytes(&msg.chunk);
self.blocks.insert(block.header().id, block.clone());
events.push(Event::Proposal {
block: Block::from_bytes(&msg.chunk),
block,
stream: Box::pin(futures::stream::empty()),
});
}
CarnotMessage::TimeoutQc(msg) => {
events.push(Event::TimeoutQc {
timeout_qc: msg.qc.clone(),
});
events.push(Event::TimeoutQc { timeout_qc: msg.qc });
}
CarnotMessage::Vote(msg) => {
let msg_view = msg.vote.view;
let entry = self.vote_message.entry(msg_view).or_default();
entry.push(msg.clone());
if entry.len() >= self.config.votes_threshold {
let block_id = msg.vote.block;
let qc = msg.qc.clone().ok_or(EventBuilderError::EmptyQc {
voter: msg.voter,
view: msg_view,
})?;
let entries = self
.vote_message
.entry(msg_view)
.and_modify(|entry| entry.push(msg))
.or_default()
.len();
if entries >= self.config.votes_threshold {
let entry = self.vote_message.remove(&msg_view).unwrap();
events.push(Event::Approve {
qc: msg.qc.cloned().unwrap(),
block: msg.vote.block,
votes: entry.iter().cloned().collect(),
qc,
block: self
.blocks
.get(&block_id)
.ok_or(EventBuilderError::BlockNotFound(block_id))?,
votes: entry.into_iter().collect(),
})
}
}
CarnotMessage::Timeout(msg) => {
let msg_view = msg.vote.view;
let entires = self
.timeout_message
.entry(msg.vote.view)
.and_modify(|entry| entry.push(msg))
.or_default()
.len();
if entires >= self.config.timeout_threshold {
let entry = self.timeout_message.remove(&msg_view).unwrap();
events.push(Event::RootTimeout {
timeouts: entry.into_iter().map(|msg| msg.vote).collect(),
})
}
}
CarnotMessage::NewView(msg) => {
let msg_view = msg.vote.view;
let timeout_qc = msg.vote.timeout_qc.clone();
let entries = self
.new_view_message
.entry(msg.view)
.and_modify(|entry| entry.push(msg))
.or_default()
.len();
if entries >= self.config.votes_threshold {
let entry = self.new_view_message.remove(&msg_view).unwrap();
events.push(Event::NewView {
timeout_qc,
new_views: entry.into_iter().map(|msg| msg.vote).collect(),
})
}
self.vote_message.remove(&msg_view);
}
CarnotMessage::Timeout(_) => {}
CarnotMessage::NewView(_) => {}
}
}
events
Ok(events)
}
}

View File

@ -1,9 +1,11 @@
use nomos_consensus::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutQcMsg, VoteMsg};
use nomos_consensus::network::messages::{
NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg,
};
pub(crate) enum CarnotMessage {
Proposal(ProposalChunkMsg),
Vote(VoteMsg),
TimeoutQc(TimeoutQcMsg),
Timeout(TimeoutQcMsg),
Timeout(TimeoutMsg),
NewView(NewViewMsg),
}

View File

@ -3,7 +3,9 @@ mod messages;
// std
// crates
use self::event_builder::EventBuilderSettings;
use serde::{Deserialize, Serialize};
// internal
use super::{Node, NodeId};
@ -11,7 +13,9 @@ use super::{Node, NodeId};
pub struct CarnotState {}
#[derive(Clone, Default, Deserialize)]
pub struct CarnotSettings {}
pub struct CarnotSettings {
pub event_builder_settings: EventBuilderSettings,
}
#[allow(dead_code)] // TODO: remove when handling settings
pub struct CarnotNode {