Add flat overlay (#55)

This commit is contained in:
Giacomo Pasini 2023-01-27 10:37:04 +01:00 committed by GitHub
parent 8e2346c29e
commit f5175c74c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 202 additions and 66 deletions

View File

@ -19,6 +19,7 @@ tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
futures = "0.3"
waku-bindings = { version = "0.1.0-beta2", optional = true}
tracing = "0.1"
[features]
default = []

View File

@ -10,7 +10,8 @@ pub mod overlay;
mod tip;
// std
use std::collections::{BTreeMap, HashSet};
use std::collections::BTreeMap;
use std::error::Error;
use std::fmt::Debug;
// crates
// internal
@ -140,15 +141,27 @@ where
let view = view_generator.next().await;
// if we want to process multiple views at the same time this can
// be spawned as a separate future
// TODO: add leadership module
view.resolve::<A, Member<'_, COMMITTEE_SIZE>, _, _, _>(
private_key,
&tip,
&network_adapter,
&fountain,
&leadership,
)
.await;
// FIXME: this should probably have a timer to detect failed rounds
let res = view
.resolve::<A, Member<'_, COMMITTEE_SIZE>, _, _, _>(
private_key,
&tip,
&network_adapter,
&fountain,
&leadership,
)
.await;
match res {
Ok(_block) => {
// resolved block, mark as verified and possibly update the tip
// not sure what mark as verified means, e.g. if we want an event subscription
// system for this to be used for example by the ledger, storage and mempool
}
Err(e) => {
tracing::error!("Error while resolving view: {}", e);
}
}
}
}
}
@ -194,8 +207,6 @@ pub struct View {
}
impl View {
const APPROVAL_THRESHOLD: usize = 1;
// TODO: might want to encode steps in the type system
async fn resolve<'view, A, O, F, Tx, Id>(
&'view self,
@ -204,60 +215,48 @@ impl View {
adapter: &A,
fountain: &F,
leadership: &Leadership<Tx, Id>,
) where
) -> Result<Block, Box<dyn Error>>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
O: Overlay<'view, A, F>,
{
let overlay = O::new(self, node_id);
// FIXME: this is still a working in progress and best-of-my-understanding
// of the consensus protocol, having pseudocode would be very helpful
// Consensus in Carnot is achieved in 4 steps from the point of view of a node:
// 1) The node receives a block proposal from a leader and verifies it
// 2, The node signals to the network its approval for the block.
// Depending on the overlay, this may require waiting for a certain number
// of other approvals.
// 3) The node waits for consensus to be reached and mark the block as verified
// 4) Upon verifing a block B, if B.parent = B' and B'.parent = B'' and
// B'.view = B''.view + 1, then the node commits B''.
// This happens implicitly at the chain level and does not require any
// explicit action from the node.
// 1) Collect and verify block proposal.
// If this node is the leader this is trivial
let block = if let LeadershipResult::Leader { block, .. } =
leadership.try_propose_block(self, tip).await
{
Ok(block)
block
} else {
overlay.reconstruct_proposal_block(adapter, fountain).await
overlay
.reconstruct_proposal_block(adapter, fountain)
.await?
// TODO: reshare the block?
// TODO: verify
};
match block {
Ok(block) => {
// TODO: verify?
overlay
.broadcast_block(block.clone(), adapter, fountain)
.await;
self.approve(&overlay, block, adapter).await;
}
Err(_e) => {
// TODO: log error
}
}
}
async fn approve<
'view,
Network: NetworkAdapter,
Fountain: FountainCode,
O: Overlay<'view, Network, Fountain>,
>(
&'view self,
overlay: &O,
block: Block,
adapter: &Network,
) {
// wait for approval in the overlay, if necessary
let mut approvals = HashSet::new();
let mut stream = overlay.collect_approvals(block, adapter).await;
while let Some(approval) = stream.recv().await {
approvals.insert(approval);
if approvals.len() > Self::APPROVAL_THRESHOLD {
let self_approval = self.craft_proof_of_approval(approvals.into_iter());
overlay.forward_approval(self_approval, adapter).await;
return;
}
}
}
// 2) Signal approval to the network
overlay.approve_and_forward(&block, adapter).await?;
fn craft_proof_of_approval(&self, _approvals: impl Iterator<Item = Approval>) -> Approval {
todo!()
// 3) Wait for consensus
overlay.wait_for_consensus(&block, adapter).await;
Ok(block)
}
pub fn is_leader(&self, _node_id: NodeId) -> bool {

View File

@ -110,7 +110,7 @@ impl NetworkAdapter for WakuAdapter {
};
}
async fn approvals_stream(&self) -> Box<dyn Stream<Item = Approval>> {
async fn approvals_stream(&self) -> Box<dyn Stream<Item = Approval> + Send> {
let stream_channel = self
.message_subscriber_channel()
.await

View File

@ -21,6 +21,6 @@ pub trait NetworkAdapter {
) -> Self;
async fn proposal_chunks_stream(&self) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin>;
async fn broadcast_block_chunk(&self, view: &View, chunk_msg: ProposalChunkMsg);
async fn approvals_stream(&self) -> Box<dyn Stream<Item = Approval>>;
async fn approvals_stream(&self) -> Box<dyn Stream<Item = Approval> + Send>;
async fn forward_approval(&self, approval: ApprovalMsg);
}

View File

@ -89,8 +89,12 @@ impl<'view, const C: usize> Member<'view, C> {
}
// Return participants in the children committees this member should interact with
pub fn children_committes(&self) -> (Committee, Committee) {
self.committee.children()
pub fn children_committes(&self) -> (Option<Committee>, Option<Committee>) {
let (left, right) = self.committee.children();
(
self.committees.get_committee_members(left).map(|_| left),
self.committees.get_committee_members(right).map(|_| right),
)
}
}
@ -125,15 +129,22 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const
.await;
}
async fn collect_approvals(
async fn approve_and_forward(
&self,
_block: Block,
_block: &Block,
adapter: &Network,
) -> tokio::sync::mpsc::Receiver<Approval> {
) -> Result<(), Box<dyn Error>> {
// roughly, we want to do something like this:
// 1. wait for left and right children committees to approve
// 2. approve the block
// 3. forward the approval to the parent committee
//
// However this will likely change depending on the position
// of the committee in the tree
todo!()
}
async fn forward_approval(&self, _approval: Approval, adapter: &Network) {
todo!()
async fn wait_for_consensus(&self, _approval: &Block, adapter: &Network) {
// maybe the leader publishing the QC?
}
}

View File

@ -0,0 +1,119 @@
use std::collections::HashSet;
// std
use std::error::Error;
// crates
use futures::StreamExt;
// internal
use super::*;
use crate::network::messages::{ApprovalMsg, ProposalChunkMsg};
use crate::network::NetworkAdapter;
const DEFAULT_THRESHOLD: Threshold = Threshold::new(2, 3);
/// The share of nodes that need to approve a block for it to be valid
/// expressed as a fraction of the total number of nodes
#[derive(Copy, Clone, Debug)]
pub struct Threshold {
num: u64,
den: u64,
}
impl Threshold {
pub const fn new(num: u64, den: u64) -> Self {
Self { num, den }
}
}
/// A flat overlay, everyone is in the same committee.
/// As far as the API is concerned, this should be equivalent to any other
/// overlay and far simpler to implement.
/// For this reason, this might act as a 'reference' overlay for testing.
pub struct Flat<'view> {
view: &'view View,
// TODO: this should be a const param, but we can't do that yet
threshold: Threshold,
node_id: NodeId,
}
impl<'view> Flat<'view> {
pub fn new(view: &'view View, node_id: NodeId) -> Self {
Self {
view,
threshold: DEFAULT_THRESHOLD,
node_id,
}
}
fn approve(&self, _block: &Block) -> Approval {
// we still need to define how votes look like
todo!()
}
}
#[async_trait::async_trait]
impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync>
Overlay<'view, Network, Fountain> for Flat<'view>
{
fn new(view: &'view View, node: NodeId) -> Self {
Flat::new(view, node)
}
async fn reconstruct_proposal_block(
&self,
adapter: &Network,
fountain: &Fountain,
) -> Result<Block, FountainError> {
let message_stream = adapter.proposal_chunks_stream().await;
fountain.decode(message_stream).await.map(Block::from_bytes)
}
async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain) {
let block_bytes = block.as_bytes();
let encoded_stream = fountain.encode(&block_bytes);
encoded_stream
.for_each_concurrent(None, |chunk| async move {
let message = ProposalChunkMsg { chunk };
adapter.broadcast_block_chunk(self.view, message).await;
})
.await;
}
async fn approve_and_forward(
&self,
block: &Block,
adapter: &Network,
) -> Result<(), Box<dyn Error>> {
// in the flat overlay, there's no need to wait for anyone before approving the block
let approval = self.approve(block);
adapter
.forward_approval(ApprovalMsg {
approval,
source: self.node_id,
})
.await;
Ok(())
}
async fn wait_for_consensus(&self, _approval: &Block, adapter: &Network) {
// for now, let's pretend that consensus is reached as soon as the
// block is approved by a share of the nodes
let mut approvals = HashSet::new();
let mut stream = Box::into_pin(adapter.approvals_stream().await);
// Shadow the original binding so that it can't be directly accessed
// ever again.
while let Some(approval) = stream.next().await {
// insert the approval in the map to deduplicate
// TODO: validate approval
approvals.insert(approval);
// ceil(num/den * n)
let threshold =
(self.threshold.num * self.view.staking_keys.len() as u64 + self.threshold.den - 1)
/ self.threshold.den;
if approvals.len() as u64 >= threshold {
// consensus reached
break;
}
}
}
}

View File

@ -1,7 +1,9 @@
#[allow(unused)]
mod committees;
mod flat;
// std
use std::error::Error;
// crates
// internal
use super::{Approval, NodeId, View};
@ -21,10 +23,14 @@ pub trait Overlay<'view, Network: NetworkAdapter, Fountain: FountainCode> {
fountain: &Fountain,
) -> Result<Block, FountainError>;
async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain);
async fn collect_approvals(
/// Different overlays might have different needs or the same overlay might
/// require different steps depending on the node role
/// For now let's put this responsibility on the overlay
async fn approve_and_forward(
&self,
block: Block,
block: &Block,
adapter: &Network,
) -> tokio::sync::mpsc::Receiver<Approval>;
async fn forward_approval(&self, approval: Approval, adapter: &Network);
) -> Result<(), Box<dyn Error>>;
/// Wait for consensus on a block
async fn wait_for_consensus(&self, block: &Block, adapter: &Network);
}