Complete consensus (#65)

* complete consensus

* review comments
This commit is contained in:
Giacomo Pasini 2023-02-08 10:23:55 +01:00 committed by GitHub
parent 5f21a2734a
commit bbb783e1da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 235 additions and 76 deletions

View File

@ -12,8 +12,7 @@ pub struct Block;
pub struct BlockHeader;
/// Identifier of a block
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct BlockId;
pub type BlockId = [u8; 32];
impl Block {
/// Encode block into bytes
@ -24,10 +23,14 @@ impl Block {
pub fn from_bytes(_: Bytes) -> Self {
Self
}
pub fn header(&self) -> BlockHeader {
BlockHeader
}
}
impl BlockHeader {
pub fn id(&self) -> BlockId {
BlockId
todo!()
}
}

View File

@ -40,6 +40,7 @@ impl<Tx, Id> Leadership<Tx, Id> {
&self,
view: &'view View,
tip: &Tip,
qc: Approval,
) -> LeadershipResult<'view> {
let ancestor_hint = todo!("get the ancestor from the tip");
if view.is_leader(self.key.key) {

View File

@ -7,6 +7,8 @@
mod leadership;
mod network;
pub mod overlay;
#[cfg(test)]
mod test;
mod tip;
// std
@ -111,8 +113,6 @@ where
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let mut view_generator = self.view_generator().await;
let network_relay: OutboundRelay<_> = self
.network_relay
.connect()
@ -137,13 +137,18 @@ where
let fountain = F::new(fountain_settings);
let leadership = Leadership::new(private_key, mempool_relay);
// FIXME: this should be taken from config
let mut cur_view = View {
seed: [0; 32],
staking_keys: BTreeMap::new(),
view_n: 0,
};
loop {
let view = view_generator.next().await;
// if we want to process multiple views at the same time this can
// be spawned as a separate future
// FIXME: this should probably have a timer to detect failed rounds
let res = view
let res = cur_view
.resolve::<A, Member<'_, COMMITTEE_SIZE>, _, _, _>(
private_key,
&tip,
@ -153,10 +158,11 @@ where
)
.await;
match res {
Ok(_block) => {
Ok((_block, view)) => {
// resolved block, mark as verified and possibly update the tip
// not sure what mark as verified means, e.g. if we want an event subscription
// system for this to be used for example by the ledger, storage and mempool
cur_view = view;
}
Err(e) => {
tracing::error!("Error while resolving view: {}", e);
@ -166,35 +172,6 @@ where
}
}
impl<A, P, M, F> CarnotConsensus<A, P, M, F>
where
F: FountainCode + Send + Sync + 'static,
A: NetworkAdapter + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
P::Settings: Clone + Send + Sync + 'static,
P::Tx: Debug + Send + Sync + 'static,
P::Id: Debug + Send + Sync + 'static,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
{
// Build a service that generates new views as they become available
async fn view_generator(&self) -> ViewGenerator {
todo!()
}
}
/// Tracks new views and make them available as soon as they are available
///
/// A new view is normally generated as soon a a block is approved, but
/// additional logic is needed in failure cases, like when no new block is
/// approved for a long enough period of time
struct ViewGenerator;
impl ViewGenerator {
async fn next(&mut self) -> View {
todo!()
}
}
#[derive(Hash, Eq, PartialEq)]
pub struct Approval;
@ -203,67 +180,136 @@ pub struct Approval;
pub struct View {
seed: Seed,
staking_keys: BTreeMap<NodeId, Stake>,
_view_n: u64,
pub view_n: u64,
}
impl View {
// TODO: might want to encode steps in the type system
async fn resolve<'view, A, O, F, Tx, Id>(
pub async fn resolve<'view, A, O, F, Tx, Id>(
&'view self,
node_id: NodeId,
tip: &Tip,
adapter: &A,
fountain: &F,
leadership: &Leadership<Tx, Id>,
) -> Result<Block, Box<dyn Error>>
) -> Result<(Block, View), Box<dyn Error>>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
O: Overlay<'view, A, F>,
{
let res = if self.is_leader(node_id) {
let block = self
.resolve_leader::<A, O, F, _, _>(node_id, tip, adapter, fountain, leadership)
.await
.unwrap(); // FIXME: handle sad path
let next_view = self.generate_next_view(&block);
(block, next_view)
} else {
self.resolve_non_leader::<A, O, F>(node_id, adapter, fountain)
.await
.unwrap() // FIXME: handle sad path
};
// Commit phase:
// Upon verifing a block B, if B.parent = B' and B'.parent = B'' and
// B'.view = B''.view + 1, then the node commits B''.
// This happens implicitly at the chain level and does not require any
// explicit action from the node.
Ok(res)
}
async fn resolve_leader<'view, A, O, F, Tx, Id>(
&'view self,
node_id: NodeId,
tip: &Tip,
adapter: &A,
fountain: &F,
leadership: &Leadership<Tx, Id>,
) -> Result<Block, ()>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
O: Overlay<'view, A, F>,
{
let overlay = O::new(self, node_id);
// FIXME: this is still a working in progress and best-of-my-understanding
// of the consensus protocol, having pseudocode would be very helpful
// Consensus in Carnot is achieved in 4 steps from the point of view of a node:
// 1) The node receives a block proposal from a leader and verifies it
// 2, The node signals to the network its approval for the block.
// Depending on the overlay, this may require waiting for a certain number
// of other approvals.
// 3) The node waits for consensus to be reached and mark the block as verified
// 4) Upon verifing a block B, if B.parent = B' and B'.parent = B'' and
// B'.view = B''.view + 1, then the node commits B''.
// This happens implicitly at the chain level and does not require any
// explicit action from the node.
// We need to build the QC for the block we are proposing
let qc = overlay.build_qc(adapter).await;
// 1) Collect and verify block proposal.
// If this node is the leader this is trivial
let block = if let LeadershipResult::Leader { block, .. } =
leadership.try_propose_block(self, tip).await
{
block
} else {
overlay
.reconstruct_proposal_block(adapter, fountain)
.await?
// TODO: reshare the block?
// TODO: verify
};
let LeadershipResult::Leader { block, _view } = leadership
.try_propose_block(self, tip, qc)
.await else { panic!("we are leader")};
// 2) Signal approval to the network
overlay.approve_and_forward(&block, adapter).await?;
// 3) Wait for consensus
overlay.wait_for_consensus(&block, adapter).await;
overlay
.broadcast_block(block.clone(), adapter, fountain)
.await;
Ok(block)
}
async fn resolve_non_leader<'view, A, O, F>(
&'view self,
node_id: NodeId,
adapter: &A,
fountain: &F,
) -> Result<(Block, View), ()>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
O: Overlay<'view, A, F>,
{
let overlay = O::new(self, node_id);
// Consensus in Carnot is achieved in 2 steps from the point of view of a node:
// 1) The node receives a block proposal from a leader and verifies it
// 2) The node signals to the network its approval for the block.
// Depending on the overlay, this may require waiting for a certain number
// of other approvals.
// 1) Collect and verify block proposal.
let block = overlay
.reconstruct_proposal_block(adapter, fountain)
.await
.unwrap(); // FIXME: handle sad path
// TODO: verify
// TODO: reshare the block?
let next_view = self.generate_next_view(&block);
// 2) Signal approval to the network
// We only consider the happy path for now
if self.pipelined_safe_block(&block) {
overlay
.approve_and_forward(&block, adapter, &next_view)
.await
.unwrap(); // FIXME: handle sad path
}
Ok((block, next_view))
}
pub fn is_leader(&self, _node_id: NodeId) -> bool {
todo!()
false
}
pub fn id(&self) -> u64 {
self._view_n
self.view_n
}
// Verifies the block is new and the previous leader did not fail
fn pipelined_safe_block(&self, _: &Block) -> bool {
// return b.view_n >= self.view_n && b.view_n == b.qc.view_n
true
}
fn generate_next_view(&self, _b: &Block) -> View {
let mut seed = self.seed;
seed[0] += 1;
View {
seed,
staking_keys: self.staking_keys.clone(),
view_n: self.view_n + 1,
}
}
}

View File

@ -154,6 +154,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const
&self,
_block: &Block,
_adapter: &Network,
_next_view: &View,
) -> Result<(), Box<dyn Error>> {
// roughly, we want to do something like this:
// 1. wait for left and right children committees to approve
@ -165,7 +166,8 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const
todo!()
}
async fn wait_for_consensus(&self, _approval: &Block, _adapter: &Network) {
async fn build_qc(&self, _adapter: &Network) -> Approval {
// maybe the leader publishing the QC?
todo!()
}
}

View File

@ -87,6 +87,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync>
&self,
block: &Block,
adapter: &Network,
_next_view: &View,
) -> Result<(), Box<dyn Error>> {
// in the flat overlay, there's no need to wait for anyone before approving the block
let approval = self.approve(block);
@ -103,7 +104,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync>
Ok(())
}
async fn wait_for_consensus(&self, _approval: &Block, adapter: &Network) {
async fn build_qc(&self, adapter: &Network) -> Approval {
// for now, let's pretend that consensus is reached as soon as the
// block is approved by a share of the nodes
let mut approvals = HashSet::new();
@ -121,8 +122,10 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync>
/ self.threshold.den;
if approvals.len() as u64 >= threshold {
// consensus reached
break;
// FIXME: build a real QC
return Approval;
}
}
unimplemented!("consensus not reached")
}
}

View File

@ -29,7 +29,8 @@ pub trait Overlay<'view, Network: NetworkAdapter, Fountain: FountainCode> {
&self,
block: &Block,
adapter: &Network,
next_view: &View,
) -> Result<(), Box<dyn Error>>;
/// Wait for consensus on a block
async fn wait_for_consensus(&self, block: &Block, adapter: &Network);
async fn build_qc(&self, adapter: &Network) -> Approval;
}

View File

@ -0,0 +1,103 @@
use crate::network::messages::*;
use crate::overlay::committees::*;
use crate::overlay::*;
use crate::*;
use async_trait::async_trait;
use bytes::Bytes;
use futures::Stream;
use nomos_core::block::*;
use nomos_core::fountain::{mock::MockFountain, FountainCode, FountainError};
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::*;
use tokio::sync::broadcast::Receiver;
struct DummyOverlay;
struct DummyAdapter;
struct DummyBackend;
#[async_trait]
impl<'view, N: NetworkAdapter + Sync, F: FountainCode + Sync> Overlay<'view, N, F>
for DummyOverlay
{
fn new(_: &View, _: NodeId) -> Self {
DummyOverlay
}
async fn build_qc(&self, _: &N) -> Approval {
Approval
}
async fn broadcast_block(&self, _: Block, _: &N, _: &F) {}
async fn reconstruct_proposal_block(&self, _: &N, _: &F) -> Result<Block, FountainError> {
Ok(Block)
}
async fn approve_and_forward(&self, _: &Block, _: &N, _: &View) -> Result<(), Box<dyn Error>> {
Ok(())
}
}
#[async_trait]
impl NetworkAdapter for DummyAdapter {
type Backend = DummyBackend;
async fn new(
_: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
DummyAdapter
}
async fn proposal_chunks_stream(
&self,
_: Committee,
_: &View,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
unimplemented!()
}
async fn broadcast_block_chunk(&self, _: Committee, _: &View, _: ProposalChunkMsg) {
unimplemented!()
}
async fn approvals_stream(
&self,
_: Committee,
_: &View,
) -> Box<dyn Stream<Item = Approval> + Send> {
unimplemented!()
}
async fn forward_approval(&self, _: Committee, _: &View, _: ApprovalMsg) {}
}
#[async_trait]
impl NetworkBackend for DummyBackend {
type Settings = ();
type State = NoState<()>;
type Message = ();
type EventKind = ();
type NetworkEvent = ();
fn new(_config: Self::Settings) -> Self {
Self
}
async fn process(&self, _: Self::Message) {}
async fn subscribe(&mut self, _: Self::EventKind) -> Receiver<Self::NetworkEvent> {
unimplemented!()
}
}
#[tokio::test]
async fn test_single_round_non_leader() {
let view = View {
seed: [0; 32],
staking_keys: BTreeMap::new(),
view_n: 0,
};
let (_, next_view) = view
.resolve_non_leader::<DummyAdapter, DummyOverlay, MockFountain>(
[0; 32],
&DummyAdapter,
&MockFountain,
)
.await
.unwrap();
assert!(next_view.view_n == 1);
}