From bbb783e1da1b49957d6fbf9154c2595a5f7464cf Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Wed, 8 Feb 2023 10:23:55 +0100 Subject: [PATCH] Complete consensus (#65) * complete consensus * review comments --- nomos-core/src/block.rs | 9 +- nomos-services/consensus/src/leadership.rs | 1 + nomos-services/consensus/src/lib.rs | 184 +++++++++++------- .../consensus/src/overlay/committees.rs | 4 +- nomos-services/consensus/src/overlay/flat.rs | 7 +- nomos-services/consensus/src/overlay/mod.rs | 3 +- nomos-services/consensus/src/test.rs | 103 ++++++++++ 7 files changed, 235 insertions(+), 76 deletions(-) create mode 100644 nomos-services/consensus/src/test.rs diff --git a/nomos-core/src/block.rs b/nomos-core/src/block.rs index 77d2f143..03f4144d 100644 --- a/nomos-core/src/block.rs +++ b/nomos-core/src/block.rs @@ -12,8 +12,7 @@ pub struct Block; pub struct BlockHeader; /// Identifier of a block -#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)] -pub struct BlockId; +pub type BlockId = [u8; 32]; impl Block { /// Encode block into bytes @@ -24,10 +23,14 @@ impl Block { pub fn from_bytes(_: Bytes) -> Self { Self } + + pub fn header(&self) -> BlockHeader { + BlockHeader + } } impl BlockHeader { pub fn id(&self) -> BlockId { - BlockId + todo!() } } diff --git a/nomos-services/consensus/src/leadership.rs b/nomos-services/consensus/src/leadership.rs index c8e30789..eb64a947 100644 --- a/nomos-services/consensus/src/leadership.rs +++ b/nomos-services/consensus/src/leadership.rs @@ -40,6 +40,7 @@ impl Leadership { &self, view: &'view View, tip: &Tip, + qc: Approval, ) -> LeadershipResult<'view> { let ancestor_hint = todo!("get the ancestor from the tip"); if view.is_leader(self.key.key) { diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 4d904bac..12d22f2b 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -7,6 +7,8 @@ mod leadership; mod network; pub mod overlay; +#[cfg(test)] +mod test; mod tip; // std @@ -111,8 +113,6 @@ where } async fn run(mut self) -> Result<(), overwatch_rs::DynError> { - let mut view_generator = self.view_generator().await; - let network_relay: OutboundRelay<_> = self .network_relay .connect() @@ -137,13 +137,18 @@ where let fountain = F::new(fountain_settings); let leadership = Leadership::new(private_key, mempool_relay); + // FIXME: this should be taken from config + let mut cur_view = View { + seed: [0; 32], + staking_keys: BTreeMap::new(), + view_n: 0, + }; loop { - let view = view_generator.next().await; // if we want to process multiple views at the same time this can // be spawned as a separate future // FIXME: this should probably have a timer to detect failed rounds - let res = view + let res = cur_view .resolve::, _, _, _>( private_key, &tip, @@ -153,10 +158,11 @@ where ) .await; match res { - Ok(_block) => { + Ok((_block, view)) => { // resolved block, mark as verified and possibly update the tip // not sure what mark as verified means, e.g. if we want an event subscription // system for this to be used for example by the ledger, storage and mempool + cur_view = view; } Err(e) => { tracing::error!("Error while resolving view: {}", e); @@ -166,35 +172,6 @@ where } } -impl CarnotConsensus -where - F: FountainCode + Send + Sync + 'static, - A: NetworkAdapter + Send + Sync + 'static, - P: MemPool + Send + Sync + 'static, - P::Settings: Clone + Send + Sync + 'static, - P::Tx: Debug + Send + Sync + 'static, - P::Id: Debug + Send + Sync + 'static, - M: MempoolAdapter + Send + Sync + 'static, -{ - // Build a service that generates new views as they become available - async fn view_generator(&self) -> ViewGenerator { - todo!() - } -} - -/// Tracks new views and make them available as soon as they are available -/// -/// A new view is normally generated as soon a a block is approved, but -/// additional logic is needed in failure cases, like when no new block is -/// approved for a long enough period of time -struct ViewGenerator; - -impl ViewGenerator { - async fn next(&mut self) -> View { - todo!() - } -} - #[derive(Hash, Eq, PartialEq)] pub struct Approval; @@ -203,67 +180,136 @@ pub struct Approval; pub struct View { seed: Seed, staking_keys: BTreeMap, - _view_n: u64, + pub view_n: u64, } impl View { // TODO: might want to encode steps in the type system - async fn resolve<'view, A, O, F, Tx, Id>( + pub async fn resolve<'view, A, O, F, Tx, Id>( &'view self, node_id: NodeId, tip: &Tip, adapter: &A, fountain: &F, leadership: &Leadership, - ) -> Result> + ) -> Result<(Block, View), Box> + where + A: NetworkAdapter + Send + Sync + 'static, + F: FountainCode, + O: Overlay<'view, A, F>, + { + let res = if self.is_leader(node_id) { + let block = self + .resolve_leader::(node_id, tip, adapter, fountain, leadership) + .await + .unwrap(); // FIXME: handle sad path + let next_view = self.generate_next_view(&block); + (block, next_view) + } else { + self.resolve_non_leader::(node_id, adapter, fountain) + .await + .unwrap() // FIXME: handle sad path + }; + + // Commit phase: + // Upon verifing a block B, if B.parent = B' and B'.parent = B'' and + // B'.view = B''.view + 1, then the node commits B''. + // This happens implicitly at the chain level and does not require any + // explicit action from the node. + + Ok(res) + } + + async fn resolve_leader<'view, A, O, F, Tx, Id>( + &'view self, + node_id: NodeId, + tip: &Tip, + adapter: &A, + fountain: &F, + leadership: &Leadership, + ) -> Result where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, O: Overlay<'view, A, F>, { let overlay = O::new(self, node_id); - // FIXME: this is still a working in progress and best-of-my-understanding - // of the consensus protocol, having pseudocode would be very helpful - // Consensus in Carnot is achieved in 4 steps from the point of view of a node: - // 1) The node receives a block proposal from a leader and verifies it - // 2, The node signals to the network its approval for the block. - // Depending on the overlay, this may require waiting for a certain number - // of other approvals. - // 3) The node waits for consensus to be reached and mark the block as verified - // 4) Upon verifing a block B, if B.parent = B' and B'.parent = B'' and - // B'.view = B''.view + 1, then the node commits B''. - // This happens implicitly at the chain level and does not require any - // explicit action from the node. + // We need to build the QC for the block we are proposing + let qc = overlay.build_qc(adapter).await; - // 1) Collect and verify block proposal. - // If this node is the leader this is trivial - let block = if let LeadershipResult::Leader { block, .. } = - leadership.try_propose_block(self, tip).await - { - block - } else { - overlay - .reconstruct_proposal_block(adapter, fountain) - .await? - // TODO: reshare the block? - // TODO: verify - }; + let LeadershipResult::Leader { block, _view } = leadership + .try_propose_block(self, tip, qc) + .await else { panic!("we are leader")}; - // 2) Signal approval to the network - overlay.approve_and_forward(&block, adapter).await?; - - // 3) Wait for consensus - overlay.wait_for_consensus(&block, adapter).await; + overlay + .broadcast_block(block.clone(), adapter, fountain) + .await; Ok(block) } + async fn resolve_non_leader<'view, A, O, F>( + &'view self, + node_id: NodeId, + adapter: &A, + fountain: &F, + ) -> Result<(Block, View), ()> + where + A: NetworkAdapter + Send + Sync + 'static, + F: FountainCode, + O: Overlay<'view, A, F>, + { + let overlay = O::new(self, node_id); + // Consensus in Carnot is achieved in 2 steps from the point of view of a node: + // 1) The node receives a block proposal from a leader and verifies it + // 2) The node signals to the network its approval for the block. + // Depending on the overlay, this may require waiting for a certain number + // of other approvals. + + // 1) Collect and verify block proposal. + let block = overlay + .reconstruct_proposal_block(adapter, fountain) + .await + .unwrap(); // FIXME: handle sad path + + // TODO: verify + // TODO: reshare the block? + let next_view = self.generate_next_view(&block); + + // 2) Signal approval to the network + // We only consider the happy path for now + if self.pipelined_safe_block(&block) { + overlay + .approve_and_forward(&block, adapter, &next_view) + .await + .unwrap(); // FIXME: handle sad path + } + + Ok((block, next_view)) + } + pub fn is_leader(&self, _node_id: NodeId) -> bool { - todo!() + false } pub fn id(&self) -> u64 { - self._view_n + self.view_n + } + + // Verifies the block is new and the previous leader did not fail + fn pipelined_safe_block(&self, _: &Block) -> bool { + // return b.view_n >= self.view_n && b.view_n == b.qc.view_n + true + } + + fn generate_next_view(&self, _b: &Block) -> View { + let mut seed = self.seed; + seed[0] += 1; + View { + seed, + staking_keys: self.staking_keys.clone(), + view_n: self.view_n + 1, + } } } diff --git a/nomos-services/consensus/src/overlay/committees.rs b/nomos-services/consensus/src/overlay/committees.rs index 9e576f68..851855d2 100644 --- a/nomos-services/consensus/src/overlay/committees.rs +++ b/nomos-services/consensus/src/overlay/committees.rs @@ -154,6 +154,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const &self, _block: &Block, _adapter: &Network, + _next_view: &View, ) -> Result<(), Box> { // roughly, we want to do something like this: // 1. wait for left and right children committees to approve @@ -165,7 +166,8 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const todo!() } - async fn wait_for_consensus(&self, _approval: &Block, _adapter: &Network) { + async fn build_qc(&self, _adapter: &Network) -> Approval { // maybe the leader publishing the QC? + todo!() } } diff --git a/nomos-services/consensus/src/overlay/flat.rs b/nomos-services/consensus/src/overlay/flat.rs index e727f9c5..15563a09 100644 --- a/nomos-services/consensus/src/overlay/flat.rs +++ b/nomos-services/consensus/src/overlay/flat.rs @@ -87,6 +87,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> &self, block: &Block, adapter: &Network, + _next_view: &View, ) -> Result<(), Box> { // in the flat overlay, there's no need to wait for anyone before approving the block let approval = self.approve(block); @@ -103,7 +104,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> Ok(()) } - async fn wait_for_consensus(&self, _approval: &Block, adapter: &Network) { + async fn build_qc(&self, adapter: &Network) -> Approval { // for now, let's pretend that consensus is reached as soon as the // block is approved by a share of the nodes let mut approvals = HashSet::new(); @@ -121,8 +122,10 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> / self.threshold.den; if approvals.len() as u64 >= threshold { // consensus reached - break; + // FIXME: build a real QC + return Approval; } } + unimplemented!("consensus not reached") } } diff --git a/nomos-services/consensus/src/overlay/mod.rs b/nomos-services/consensus/src/overlay/mod.rs index 037c159d..5ec2228d 100644 --- a/nomos-services/consensus/src/overlay/mod.rs +++ b/nomos-services/consensus/src/overlay/mod.rs @@ -29,7 +29,8 @@ pub trait Overlay<'view, Network: NetworkAdapter, Fountain: FountainCode> { &self, block: &Block, adapter: &Network, + next_view: &View, ) -> Result<(), Box>; /// Wait for consensus on a block - async fn wait_for_consensus(&self, block: &Block, adapter: &Network); + async fn build_qc(&self, adapter: &Network) -> Approval; } diff --git a/nomos-services/consensus/src/test.rs b/nomos-services/consensus/src/test.rs new file mode 100644 index 00000000..af59c714 --- /dev/null +++ b/nomos-services/consensus/src/test.rs @@ -0,0 +1,103 @@ +use crate::network::messages::*; +use crate::overlay::committees::*; +use crate::overlay::*; +use crate::*; +use async_trait::async_trait; +use bytes::Bytes; +use futures::Stream; +use nomos_core::block::*; +use nomos_core::fountain::{mock::MockFountain, FountainCode, FountainError}; +use nomos_network::backends::NetworkBackend; +use nomos_network::NetworkService; +use overwatch_rs::services::relay::*; +use tokio::sync::broadcast::Receiver; + +struct DummyOverlay; +struct DummyAdapter; +struct DummyBackend; + +#[async_trait] +impl<'view, N: NetworkAdapter + Sync, F: FountainCode + Sync> Overlay<'view, N, F> + for DummyOverlay +{ + fn new(_: &View, _: NodeId) -> Self { + DummyOverlay + } + + async fn build_qc(&self, _: &N) -> Approval { + Approval + } + + async fn broadcast_block(&self, _: Block, _: &N, _: &F) {} + + async fn reconstruct_proposal_block(&self, _: &N, _: &F) -> Result { + Ok(Block) + } + + async fn approve_and_forward(&self, _: &Block, _: &N, _: &View) -> Result<(), Box> { + Ok(()) + } +} + +#[async_trait] +impl NetworkAdapter for DummyAdapter { + type Backend = DummyBackend; + async fn new( + _: OutboundRelay< as ServiceData>::Message>, + ) -> Self { + DummyAdapter + } + async fn proposal_chunks_stream( + &self, + _: Committee, + _: &View, + ) -> Box + Send + Sync + Unpin> { + unimplemented!() + } + async fn broadcast_block_chunk(&self, _: Committee, _: &View, _: ProposalChunkMsg) { + unimplemented!() + } + async fn approvals_stream( + &self, + _: Committee, + _: &View, + ) -> Box + Send> { + unimplemented!() + } + async fn forward_approval(&self, _: Committee, _: &View, _: ApprovalMsg) {} +} + +#[async_trait] +impl NetworkBackend for DummyBackend { + type Settings = (); + type State = NoState<()>; + type Message = (); + type EventKind = (); + type NetworkEvent = (); + + fn new(_config: Self::Settings) -> Self { + Self + } + async fn process(&self, _: Self::Message) {} + async fn subscribe(&mut self, _: Self::EventKind) -> Receiver { + unimplemented!() + } +} + +#[tokio::test] +async fn test_single_round_non_leader() { + let view = View { + seed: [0; 32], + staking_keys: BTreeMap::new(), + view_n: 0, + }; + let (_, next_view) = view + .resolve_non_leader::( + [0; 32], + &DummyAdapter, + &MockFountain, + ) + .await + .unwrap(); + assert!(next_view.view_n == 1); +}