diff --git a/nomos-core/src/block.rs b/nomos-core/src/block.rs index 065bc39e..7514707d 100644 --- a/nomos-core/src/block.rs +++ b/nomos-core/src/block.rs @@ -10,25 +10,29 @@ pub type TxHash = [u8; 32]; /// A block #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Block { - header: BlockHeader, +pub struct Block { + header: BlockHeader, transactions: IndexSet, } /// A block header #[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)] -pub struct BlockHeader { +pub struct BlockHeader { id: BlockId, + qc: Qc, } /// Identifier of a block pub type BlockId = [u8; 32]; -impl Block { - pub fn new(header: BlockHeader, txs: impl Iterator) -> Self { +impl Block { + pub fn new(qc: Qc, txs: impl Iterator) -> Self { + let transactions = txs.collect(); + // FIXME: Calculate header Id + let header = BlockHeader { id: [0; 32], qc }; Self { header, - transactions: txs.collect(), + transactions, } } @@ -37,8 +41,8 @@ impl Block { Bytes::new() } - pub fn header(&self) -> BlockHeader { - self.header + pub fn header(&self) -> BlockHeader { + self.header.clone() } pub fn transactions(&self) -> impl Iterator + '_ { @@ -46,8 +50,12 @@ impl Block { } } -impl BlockHeader { +impl BlockHeader { pub fn id(&self) -> BlockId { self.id } + + pub fn qc(&self) -> &Qc { + &self.qc + } } diff --git a/nomos-core/src/vote/carnot.rs b/nomos-core/src/vote/carnot.rs index cebe8d6f..be4a336d 100644 --- a/nomos-core/src/vote/carnot.rs +++ b/nomos-core/src/vote/carnot.rs @@ -71,7 +71,8 @@ pub struct CarnotTally { #[async_trait::async_trait] impl Tally for CarnotTally { type Vote = Vote; - type Outcome = QuorumCertificate; + type Qc = QuorumCertificate; + type Outcome = (); type TallyError = CarnotTallyError; type Settings = CarnotTallySettings; @@ -83,7 +84,7 @@ impl Tally for CarnotTally { &self, view: u64, mut vote_stream: S, - ) -> Result { + ) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> { let mut approved = 0usize; let mut seen = HashSet::new(); while let Some(vote) = vote_stream.next().await { @@ -106,10 +107,13 @@ impl Tally for CarnotTally { seen.insert(vote.voter); approved += 1; if approved >= self.settings.threshold { - return Ok(QuorumCertificate::Simple(SimpleQuorumCertificate { - view, - block: vote.block, - })); + return Ok(( + QuorumCertificate::Simple(SimpleQuorumCertificate { + view, + block: vote.block, + }), + (), + )); } } Err(CarnotTallyError::InsufficientVotes) diff --git a/nomos-core/src/vote/mock.rs b/nomos-core/src/vote/mock.rs index 3491edf9..68d1c328 100644 --- a/nomos-core/src/vote/mock.rs +++ b/nomos-core/src/vote/mock.rs @@ -46,7 +46,8 @@ impl MockQc { #[async_trait::async_trait] impl Tally for MockTally { type Vote = MockVote; - type Outcome = MockQc; + type Qc = MockQc; + type Outcome = (); type TallyError = Error; type Settings = MockTallySettings; @@ -59,7 +60,7 @@ impl Tally for MockTally { &self, view: u64, mut vote_stream: S, - ) -> Result { + ) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> { let mut count_votes = 0; while let Some(vote) = vote_stream.next().await { if vote.view() != view { @@ -67,7 +68,7 @@ impl Tally for MockTally { } count_votes += 1; if count_votes > self.threshold { - return Ok(MockQc { count_votes }); + return Ok((MockQc { count_votes }, ())); } } Err(Error("Not enough votes".into())) diff --git a/nomos-core/src/vote/mod.rs b/nomos-core/src/vote/mod.rs index a8190e18..cb0e2b58 100644 --- a/nomos-core/src/vote/mod.rs +++ b/nomos-core/src/vote/mod.rs @@ -6,6 +6,7 @@ use futures::Stream; #[async_trait::async_trait] pub trait Tally { type Vote; + type Qc; type Outcome; type TallyError; type Settings: Clone; @@ -14,5 +15,5 @@ pub trait Tally { &self, view: u64, vote_stream: S, - ) -> Result; + ) -> Result<(Self::Qc, Self::Outcome), Self::TallyError>; } diff --git a/nomos-services/consensus/src/leadership.rs b/nomos-services/consensus/src/leadership.rs index 5a8f0dcf..b64e0bad 100644 --- a/nomos-services/consensus/src/leadership.rs +++ b/nomos-services/consensus/src/leadership.rs @@ -2,8 +2,8 @@ use std::marker::PhantomData; // crates // internal +use nomos_core::crypto::PrivateKey; use nomos_core::tx::Transaction; -use nomos_core::{block::BlockHeader, crypto::PrivateKey}; use nomos_mempool::MempoolMsg; use super::*; @@ -18,9 +18,9 @@ pub struct Leadership { mempool: OutboundRelay>, } -pub enum LeadershipResult<'view, TxId: Clone + Eq + core::hash::Hash> { +pub enum LeadershipResult<'view, Qc: Clone, TxId: Clone + Eq + core::hash::Hash> { Leader { - block: Block, + block: Block, _view: PhantomData<&'view u8>, }, NotLeader { @@ -41,12 +41,12 @@ where } #[allow(unused, clippy::diverging_sub_expression)] - pub async fn try_propose_block<'view, Qc>( + pub async fn try_propose_block<'view, Qc: Clone>( &self, view: &'view View, tip: &Tip, qc: Qc, - ) -> LeadershipResult<'view, Tx::Hash> { + ) -> LeadershipResult<'view, Qc, Tx::Hash> { // TODO: get the correct ancestor for the tip // let ancestor_hint = todo!("get the ancestor from the tip"); let ancestor_hint = [0; 32]; @@ -60,10 +60,7 @@ where LeadershipResult::Leader { _view: PhantomData, - block: Block::new( - BlockHeader::default(), - iter.map(|ref tx| ::hash(tx)), - ), + block: Block::new(qc, iter.map(|ref tx| ::hash(tx))), } } else { LeadershipResult::NotLeader { _view: PhantomData } diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 5a7e173b..80eb1358 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -80,6 +80,7 @@ where M: MempoolAdapter, P: MemPool, T: Tally, + T::Qc: Clone, O: Overlay::Hash>, P::Tx: Transaction + Debug + 'static, ::Hash: Debug, @@ -101,6 +102,7 @@ where A: NetworkAdapter, P: MemPool, T: Tally, + T::Qc: Clone, P::Tx: Transaction + Debug, ::Hash: Debug, M: MempoolAdapter, @@ -122,6 +124,7 @@ where T: Tally + Send + Sync + 'static, T::Settings: Send + Sync + 'static, T::Outcome: Send + Sync, + T::Qc: Clone + Send + Sync, P::Settings: Send + Sync + 'static, P::Tx: Debug + Clone + serde::de::DeserializeOwned + Send + Sync + 'static, ::Hash: Debug + Send + Sync, @@ -198,7 +201,7 @@ where mempool_relay .send(nomos_mempool::MempoolMsg::MarkInBlock { ids: block.transactions().cloned().collect(), - block: block.header(), + block: block.header().id(), }) .await .map_err(|(e, _)| { @@ -237,7 +240,7 @@ impl View { fountain: &F, tally: &T, leadership: &Leadership, - ) -> Result<(Block, View), Box> + ) -> Result<(Block, View), Box> where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, @@ -245,6 +248,7 @@ impl View { Tx::Hash: Debug, T: Tally + Send + Sync + 'static, T::Outcome: Send + Sync, + T::Qc: Clone, O: Overlay, { let res = if self.is_leader(node_id) { @@ -277,12 +281,13 @@ impl View { fountain: &F, tally: &T, leadership: &Leadership, - ) -> Result::Hash>, ()> + ) -> Result::Hash>, ()> where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, T: Tally + Send + Sync + 'static, T::Outcome: Send + Sync, + T::Qc: Clone, Tx: Transaction, Tx::Hash: Debug, O: Overlay, @@ -309,11 +314,12 @@ impl View { adapter: &A, fountain: &F, tally: &T, - ) -> Result<(Block, View), ()> + ) -> Result<(Block, View), ()> where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, T: Tally + Send + Sync + 'static, + T::Qc: Clone, Tx: Transaction, Tx::Hash: Debug, O: Overlay, @@ -356,12 +362,15 @@ impl View { } // Verifies the block is new and the previous leader did not fail - fn pipelined_safe_block(&self, _: &Block) -> bool { + fn pipelined_safe_block( + &self, + _: &Block, + ) -> bool { // return b.view_n >= self.view_n && b.view_n == b.qc.view_n true } - fn generate_next_view(&self, _b: &Block) -> View { + fn generate_next_view(&self, _b: &Block) -> View { let mut seed = self.seed; seed[0] += 1; View { diff --git a/nomos-services/consensus/src/overlay/committees.rs b/nomos-services/consensus/src/overlay/committees.rs index 23f657c8..4c25b624 100644 --- a/nomos-services/consensus/src/overlay/committees.rs +++ b/nomos-services/consensus/src/overlay/committees.rs @@ -117,6 +117,7 @@ where Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, VoteTally: Tally + Sync, + VoteTally::Qc: serde::de::DeserializeOwned + Clone + Send + Sync + 'static, TxId: serde::de::DeserializeOwned + Clone + Hash + Eq + Send + Sync + 'static, { // we still need view here to help us initialize @@ -130,13 +131,13 @@ where view: &View, adapter: &Network, fountain: &Fountain, - ) -> Result, FountainError> { + ) -> Result, FountainError> { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); let committee = self.committee; let message_stream = adapter.proposal_chunks_stream(committee, view).await; fountain.decode(message_stream).await.and_then(|b| { deserializer(&b) - .deserialize::>() + .deserialize::>() .map_err(|e| FountainError::from(e.to_string().as_str())) }) } @@ -144,7 +145,7 @@ where async fn broadcast_block( &self, view: &View, - block: Block, + block: Block, adapter: &Network, fountain: &Fountain, ) { @@ -173,7 +174,7 @@ where async fn approve_and_forward( &self, view: &View, - _block: &Block, + _block: &Block, _adapter: &Network, _tally: &VoteTally, _next_view: &View, @@ -189,12 +190,7 @@ where todo!() } - async fn build_qc( - &self, - view: &View, - _adapter: &Network, - _tally: &VoteTally, - ) -> VoteTally::Outcome { + async fn build_qc(&self, view: &View, _adapter: &Network, _tally: &VoteTally) -> VoteTally::Qc { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); // maybe the leader publishing the QC? todo!() diff --git a/nomos-services/consensus/src/overlay/flat.rs b/nomos-services/consensus/src/overlay/flat.rs index 2b052a36..8e224ee5 100644 --- a/nomos-services/consensus/src/overlay/flat.rs +++ b/nomos-services/consensus/src/overlay/flat.rs @@ -18,36 +18,32 @@ const FLAT_COMMITTEE: Committee = Committee::root(); /// As far as the API is concerned, this should be equivalent to any other /// overlay and far simpler to implement. /// For this reason, this might act as a 'reference' overlay for testing. -pub struct Flat { +pub struct Flat { // TODO: this should be a const param, but we can't do that yet node_id: NodeId, view_n: u64, - _marker: std::marker::PhantomData, } -impl Flat { +impl Flat { pub fn new(view_n: u64, node_id: NodeId) -> Self { - Self { - node_id, - view_n, - _marker: Default::default(), - } + Self { node_id, view_n } } - fn approve(&self, _block: &Block) -> Approval { + fn approve(&self, _block: &Block) -> Approval { // we still need to define how votes look like Approval } } #[async_trait::async_trait] -impl Overlay for Flat +impl Overlay for Flat where TxId: serde::de::DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, VoteTally: Tally + Sync, VoteTally::Vote: Serialize + DeserializeOwned + Send, + VoteTally::Qc: Clone + DeserializeOwned + Send + Sync + 'static, { fn new(view: &View, node: NodeId) -> Self { Flat::new(view.view_n, node) @@ -58,12 +54,12 @@ where view: &View, adapter: &Network, fountain: &Fountain, - ) -> Result, FountainError> { + ) -> Result, FountainError> { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); let message_stream = adapter.proposal_chunks_stream(FLAT_COMMITTEE, view).await; fountain.decode(message_stream).await.and_then(|b| { deserializer(&b) - .deserialize::>() + .deserialize::>() .map_err(|e| FountainError::from(e.to_string().as_str())) }) } @@ -71,7 +67,7 @@ where async fn broadcast_block( &self, view: &View, - block: Block, + block: Block, adapter: &Network, fountain: &Fountain, ) { @@ -91,7 +87,7 @@ where async fn approve_and_forward( &self, view: &View, - block: &Block, + block: &Block, adapter: &Network, _tally: &VoteTally, _next_view: &View, @@ -112,12 +108,7 @@ where Ok(()) } - async fn build_qc( - &self, - view: &View, - adapter: &Network, - tally: &VoteTally, - ) -> VoteTally::Outcome { + async fn build_qc(&self, view: &View, adapter: &Network, tally: &VoteTally) -> VoteTally::Qc { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); // for now, let's pretend that consensus is reached as soon as the @@ -126,7 +117,7 @@ where // Shadow the original binding so that it can't be directly accessed // ever again. - if let Ok(qc) = tally.tally(view.view_n, stream).await { + if let Ok((qc, _)) = tally.tally(view.view_n, stream).await { qc } else { unimplemented!("consensus not reached") diff --git a/nomos-services/consensus/src/overlay/mod.rs b/nomos-services/consensus/src/overlay/mod.rs index 90664d92..502ea4c5 100644 --- a/nomos-services/consensus/src/overlay/mod.rs +++ b/nomos-services/consensus/src/overlay/mod.rs @@ -20,7 +20,8 @@ pub trait Overlay< Fountain: FountainCode, VoteTally: Tally, TxId: Clone + Eq + Hash, -> +> where + VoteTally::Qc: Clone, { fn new(view: &View, node: NodeId) -> Self; @@ -29,11 +30,11 @@ pub trait Overlay< view: &View, adapter: &Network, fountain: &Fountain, - ) -> Result, FountainError>; + ) -> Result, FountainError>; async fn broadcast_block( &self, view: &View, - block: Block, + block: Block, adapter: &Network, fountain: &Fountain, ); @@ -43,7 +44,7 @@ pub trait Overlay< async fn approve_and_forward( &self, view: &View, - block: &Block, + block: &Block, adapter: &Network, vote_tally: &VoteTally, next_view: &View, @@ -54,5 +55,5 @@ pub trait Overlay< view: &View, adapter: &Network, vote_tally: &VoteTally, - ) -> VoteTally::Outcome; + ) -> VoteTally::Qc; } diff --git a/nomos-services/mempool/src/backend/mockpool.rs b/nomos-services/mempool/src/backend/mockpool.rs index df86d089..274a1c97 100644 --- a/nomos-services/mempool/src/backend/mockpool.rs +++ b/nomos-services/mempool/src/backend/mockpool.rs @@ -6,7 +6,7 @@ use std::{collections::BTreeMap, time::UNIX_EPOCH}; // crates // internal use crate::backend::{MemPool, MempoolError}; -use nomos_core::block::{BlockHeader, BlockId}; +use nomos_core::block::BlockId; use nomos_core::tx::Transaction; /// A mock mempool implementation that stores all transactions in memory in the order received. @@ -73,16 +73,16 @@ where Box::new(pending_txs.into_iter()) } - fn mark_in_block(&mut self, txs: &[::Hash], block: BlockHeader) { + fn mark_in_block(&mut self, txs: &[::Hash], block: BlockId) { let mut txs_in_block = Vec::with_capacity(txs.len()); for tx_id in txs.iter() { if let Some(tx) = self.pending_txs.remove(tx_id) { txs_in_block.push(tx); } } - let block_entry = self.in_block_txs.entry(block.id()).or_default(); + let block_entry = self.in_block_txs.entry(block).or_default(); self.in_block_txs_by_id - .extend(txs.iter().cloned().map(|tx| (tx, block.id()))); + .extend(txs.iter().cloned().map(|tx| (tx, block))); block_entry.append(&mut txs_in_block); } diff --git a/nomos-services/mempool/src/backend/mod.rs b/nomos-services/mempool/src/backend/mod.rs index ffe7af65..39fa4a0d 100644 --- a/nomos-services/mempool/src/backend/mod.rs +++ b/nomos-services/mempool/src/backend/mod.rs @@ -1,7 +1,7 @@ #[cfg(feature = "mock")] pub mod mockpool; -use nomos_core::block::{BlockHeader, BlockId}; +use nomos_core::block::BlockId; use nomos_core::tx::Transaction; #[derive(thiserror::Error, Debug)] @@ -30,7 +30,7 @@ pub trait MemPool { fn view(&self, ancestor_hint: BlockId) -> Box + Send>; /// Record that a set of transactions were included in a block - fn mark_in_block(&mut self, txs: &[::Hash], block: BlockHeader); + fn mark_in_block(&mut self, txs: &[::Hash], block: BlockId); /// Returns all of the transactions for the block #[cfg(test)] diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index a5275d99..c1dc65c8 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -10,7 +10,7 @@ use tokio::sync::oneshot::Sender; // internal use crate::network::NetworkAdapter; use backend::MemPool; -use nomos_core::block::{BlockHeader, BlockId}; +use nomos_core::block::BlockId; use nomos_core::tx::Transaction; use nomos_network::NetworkService; use overwatch_rs::services::{ @@ -57,7 +57,7 @@ pub enum MempoolMsg { }, MarkInBlock { ids: Vec, - block: BlockHeader, + block: BlockId, }, Metrics { reply_channel: Sender,