diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml
index f1c3e186..94f6cf19 100644
--- a/nomos-services/consensus/Cargo.toml
+++ b/nomos-services/consensus/Cargo.toml
@@ -19,6 +19,7 @@ tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
futures = "0.3"
waku-bindings = { version = "0.1.0-beta2", optional = true}
+tracing = "0.1"
[features]
default = []
diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs
index 61374242..b75b3024 100644
--- a/nomos-services/consensus/src/lib.rs
+++ b/nomos-services/consensus/src/lib.rs
@@ -10,7 +10,8 @@ pub mod overlay;
mod tip;
// std
-use std::collections::{BTreeMap, HashSet};
+use std::collections::BTreeMap;
+use std::error::Error;
use std::fmt::Debug;
// crates
// internal
@@ -140,15 +141,27 @@ where
let view = view_generator.next().await;
// if we want to process multiple views at the same time this can
// be spawned as a separate future
- // TODO: add leadership module
- view.resolve::, _, _, _>(
- private_key,
- &tip,
- &network_adapter,
- &fountain,
- &leadership,
- )
- .await;
+
+ // FIXME: this should probably have a timer to detect failed rounds
+ let res = view
+ .resolve::, _, _, _>(
+ private_key,
+ &tip,
+ &network_adapter,
+ &fountain,
+ &leadership,
+ )
+ .await;
+ match res {
+ Ok(_block) => {
+ // resolved block, mark as verified and possibly update the tip
+ // not sure what mark as verified means, e.g. if we want an event subscription
+ // system for this to be used for example by the ledger, storage and mempool
+ }
+ Err(e) => {
+ tracing::error!("Error while resolving view: {}", e);
+ }
+ }
}
}
}
@@ -194,8 +207,6 @@ pub struct View {
}
impl View {
- const APPROVAL_THRESHOLD: usize = 1;
-
// TODO: might want to encode steps in the type system
async fn resolve<'view, A, O, F, Tx, Id>(
&'view self,
@@ -204,60 +215,48 @@ impl View {
adapter: &A,
fountain: &F,
leadership: &Leadership,
- ) where
+ ) -> Result>
+ where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
O: Overlay<'view, A, F>,
{
let overlay = O::new(self, node_id);
+ // FIXME: this is still a working in progress and best-of-my-understanding
+ // of the consensus protocol, having pseudocode would be very helpful
+ // Consensus in Carnot is achieved in 4 steps from the point of view of a node:
+ // 1) The node receives a block proposal from a leader and verifies it
+ // 2, The node signals to the network its approval for the block.
+ // Depending on the overlay, this may require waiting for a certain number
+ // of other approvals.
+ // 3) The node waits for consensus to be reached and mark the block as verified
+ // 4) Upon verifing a block B, if B.parent = B' and B'.parent = B'' and
+ // B'.view = B''.view + 1, then the node commits B''.
+ // This happens implicitly at the chain level and does not require any
+ // explicit action from the node.
+
+ // 1) Collect and verify block proposal.
+ // If this node is the leader this is trivial
let block = if let LeadershipResult::Leader { block, .. } =
leadership.try_propose_block(self, tip).await
{
- Ok(block)
+ block
} else {
- overlay.reconstruct_proposal_block(adapter, fountain).await
+ overlay
+ .reconstruct_proposal_block(adapter, fountain)
+ .await?
+ // TODO: reshare the block?
+ // TODO: verify
};
- match block {
- Ok(block) => {
- // TODO: verify?
- overlay
- .broadcast_block(block.clone(), adapter, fountain)
- .await;
- self.approve(&overlay, block, adapter).await;
- }
- Err(_e) => {
- // TODO: log error
- }
- }
- }
- async fn approve<
- 'view,
- Network: NetworkAdapter,
- Fountain: FountainCode,
- O: Overlay<'view, Network, Fountain>,
- >(
- &'view self,
- overlay: &O,
- block: Block,
- adapter: &Network,
- ) {
- // wait for approval in the overlay, if necessary
- let mut approvals = HashSet::new();
- let mut stream = overlay.collect_approvals(block, adapter).await;
- while let Some(approval) = stream.recv().await {
- approvals.insert(approval);
- if approvals.len() > Self::APPROVAL_THRESHOLD {
- let self_approval = self.craft_proof_of_approval(approvals.into_iter());
- overlay.forward_approval(self_approval, adapter).await;
- return;
- }
- }
- }
+ // 2) Signal approval to the network
+ overlay.approve_and_forward(&block, adapter).await?;
- fn craft_proof_of_approval(&self, _approvals: impl Iterator- ) -> Approval {
- todo!()
+ // 3) Wait for consensus
+ overlay.wait_for_consensus(&block, adapter).await;
+
+ Ok(block)
}
pub fn is_leader(&self, _node_id: NodeId) -> bool {
diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs
index 7244f363..a9849d85 100644
--- a/nomos-services/consensus/src/network/adapters/waku.rs
+++ b/nomos-services/consensus/src/network/adapters/waku.rs
@@ -110,7 +110,7 @@ impl NetworkAdapter for WakuAdapter {
};
}
- async fn approvals_stream(&self) -> Box> {
+ async fn approvals_stream(&self) -> Box + Send> {
let stream_channel = self
.message_subscriber_channel()
.await
diff --git a/nomos-services/consensus/src/network/mod.rs b/nomos-services/consensus/src/network/mod.rs
index 68277ae5..684e0f13 100644
--- a/nomos-services/consensus/src/network/mod.rs
+++ b/nomos-services/consensus/src/network/mod.rs
@@ -21,6 +21,6 @@ pub trait NetworkAdapter {
) -> Self;
async fn proposal_chunks_stream(&self) -> Box + Send + Sync + Unpin>;
async fn broadcast_block_chunk(&self, view: &View, chunk_msg: ProposalChunkMsg);
- async fn approvals_stream(&self) -> Box>;
+ async fn approvals_stream(&self) -> Box + Send>;
async fn forward_approval(&self, approval: ApprovalMsg);
}
diff --git a/nomos-services/consensus/src/overlay/committees.rs b/nomos-services/consensus/src/overlay/committees.rs
index 5acea30b..96b42511 100644
--- a/nomos-services/consensus/src/overlay/committees.rs
+++ b/nomos-services/consensus/src/overlay/committees.rs
@@ -89,8 +89,12 @@ impl<'view, const C: usize> Member<'view, C> {
}
// Return participants in the children committees this member should interact with
- pub fn children_committes(&self) -> (Committee, Committee) {
- self.committee.children()
+ pub fn children_committes(&self) -> (Option, Option) {
+ let (left, right) = self.committee.children();
+ (
+ self.committees.get_committee_members(left).map(|_| left),
+ self.committees.get_committee_members(right).map(|_| right),
+ )
}
}
@@ -125,15 +129,22 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const
.await;
}
- async fn collect_approvals(
+ async fn approve_and_forward(
&self,
- _block: Block,
+ _block: &Block,
adapter: &Network,
- ) -> tokio::sync::mpsc::Receiver {
+ ) -> Result<(), Box> {
+ // roughly, we want to do something like this:
+ // 1. wait for left and right children committees to approve
+ // 2. approve the block
+ // 3. forward the approval to the parent committee
+ //
+ // However this will likely change depending on the position
+ // of the committee in the tree
todo!()
}
- async fn forward_approval(&self, _approval: Approval, adapter: &Network) {
- todo!()
+ async fn wait_for_consensus(&self, _approval: &Block, adapter: &Network) {
+ // maybe the leader publishing the QC?
}
}
diff --git a/nomos-services/consensus/src/overlay/flat.rs b/nomos-services/consensus/src/overlay/flat.rs
new file mode 100644
index 00000000..0e87d7a1
--- /dev/null
+++ b/nomos-services/consensus/src/overlay/flat.rs
@@ -0,0 +1,119 @@
+use std::collections::HashSet;
+// std
+use std::error::Error;
+// crates
+use futures::StreamExt;
+// internal
+use super::*;
+use crate::network::messages::{ApprovalMsg, ProposalChunkMsg};
+use crate::network::NetworkAdapter;
+
+const DEFAULT_THRESHOLD: Threshold = Threshold::new(2, 3);
+
+/// The share of nodes that need to approve a block for it to be valid
+/// expressed as a fraction of the total number of nodes
+#[derive(Copy, Clone, Debug)]
+pub struct Threshold {
+ num: u64,
+ den: u64,
+}
+
+impl Threshold {
+ pub const fn new(num: u64, den: u64) -> Self {
+ Self { num, den }
+ }
+}
+
+/// A flat overlay, everyone is in the same committee.
+/// As far as the API is concerned, this should be equivalent to any other
+/// overlay and far simpler to implement.
+/// For this reason, this might act as a 'reference' overlay for testing.
+pub struct Flat<'view> {
+ view: &'view View,
+ // TODO: this should be a const param, but we can't do that yet
+ threshold: Threshold,
+ node_id: NodeId,
+}
+
+impl<'view> Flat<'view> {
+ pub fn new(view: &'view View, node_id: NodeId) -> Self {
+ Self {
+ view,
+ threshold: DEFAULT_THRESHOLD,
+ node_id,
+ }
+ }
+
+ fn approve(&self, _block: &Block) -> Approval {
+ // we still need to define how votes look like
+ todo!()
+ }
+}
+
+#[async_trait::async_trait]
+impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync>
+ Overlay<'view, Network, Fountain> for Flat<'view>
+{
+ fn new(view: &'view View, node: NodeId) -> Self {
+ Flat::new(view, node)
+ }
+
+ async fn reconstruct_proposal_block(
+ &self,
+ adapter: &Network,
+ fountain: &Fountain,
+ ) -> Result {
+ let message_stream = adapter.proposal_chunks_stream().await;
+ fountain.decode(message_stream).await.map(Block::from_bytes)
+ }
+
+ async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain) {
+ let block_bytes = block.as_bytes();
+ let encoded_stream = fountain.encode(&block_bytes);
+ encoded_stream
+ .for_each_concurrent(None, |chunk| async move {
+ let message = ProposalChunkMsg { chunk };
+ adapter.broadcast_block_chunk(self.view, message).await;
+ })
+ .await;
+ }
+
+ async fn approve_and_forward(
+ &self,
+ block: &Block,
+ adapter: &Network,
+ ) -> Result<(), Box> {
+ // in the flat overlay, there's no need to wait for anyone before approving the block
+ let approval = self.approve(block);
+ adapter
+ .forward_approval(ApprovalMsg {
+ approval,
+ source: self.node_id,
+ })
+ .await;
+ Ok(())
+ }
+
+ async fn wait_for_consensus(&self, _approval: &Block, adapter: &Network) {
+ // for now, let's pretend that consensus is reached as soon as the
+ // block is approved by a share of the nodes
+ let mut approvals = HashSet::new();
+ let mut stream = Box::into_pin(adapter.approvals_stream().await);
+
+ // Shadow the original binding so that it can't be directly accessed
+ // ever again.
+ while let Some(approval) = stream.next().await {
+ // insert the approval in the map to deduplicate
+ // TODO: validate approval
+ approvals.insert(approval);
+ // ceil(num/den * n)
+ let threshold =
+ (self.threshold.num * self.view.staking_keys.len() as u64 + self.threshold.den - 1)
+ / self.threshold.den;
+ if approvals.len() as u64 >= threshold {
+ // consensus reached
+ break;
+ }
+ }
+ }
+}
diff --git a/nomos-services/consensus/src/overlay/mod.rs b/nomos-services/consensus/src/overlay/mod.rs
index e770a28d..dab6a871 100644
--- a/nomos-services/consensus/src/overlay/mod.rs
+++ b/nomos-services/consensus/src/overlay/mod.rs
@@ -1,7 +1,9 @@
#[allow(unused)]
mod committees;
+mod flat;
// std
+use std::error::Error;
// crates
// internal
use super::{Approval, NodeId, View};
@@ -21,10 +23,14 @@ pub trait Overlay<'view, Network: NetworkAdapter, Fountain: FountainCode> {
fountain: &Fountain,
) -> Result;
async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain);
- async fn collect_approvals(
+ /// Different overlays might have different needs or the same overlay might
+ /// require different steps depending on the node role
+ /// For now let's put this responsibility on the overlay
+ async fn approve_and_forward(
&self,
- block: Block,
+ block: &Block,
adapter: &Network,
- ) -> tokio::sync::mpsc::Receiver;
- async fn forward_approval(&self, approval: Approval, adapter: &Network);
+ ) -> Result<(), Box>;
+ /// Wait for consensus on a block
+ async fn wait_for_consensus(&self, block: &Block, adapter: &Network);
}