From f5175c74c0a63498a5f57d641d5ca37be54acd25 Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Fri, 27 Jan 2023 10:37:04 +0100 Subject: [PATCH] Add flat overlay (#55) --- nomos-services/consensus/Cargo.toml | 1 + nomos-services/consensus/src/lib.rs | 105 ++++++++-------- .../consensus/src/network/adapters/waku.rs | 2 +- nomos-services/consensus/src/network/mod.rs | 2 +- .../consensus/src/overlay/committees.rs | 25 ++-- nomos-services/consensus/src/overlay/flat.rs | 119 ++++++++++++++++++ nomos-services/consensus/src/overlay/mod.rs | 14 ++- 7 files changed, 202 insertions(+), 66 deletions(-) create mode 100644 nomos-services/consensus/src/overlay/flat.rs diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index f1c3e186..94f6cf19 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -19,6 +19,7 @@ tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" futures = "0.3" waku-bindings = { version = "0.1.0-beta2", optional = true} +tracing = "0.1" [features] default = [] diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 61374242..b75b3024 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -10,7 +10,8 @@ pub mod overlay; mod tip; // std -use std::collections::{BTreeMap, HashSet}; +use std::collections::BTreeMap; +use std::error::Error; use std::fmt::Debug; // crates // internal @@ -140,15 +141,27 @@ where let view = view_generator.next().await; // if we want to process multiple views at the same time this can // be spawned as a separate future - // TODO: add leadership module - view.resolve::, _, _, _>( - private_key, - &tip, - &network_adapter, - &fountain, - &leadership, - ) - .await; + + // FIXME: this should probably have a timer to detect failed rounds + let res = view + .resolve::, _, _, _>( + private_key, + &tip, + &network_adapter, + &fountain, + &leadership, + ) + .await; + match res { + Ok(_block) => { + // resolved block, mark as verified and possibly update the tip + // not sure what mark as verified means, e.g. if we want an event subscription + // system for this to be used for example by the ledger, storage and mempool + } + Err(e) => { + tracing::error!("Error while resolving view: {}", e); + } + } } } } @@ -194,8 +207,6 @@ pub struct View { } impl View { - const APPROVAL_THRESHOLD: usize = 1; - // TODO: might want to encode steps in the type system async fn resolve<'view, A, O, F, Tx, Id>( &'view self, @@ -204,60 +215,48 @@ impl View { adapter: &A, fountain: &F, leadership: &Leadership, - ) where + ) -> Result> + where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, O: Overlay<'view, A, F>, { let overlay = O::new(self, node_id); + // FIXME: this is still a working in progress and best-of-my-understanding + // of the consensus protocol, having pseudocode would be very helpful + // Consensus in Carnot is achieved in 4 steps from the point of view of a node: + // 1) The node receives a block proposal from a leader and verifies it + // 2, The node signals to the network its approval for the block. + // Depending on the overlay, this may require waiting for a certain number + // of other approvals. + // 3) The node waits for consensus to be reached and mark the block as verified + // 4) Upon verifing a block B, if B.parent = B' and B'.parent = B'' and + // B'.view = B''.view + 1, then the node commits B''. + // This happens implicitly at the chain level and does not require any + // explicit action from the node. + + // 1) Collect and verify block proposal. + // If this node is the leader this is trivial let block = if let LeadershipResult::Leader { block, .. } = leadership.try_propose_block(self, tip).await { - Ok(block) + block } else { - overlay.reconstruct_proposal_block(adapter, fountain).await + overlay + .reconstruct_proposal_block(adapter, fountain) + .await? + // TODO: reshare the block? + // TODO: verify }; - match block { - Ok(block) => { - // TODO: verify? - overlay - .broadcast_block(block.clone(), adapter, fountain) - .await; - self.approve(&overlay, block, adapter).await; - } - Err(_e) => { - // TODO: log error - } - } - } - async fn approve< - 'view, - Network: NetworkAdapter, - Fountain: FountainCode, - O: Overlay<'view, Network, Fountain>, - >( - &'view self, - overlay: &O, - block: Block, - adapter: &Network, - ) { - // wait for approval in the overlay, if necessary - let mut approvals = HashSet::new(); - let mut stream = overlay.collect_approvals(block, adapter).await; - while let Some(approval) = stream.recv().await { - approvals.insert(approval); - if approvals.len() > Self::APPROVAL_THRESHOLD { - let self_approval = self.craft_proof_of_approval(approvals.into_iter()); - overlay.forward_approval(self_approval, adapter).await; - return; - } - } - } + // 2) Signal approval to the network + overlay.approve_and_forward(&block, adapter).await?; - fn craft_proof_of_approval(&self, _approvals: impl Iterator) -> Approval { - todo!() + // 3) Wait for consensus + overlay.wait_for_consensus(&block, adapter).await; + + Ok(block) } pub fn is_leader(&self, _node_id: NodeId) -> bool { diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs index 7244f363..a9849d85 100644 --- a/nomos-services/consensus/src/network/adapters/waku.rs +++ b/nomos-services/consensus/src/network/adapters/waku.rs @@ -110,7 +110,7 @@ impl NetworkAdapter for WakuAdapter { }; } - async fn approvals_stream(&self) -> Box> { + async fn approvals_stream(&self) -> Box + Send> { let stream_channel = self .message_subscriber_channel() .await diff --git a/nomos-services/consensus/src/network/mod.rs b/nomos-services/consensus/src/network/mod.rs index 68277ae5..684e0f13 100644 --- a/nomos-services/consensus/src/network/mod.rs +++ b/nomos-services/consensus/src/network/mod.rs @@ -21,6 +21,6 @@ pub trait NetworkAdapter { ) -> Self; async fn proposal_chunks_stream(&self) -> Box + Send + Sync + Unpin>; async fn broadcast_block_chunk(&self, view: &View, chunk_msg: ProposalChunkMsg); - async fn approvals_stream(&self) -> Box>; + async fn approvals_stream(&self) -> Box + Send>; async fn forward_approval(&self, approval: ApprovalMsg); } diff --git a/nomos-services/consensus/src/overlay/committees.rs b/nomos-services/consensus/src/overlay/committees.rs index 5acea30b..96b42511 100644 --- a/nomos-services/consensus/src/overlay/committees.rs +++ b/nomos-services/consensus/src/overlay/committees.rs @@ -89,8 +89,12 @@ impl<'view, const C: usize> Member<'view, C> { } // Return participants in the children committees this member should interact with - pub fn children_committes(&self) -> (Committee, Committee) { - self.committee.children() + pub fn children_committes(&self) -> (Option, Option) { + let (left, right) = self.committee.children(); + ( + self.committees.get_committee_members(left).map(|_| left), + self.committees.get_committee_members(right).map(|_| right), + ) } } @@ -125,15 +129,22 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const .await; } - async fn collect_approvals( + async fn approve_and_forward( &self, - _block: Block, + _block: &Block, adapter: &Network, - ) -> tokio::sync::mpsc::Receiver { + ) -> Result<(), Box> { + // roughly, we want to do something like this: + // 1. wait for left and right children committees to approve + // 2. approve the block + // 3. forward the approval to the parent committee + // + // However this will likely change depending on the position + // of the committee in the tree todo!() } - async fn forward_approval(&self, _approval: Approval, adapter: &Network) { - todo!() + async fn wait_for_consensus(&self, _approval: &Block, adapter: &Network) { + // maybe the leader publishing the QC? } } diff --git a/nomos-services/consensus/src/overlay/flat.rs b/nomos-services/consensus/src/overlay/flat.rs new file mode 100644 index 00000000..0e87d7a1 --- /dev/null +++ b/nomos-services/consensus/src/overlay/flat.rs @@ -0,0 +1,119 @@ +use std::collections::HashSet; +// std +use std::error::Error; +// crates +use futures::StreamExt; +// internal +use super::*; +use crate::network::messages::{ApprovalMsg, ProposalChunkMsg}; +use crate::network::NetworkAdapter; + +const DEFAULT_THRESHOLD: Threshold = Threshold::new(2, 3); + +/// The share of nodes that need to approve a block for it to be valid +/// expressed as a fraction of the total number of nodes +#[derive(Copy, Clone, Debug)] +pub struct Threshold { + num: u64, + den: u64, +} + +impl Threshold { + pub const fn new(num: u64, den: u64) -> Self { + Self { num, den } + } +} + +/// A flat overlay, everyone is in the same committee. +/// As far as the API is concerned, this should be equivalent to any other +/// overlay and far simpler to implement. +/// For this reason, this might act as a 'reference' overlay for testing. +pub struct Flat<'view> { + view: &'view View, + // TODO: this should be a const param, but we can't do that yet + threshold: Threshold, + node_id: NodeId, +} + +impl<'view> Flat<'view> { + pub fn new(view: &'view View, node_id: NodeId) -> Self { + Self { + view, + threshold: DEFAULT_THRESHOLD, + node_id, + } + } + + fn approve(&self, _block: &Block) -> Approval { + // we still need to define how votes look like + todo!() + } +} + +#[async_trait::async_trait] +impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> + Overlay<'view, Network, Fountain> for Flat<'view> +{ + fn new(view: &'view View, node: NodeId) -> Self { + Flat::new(view, node) + } + + async fn reconstruct_proposal_block( + &self, + adapter: &Network, + fountain: &Fountain, + ) -> Result { + let message_stream = adapter.proposal_chunks_stream().await; + fountain.decode(message_stream).await.map(Block::from_bytes) + } + + async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain) { + let block_bytes = block.as_bytes(); + let encoded_stream = fountain.encode(&block_bytes); + encoded_stream + .for_each_concurrent(None, |chunk| async move { + let message = ProposalChunkMsg { chunk }; + adapter.broadcast_block_chunk(self.view, message).await; + }) + .await; + } + + async fn approve_and_forward( + &self, + block: &Block, + adapter: &Network, + ) -> Result<(), Box> { + // in the flat overlay, there's no need to wait for anyone before approving the block + let approval = self.approve(block); + adapter + .forward_approval(ApprovalMsg { + approval, + source: self.node_id, + }) + .await; + Ok(()) + } + + async fn wait_for_consensus(&self, _approval: &Block, adapter: &Network) { + // for now, let's pretend that consensus is reached as soon as the + // block is approved by a share of the nodes + let mut approvals = HashSet::new(); + let mut stream = Box::into_pin(adapter.approvals_stream().await); + + // Shadow the original binding so that it can't be directly accessed + // ever again. + while let Some(approval) = stream.next().await { + // insert the approval in the map to deduplicate + // TODO: validate approval + approvals.insert(approval); + // ceil(num/den * n) + let threshold = + (self.threshold.num * self.view.staking_keys.len() as u64 + self.threshold.den - 1) + / self.threshold.den; + if approvals.len() as u64 >= threshold { + // consensus reached + break; + } + } + } +} diff --git a/nomos-services/consensus/src/overlay/mod.rs b/nomos-services/consensus/src/overlay/mod.rs index e770a28d..dab6a871 100644 --- a/nomos-services/consensus/src/overlay/mod.rs +++ b/nomos-services/consensus/src/overlay/mod.rs @@ -1,7 +1,9 @@ #[allow(unused)] mod committees; +mod flat; // std +use std::error::Error; // crates // internal use super::{Approval, NodeId, View}; @@ -21,10 +23,14 @@ pub trait Overlay<'view, Network: NetworkAdapter, Fountain: FountainCode> { fountain: &Fountain, ) -> Result; async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain); - async fn collect_approvals( + /// Different overlays might have different needs or the same overlay might + /// require different steps depending on the node role + /// For now let's put this responsibility on the overlay + async fn approve_and_forward( &self, - block: Block, + block: &Block, adapter: &Network, - ) -> tokio::sync::mpsc::Receiver; - async fn forward_approval(&self, approval: Approval, adapter: &Network); + ) -> Result<(), Box>; + /// Wait for consensus on a block + async fn wait_for_consensus(&self, block: &Block, adapter: &Network); }