Consensus orchestrator (#128)

* Add happy-path consensus engine

* tmp

* Fit types from spec (#124)

* Match types to spec

* Remove Output import

* Consensus engine rework (#126)

* rework

* fix test

* clippy happy

---------

Co-authored-by: Giacomo Pasini <Zeegomo@users.noreply.github.com>

* Adapt carnot network adapter interfaces and implementations

* Fix errors

* Update network with engine types

* Fit types yet again

* Remove leadership and old overlay
Create carnot event builder
Added some adjustments

* Add view to vote

* Fix serde derive in consensus-engine

* Add serde feature for engine in core

* Use view in tally

* Move carnot tally to consensus service

* Add new view msg

* Fit engine types in adapter

* Missing serde feature in consensus service

* Implement carnot event builder

* Implement event builder run main tasks

* Fill up view resolver

* Fix errors on network adapter implementations

* Clippy happy

* Extract event handling to independent methods in View

* Fix test

* Refactor carnot event builder (#135)

* refactor

* format

* Discriminate proposal messages (#136)

* Derive block id from wire format (#139)

* Derive block id from wire format

* Derive id on block creation

* Use compile time hash size

* Add leader role (#138)

* add leadership stub

* fix gather_new_views

* fmt

* actually build qc

* remove redundant fields

* add flat overlay (#143)

* add flat overlay

* fix

* sort imports

* fix tests

* Unhappy tally (#137)

* Refactor tally module

* Implement tally for new view messages

* Assess pr comments

* Fix rebase

* simplify tally

---------

Co-authored-by: Giacomo Pasini <g.pasini98@gmail.com>

* Working node (#149)

* fix gather_new_views

* working node

* fix unhappy path

* remove leftover

* fix comments

* update waku (#146)

* update waku

* Fix waku update

---------

Co-authored-by: danielsanchezq <sanchez.quiros.daniel@gmail.com>

* little fixes

* Consensus tasks cancellation (#147)

* fix

* Create view cancel and cancel cache

* Attach cancellation to consensus tasks

* Fix view binds

---------

Co-authored-by: Giacomo Pasini <g.pasini98@gmail.com>

---------

Co-authored-by: danielsanchezq <sanchez.quiros.daniel@gmail.com>

* Remove clones on consts

---------

Co-authored-by: Al Liu <scygliu1@gmail.com>
Co-authored-by: Giacomo Pasini <g.pasini98@gmail.com>
Co-authored-by: Giacomo Pasini <Zeegomo@users.noreply.github.com>
This commit is contained in:
Daniel Sanchez 2023-05-22 14:56:56 +02:00 committed by GitHub
parent fdc22111d3
commit bc453b686f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1155 additions and 988 deletions

View File

@ -6,8 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = { version = "1.0", optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
[features]
default = []
serde1 = ["serde"]

View File

@ -23,9 +23,13 @@ impl<O: Overlay> Carnot<O> {
highest_voted_view: -1,
last_view_timeout_qc: None,
overlay,
safe_blocks: [(id, genesis_block)].into(),
safe_blocks: [(genesis_block.id, genesis_block)].into(),
}
}
pub fn current_view(&self) -> View {
self.current_view
}
/// Upon reception of a block
///
/// Preconditions:
@ -40,12 +44,10 @@ impl<O: Overlay> Carnot<O> {
self.safe_blocks.contains_key(&block.parent()),
"out of order view not supported, missing parent block for {block:?}",
);
// if the block has already been processed, return early
if self.safe_blocks.contains_key(&block.id) {
return Ok(self.clone());
}
if self.blocks_in_view(block.view).contains(&block)
|| block.view <= self.latest_committed_view()
{
@ -56,7 +58,6 @@ impl<O: Overlay> Carnot<O> {
// By rejecting any other blocks except the first one received for a view this code does NOT do that.
return Err(());
}
let mut new_state = self.clone();
if new_state.block_is_safe(block.clone()) {
@ -66,7 +67,6 @@ impl<O: Overlay> Carnot<O> {
// Non safe block, not necessarily an error
return Err(());
}
Ok(new_state)
}
@ -94,8 +94,13 @@ impl<O: Overlay> Carnot<O> {
/// Preconditions:
/// * `receive_block(b)` must have been called successfully before trying to approve a block b.
/// * A node should not attempt to vote for a block in a view earlier than the latest one it actively participated in.
pub fn approve_block(&self, block: Block) -> (Self, Output) {
assert!(self.safe_blocks.contains_key(&block.id));
pub fn approve_block(&self, block: Block) -> (Self, Send) {
assert!(
self.safe_blocks.contains_key(&block.id),
"{:?} not in {:?}",
block,
self.safe_blocks
);
assert!(
self.highest_voted_view < block.view,
"can't vote for a block in the past"
@ -114,9 +119,12 @@ impl<O: Overlay> Carnot<O> {
};
(
new_state,
Output::Send {
Send {
to,
payload: Payload::Vote(Vote { block: block.id }),
payload: Payload::Vote(Vote {
block: block.id,
view: block.view,
}),
},
)
}
@ -132,7 +140,7 @@ impl<O: Overlay> Carnot<O> {
&self,
timeout_qc: TimeoutQc,
new_views: HashSet<NewView>,
) -> (Self, Output) {
) -> (Self, Send) {
let new_view = timeout_qc.view + 1;
assert!(
new_view
@ -180,7 +188,7 @@ impl<O: Overlay> Carnot<O> {
};
(
new_state,
Output::Send {
Send {
to,
payload: Payload::NewView(new_view_msg),
},
@ -192,7 +200,7 @@ impl<O: Overlay> Carnot<O> {
/// Preconditions: none!
/// Just notice that the timer only reset after a view change, i.e. a node can't timeout
/// more than once for the same view
pub fn local_timeout(&self) -> (Self, Option<Output>) {
pub fn local_timeout(&self) -> (Self, Option<Send>) {
let mut new_state = self.clone();
new_state.highest_voted_view = new_state.current_view;
@ -208,7 +216,7 @@ impl<O: Overlay> Carnot<O> {
let to = new_state.overlay.root_committee();
return (
new_state,
Some(Output::Send {
Some(Send {
to,
payload: Payload::Timeout(timeout_msg),
}),
@ -305,6 +313,50 @@ impl<O: Overlay> Carnot<O> {
}
res
}
pub fn last_view_timeout_qc(&self) -> Option<TimeoutQc> {
self.last_view_timeout_qc.clone()
}
pub fn high_qc(&self) -> StandardQc {
self.local_high_qc.clone()
}
pub fn is_leader_for_view(&self, view: View) -> bool {
self.overlay.leader(view) == self.id
}
pub fn super_majority_threshold(&self) -> usize {
self.overlay.super_majority_threshold(self.id)
}
pub fn leader_super_majority_threshold(&self) -> usize {
self.overlay.leader_super_majority_threshold(self.id)
}
pub fn id(&self) -> NodeId {
self.id
}
pub fn self_committee(&self) -> Committee {
self.overlay.node_committee(self.id)
}
pub fn child_committees(&self) -> Vec<Committee> {
self.overlay.child_committees(self.id)
}
pub fn parent_committee(&self) -> Committee {
self.overlay.parent_committee(self.id)
}
pub fn root_committee(&self) -> Committee {
self.overlay.root_committee()
}
pub fn is_member_of_root_committee(&self) -> bool {
self.overlay.is_member_of_root_committee(self.id)
}
}
#[cfg(test)]
@ -315,6 +367,10 @@ mod test {
struct NoOverlay;
impl Overlay for NoOverlay {
fn new(_nodes: Vec<NodeId>) -> Self {
Self
}
fn root_committee(&self) -> Committee {
todo!()
}
@ -339,11 +395,19 @@ mod test {
todo!()
}
fn node_committee(&self, _id: NodeId) -> Committee {
todo!()
}
fn parent_committee(&self, _id: NodeId) -> Committee {
todo!()
}
fn leaf_committees(&self, _id: NodeId) -> HashSet<Committee> {
fn child_committees(&self, _id: NodeId) -> Vec<Committee> {
todo!()
}
fn leaf_committees(&self, _id: NodeId) -> Vec<Committee> {
todo!()
}
@ -355,7 +419,7 @@ mod test {
todo!()
}
fn leader_super_majority_threshold(&self, _view: View) -> usize {
fn leader_super_majority_threshold(&self, _id: NodeId) -> usize {
todo!()
}
}

View File

@ -1,5 +1,9 @@
// std
use std::collections::HashSet;
use std::hash::Hash;
// crates
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
pub type View = i64;
pub type NodeId = [u8; 32];
@ -12,7 +16,7 @@ pub type Committee = HashSet<NodeId>;
/// This enum represents the different types of messages that can be sent from the perspective of consensus and
/// can't be directly used in the network as they lack things like cryptographic signatures.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Payload {
/// Vote for a block in a view
Vote(Vote),
@ -23,14 +27,15 @@ pub enum Payload {
}
/// Returned
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Vote {
pub view: View,
pub block: BlockId,
}
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Timeout {
pub view: View,
pub sender: NodeId,
@ -38,8 +43,10 @@ pub struct Timeout {
pub timeout_qc: Option<TimeoutQc>,
}
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
// TODO: We are making it "mandatory" to have received the timeout_qc before the new_view votes.
// We should consider removing the TimeoutQc from the NewView message and using a hash or id instead.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct NewView {
pub view: View,
pub sender: NodeId,
@ -47,8 +54,8 @@ pub struct NewView {
pub high_qc: Qc,
}
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct TimeoutQc {
pub view: View,
pub high_qc: Qc,
@ -56,8 +63,9 @@ pub struct TimeoutQc {
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Block {
#[cfg_attr(feature = "serde", serde(skip))]
pub id: BlockId,
pub view: View,
pub parent_qc: Qc,
@ -71,25 +79,20 @@ impl Block {
/// Possible output events.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Output {
Send {
to: HashSet<NodeId>,
payload: Payload,
},
Broadcast {
payload: Payload,
},
pub struct Send {
pub to: HashSet<NodeId>,
pub payload: Payload,
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct StandardQc {
pub view: View,
pub id: BlockId,
}
impl StandardQc {
pub(crate) fn genesis() -> Self {
pub fn genesis() -> Self {
Self {
view: -1,
id: [0; 32],
@ -98,14 +101,14 @@ impl StandardQc {
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AggregateQc {
pub high_qc: StandardQc,
pub view: View,
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Qc {
Standard(StandardQc),
Aggregated(AggregateQc),
@ -136,9 +139,17 @@ impl Qc {
Qc::Aggregated(AggregateQc { high_qc, .. }) => high_qc.id,
}
}
pub fn high_qc(&self) -> StandardQc {
match self {
Qc::Standard(qc) => qc.clone(),
Qc::Aggregated(AggregateQc { high_qc, .. }) => high_qc.clone(),
}
}
}
pub trait Overlay: Clone {
fn new(nodes: Vec<NodeId>) -> Self;
fn root_committee(&self) -> Committee;
fn rebuild(&mut self, timeout_qc: TimeoutQc);
fn is_member_of_child_committee(&self, parent: NodeId, child: NodeId) -> bool;
@ -146,8 +157,10 @@ pub trait Overlay: Clone {
fn is_member_of_leaf_committee(&self, id: NodeId) -> bool;
fn is_child_of_root_committee(&self, id: NodeId) -> bool;
fn parent_committee(&self, id: NodeId) -> Committee;
fn leaf_committees(&self, id: NodeId) -> HashSet<Committee>;
fn child_committees(&self, id: NodeId) -> Vec<Committee>;
fn leaf_committees(&self, id: NodeId) -> Vec<Committee>;
fn node_committee(&self, id: NodeId) -> Committee;
fn leader(&self, view: View) -> NodeId;
fn super_majority_threshold(&self, id: NodeId) -> usize;
fn leader_super_majority_threshold(&self, view: View) -> usize;
fn leader_super_majority_threshold(&self, id: NodeId) -> usize;
}

View File

@ -22,6 +22,7 @@ nomos-network = { path = "../../nomos-services/network", features = ["waku"] }
nomos-log = { path = "../../nomos-services/log" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["waku", "mock"] }
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
nomos-consensus = { path = "../../nomos-services/consensus", features = ["waku"] }
tracing-subscriber = "0.3"
tokio = {version = "1.24", features = ["sync"] }
serde_json = "1.0"

View File

@ -3,13 +3,19 @@ mod tx;
use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use nomos_consensus::{
network::adapters::waku::WakuAdapter as ConsensusWakuAdapter, overlay::FlatRoundRobin,
CarnotConsensus,
};
use nomos_core::fountain::mock::MockFountain;
use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::{HttpBridge, HttpBridgeService, HttpBridgeSettings};
use nomos_http::http::HttpService;
use nomos_log::Logger;
use nomos_mempool::backend::mockpool::MockPool;
use nomos_mempool::network::adapters::waku::WakuAdapter;
use nomos_mempool::MempoolService;
use nomos_mempool::{
backend::mockpool::MockPool, network::adapters::waku::WakuAdapter as MempoolWakuAdapter,
MempoolService,
};
use nomos_network::{backends::waku::Waku, NetworkService};
use overwatch_derive::*;
use overwatch_rs::{
@ -28,18 +34,28 @@ struct Args {
config: std::path::PathBuf,
}
type Carnot = CarnotConsensus<
ConsensusWakuAdapter,
MockPool<Tx>,
MempoolWakuAdapter<Tx>,
MockFountain,
FlatRoundRobin,
>;
#[derive(Deserialize)]
struct Config {
log: <Logger as ServiceData>::Settings,
network: <NetworkService<Waku> as ServiceData>::Settings,
http: <HttpService<AxumBackend> as ServiceData>::Settings,
consensus: <Carnot as ServiceData>::Settings,
}
#[derive(Services)]
struct MockPoolNode {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Waku>>,
mockpool: ServiceHandle<MempoolService<WakuAdapter<Tx>, MockPool<Tx>>>,
mockpool: ServiceHandle<MempoolService<MempoolWakuAdapter<Tx>, MockPool<Tx>>>,
consensus: ServiceHandle<Carnot>,
http: ServiceHandle<HttpService<AxumBackend>>,
bridges: ServiceHandle<HttpBridgeService>,
}
@ -80,6 +96,7 @@ fn main() -> Result<()> {
logging: config.log,
http: config.http,
mockpool: (),
consensus: config.consensus,
bridges: HttpBridgeSettings { bridges },
},
None,

View File

@ -3,7 +3,7 @@ use nomos_core::tx::{Transaction, TransactionHasher};
use serde::{Deserialize, Serialize};
use std::hash::Hash;
#[derive(Clone, Debug, Serialize, Deserialize, Hash)]
#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub struct Tx(pub String);
fn hash_tx(tx: &Tx) -> String {

View File

@ -12,7 +12,7 @@ authors = [
async-trait = { version = "0.1" }
blake2 = { version = "0.10" }
bytes = "1.3"
consensus-engine = { path = "../consensus-engine"}
consensus-engine = { path = "../consensus-engine", features = ["serde"]}
futures = "0.3"
nomos-network = { path = "../nomos-services/network", optional = true }
raptorq = { version = "1.7", optional = true }
@ -20,7 +20,7 @@ serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
bincode = "1.3"
once_cell = "1.0"
indexmap = { version = "1.9", features = ["serde-1"] }
indexmap = { version = "1.9", features = ["serde"] }
serde_json = { version = "1", optional = true }
[dev-dependencies]

View File

@ -1,4 +1,4 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct AccountId;

View File

@ -2,7 +2,10 @@ use indexmap::IndexSet;
// std
use core::hash::Hash;
// crates
use crate::wire;
use bytes::Bytes;
use consensus_engine::{Qc, View};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
// internal
@ -10,39 +13,34 @@ pub type TxHash = [u8; 32];
/// A block
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Block<Qc: Clone, TxId: Clone + Eq + Hash> {
header: BlockHeader<Qc>,
pub struct Block<TxId: Clone + Eq + Hash> {
header: consensus_engine::Block,
transactions: IndexSet<TxId>,
}
/// A block header
#[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)]
pub struct BlockHeader<Qc: Clone> {
id: BlockId,
qc: Qc,
}
/// Identifier of a block
pub type BlockId = [u8; 32];
impl<Qc: Clone, TxId: Clone + Eq + Hash> Block<Qc, TxId> {
pub fn new(qc: Qc, txs: impl Iterator<Item = TxId>) -> Self {
impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> {
pub fn new(view: View, parent_qc: Qc, txs: impl Iterator<Item = TxId>) -> Self {
let transactions = txs.collect();
// FIXME: Calculate header Id
let header = BlockHeader { id: [0; 32], qc };
Self {
let header = consensus_engine::Block {
id: [view as u8; 32],
view,
parent_qc,
};
let mut s = Self {
header,
transactions,
}
};
let id = id_from_wire_content(&s.as_bytes());
s.header.id = id;
s
}
/// Encode block into bytes
pub fn as_bytes(&self) -> Bytes {
Bytes::new()
}
pub fn header(&self) -> BlockHeader<Qc> {
self.header.clone()
pub fn header(&self) -> &consensus_engine::Block {
&self.header
}
pub fn transactions(&self) -> impl Iterator<Item = &TxId> + '_ {
@ -50,12 +48,23 @@ impl<Qc: Clone, TxId: Clone + Eq + Hash> Block<Qc, TxId> {
}
}
impl<Qc: Clone> BlockHeader<Qc> {
pub fn id(&self) -> BlockId {
self.id
fn id_from_wire_content(bytes: &[u8]) -> consensus_engine::BlockId {
use blake2::digest::{consts::U32, Digest};
use blake2::Blake2b;
let mut hasher = Blake2b::<U32>::new();
hasher.update(bytes);
hasher.finalize().into()
}
impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> {
/// Encode block into bytes
pub fn as_bytes(&self) -> Bytes {
wire::serialize(self).unwrap().into()
}
pub fn qc(&self) -> &Qc {
&self.qc
pub fn from_bytes(bytes: &[u8]) -> Self {
let mut result: Self = wire::deserialize(bytes).unwrap();
result.header.id = id_from_wire_content(bytes);
result
}
}

View File

@ -8,7 +8,7 @@ use crate::fountain::{FountainCode, FountainError};
/// Fountain code that does no protocol at all.
/// Just bypasses the raw bytes into a single chunk and reconstruct from it.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MockFountain;
#[async_trait]

View File

@ -8,7 +8,7 @@ use crate::tx::{Transaction, TransactionHasher};
mod transaction;
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Tx {
Transfer(TransferTransaction),
}

View File

@ -6,7 +6,7 @@ use crate::crypto::Signature;
/// Can only be constructed if the signature is valid,
/// but does not imply that it can be successfully applied
/// to the ledger.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TransferTransaction {
pub from: AccountId,
pub to: AccountId,

View File

@ -1,121 +0,0 @@
#![allow(dead_code)]
// TODO: Well, remove this when we actually use the fields from the specification
// std
use std::collections::HashSet;
// crates
use futures::{Stream, StreamExt};
// internal
use crate::block::BlockId;
use crate::crypto::PublicKey;
use crate::vote::Tally;
pub type NodeId = PublicKey;
pub enum QuorumCertificate {
Simple(SimpleQuorumCertificate),
Aggregated(AggregatedQuorumCertificate),
}
impl QuorumCertificate {
pub fn view(&self) -> u64 {
match self {
QuorumCertificate::Simple(qc) => qc.view,
QuorumCertificate::Aggregated(qc) => qc.view,
}
}
}
pub struct SimpleQuorumCertificate {
view: u64,
block: BlockId,
}
pub struct AggregatedQuorumCertificate {
view: u64,
high_qh: Box<QuorumCertificate>,
}
pub struct Vote {
block: BlockId,
view: u64,
voter: NodeId, // TODO: this should be some id, probably the node pk
qc: Option<QuorumCertificate>,
}
impl Vote {
pub fn valid_view(&self, view: u64) -> bool {
self.view == view && self.qc.as_ref().map_or(true, |qc| qc.view() == view - 1)
}
}
#[derive(thiserror::Error, Debug)]
pub enum CarnotTallyError {
#[error("Received invalid vote: {0}")]
InvalidVote(String),
#[error("Did not receive enough votes")]
InsufficientVotes,
}
#[derive(Clone)]
pub struct CarnotTallySettings {
threshold: usize,
// TODO: this probably should be dynamic and should change with the view (?)
participating_nodes: HashSet<NodeId>,
}
pub struct CarnotTally {
settings: CarnotTallySettings,
}
#[async_trait::async_trait]
impl Tally for CarnotTally {
type Vote = Vote;
type Qc = QuorumCertificate;
type Outcome = ();
type TallyError = CarnotTallyError;
type Settings = CarnotTallySettings;
fn new(settings: Self::Settings) -> Self {
Self { settings }
}
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
view: u64,
mut vote_stream: S,
) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> {
let mut approved = 0usize;
let mut seen = HashSet::new();
while let Some(vote) = vote_stream.next().await {
// check vote view is valid
if !vote.valid_view(view) {
return Err(CarnotTallyError::InvalidVote("Invalid view".to_string()));
}
// check for duplicated votes
if seen.contains(&vote.voter) {
return Err(CarnotTallyError::InvalidVote(
"Double voted node".to_string(),
));
}
// check for individual nodes votes
if !self.settings.participating_nodes.contains(&vote.voter) {
return Err(CarnotTallyError::InvalidVote(
"Non-participating node".to_string(),
));
}
seen.insert(vote.voter);
approved += 1;
if approved >= self.settings.threshold {
return Ok((
QuorumCertificate::Simple(SimpleQuorumCertificate {
view,
block: vote.block,
}),
(),
));
}
}
Err(CarnotTallyError::InsufficientVotes)
}
}

View File

@ -1,5 +1,6 @@
// std
// crates
use consensus_engine::{Block, View};
use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
// internal
@ -7,11 +8,11 @@ use crate::vote::Tally;
#[derive(Serialize, Deserialize)]
pub struct MockVote {
view: u64,
view: View,
}
impl MockVote {
pub fn view(&self) -> u64 {
pub fn view(&self) -> View {
self.view
}
}
@ -48,6 +49,7 @@ impl Tally for MockTally {
type Vote = MockVote;
type Qc = MockQc;
type Outcome = ();
type Subject = Block;
type TallyError = Error;
type Settings = MockTallySettings;
@ -58,12 +60,12 @@ impl Tally for MockTally {
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
view: u64,
block: Block,
mut vote_stream: S,
) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> {
let mut count_votes = 0;
while let Some(vote) = vote_stream.next().await {
if vote.view() != view {
if vote.view() != block.view {
return Err(Error("Invalid vote".into()));
}
count_votes += 1;

View File

@ -1,6 +1,4 @@
pub mod carnot;
pub mod mock;
use futures::Stream;
#[async_trait::async_trait]
@ -8,12 +6,13 @@ pub trait Tally {
type Vote;
type Qc;
type Outcome;
type Subject;
type TallyError;
type Settings: Clone;
fn new(settings: Self::Settings) -> Self;
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
view: u64,
subject: Self::Subject,
vote_stream: S,
) -> Result<(Self::Qc, Self::Outcome), Self::TallyError>;
}

View File

@ -7,9 +7,10 @@ edition = "2021"
[dependencies]
async-trait = "0.1"
async-stream = "0.3"
bytes = "1.3"
chrono = "0.4"
consensus-engine = { path = "../../consensus-engine", features = ["serde1"] }
consensus-engine = { path = "../../consensus-engine", features = ["serde"] }
futures = "0.3"
nomos-network = { path = "../network" }
nomos-mempool = { path = "../mempool" }
@ -18,10 +19,12 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main"
rand_chacha = "0.3"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
thiserror = "1.0"
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
waku-bindings = { version = "0.1.0-rc.2", optional = true}
tokio-util = "0.7"
tracing = "0.1"
waku-bindings = { version = "0.1.0-rc.2", optional = true}
[features]
default = []

View File

@ -1,69 +0,0 @@
// std
use std::marker::PhantomData;
// crates
// internal
use nomos_core::crypto::PrivateKey;
use nomos_core::tx::Transaction;
use nomos_mempool::MempoolMsg;
use super::*;
// TODO: take care of sensitive material
struct Enclave {
key: PrivateKey,
}
pub struct Leadership<Tx: Transaction> {
key: Enclave,
mempool: OutboundRelay<MempoolMsg<Tx>>,
}
pub enum LeadershipResult<'view, Qc: Clone, TxId: Clone + Eq + core::hash::Hash> {
Leader {
block: Block<Qc, TxId>,
_view: PhantomData<&'view u8>,
},
NotLeader {
_view: PhantomData<&'view u8>,
},
}
impl<Tx> Leadership<Tx>
where
Tx: Transaction,
Tx::Hash: Debug,
{
pub fn new(key: PrivateKey, mempool: OutboundRelay<MempoolMsg<Tx>>) -> Self {
Self {
key: Enclave { key },
mempool,
}
}
#[allow(unused, clippy::diverging_sub_expression)]
pub async fn try_propose_block<'view, Qc: Clone>(
&self,
view: &'view View,
tip: &Tip,
qc: Qc,
) -> LeadershipResult<'view, Qc, Tx::Hash> {
// TODO: get the correct ancestor for the tip
// let ancestor_hint = todo!("get the ancestor from the tip");
let ancestor_hint = [0; 32];
if view.is_leader(self.key.key) {
let (tx, rx) = tokio::sync::oneshot::channel();
self.mempool.send(MempoolMsg::View {
ancestor_hint,
reply_channel: tx,
});
let iter = rx.await.unwrap();
LeadershipResult::Leader {
_view: PhantomData,
block: Block::new(qc, iter.map(|ref tx| <Tx as Transaction>::hash(tx))),
}
} else {
LeadershipResult::NotLeader { _view: PhantomData }
}
}
}

View File

@ -4,29 +4,39 @@
//! are always synchronized (i.e. it cannot happen that we accidentally use committees from different views).
//! It's obviously extremely important that the information contained in `View` is synchronized across different
//! nodes, but that has to be achieved through different means.
mod leadership;
mod network;
pub mod network;
pub mod overlay;
mod tip;
mod tally;
mod view_cancel;
// std
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::hash::Hash;
use std::pin::Pin;
use std::time::Duration;
// crates
use serde::{Deserialize, Serialize};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
use serde::Deserialize;
use serde::{de::DeserializeOwned, Serialize};
// internal
use crate::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg};
use crate::network::NetworkAdapter;
use leadership::{Leadership, LeadershipResult};
use crate::tally::{happy::CarnotTally, unhappy::NewViewTally, CarnotTallySettings};
use crate::view_cancel::ViewCancelCache;
use consensus_engine::{
AggregateQc, Carnot, Committee, NewView, Overlay, Payload, Qc, StandardQc, Timeout, TimeoutQc,
Vote,
};
use nomos_core::block::Block;
use nomos_core::crypto::PublicKey;
use nomos_core::fountain::FountainCode;
use nomos_core::staking::Stake;
use nomos_core::tx::Transaction;
use nomos_core::vote::Tally;
use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolService};
use nomos_mempool::{
backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolMsg, MempoolService,
};
use nomos_network::NetworkService;
use overlay::Overlay;
use overwatch_rs::services::relay::{OutboundRelay, Relay};
use overwatch_rs::services::{
handle::ServiceStateHandle,
@ -34,54 +44,54 @@ use overwatch_rs::services::{
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
};
use tip::Tip;
// TODO: take this from config
const TIMEOUT: Duration = Duration::from_secs(60);
// Raw bytes for now, could be a ed25519 public key
pub type NodeId = PublicKey;
// Random seed for each round provided by the protocol
pub type Seed = [u8; 32];
#[derive(Debug)]
pub struct CarnotSettings<Fountain: FountainCode, VoteTally: Tally> {
#[derive(Debug, Deserialize, Serialize)]
pub struct CarnotSettings<Fountain: FountainCode> {
private_key: [u8; 32],
fountain_settings: Fountain::Settings,
tally_settings: VoteTally::Settings,
nodes: Vec<NodeId>,
}
impl<Fountain: FountainCode, VoteTally: Tally> Clone for CarnotSettings<Fountain, VoteTally> {
impl<Fountain: FountainCode> Clone for CarnotSettings<Fountain> {
fn clone(&self) -> Self {
Self {
private_key: self.private_key,
fountain_settings: self.fountain_settings.clone(),
tally_settings: self.tally_settings.clone(),
nodes: self.nodes.clone(),
}
}
}
impl<Fountain: FountainCode, VoteTally: Tally> CarnotSettings<Fountain, VoteTally> {
impl<Fountain: FountainCode> CarnotSettings<Fountain> {
#[inline]
pub const fn new(
private_key: [u8; 32],
fountain_settings: Fountain::Settings,
tally_settings: VoteTally::Settings,
nodes: Vec<NodeId>,
) -> Self {
Self {
private_key,
fountain_settings,
tally_settings,
nodes,
}
}
}
pub struct CarnotConsensus<A, P, M, F, T, O>
pub struct CarnotConsensus<A, P, M, F, O>
where
F: FountainCode,
A: NetworkAdapter,
M: MempoolAdapter<Tx = P::Tx>,
P: MemPool,
T: Tally,
T::Qc: Clone,
O: Overlay<A, F, T, <P::Tx as Transaction>::Hash>,
O: Overlay + Debug,
P::Tx: Transaction + Debug + 'static,
<P::Tx as Transaction>::Hash: Debug,
A::Backend: 'static,
@ -92,44 +102,38 @@ where
network_relay: Relay<NetworkService<A::Backend>>,
mempool_relay: Relay<MempoolService<M, P>>,
_fountain: std::marker::PhantomData<F>,
_tally: std::marker::PhantomData<T>,
_overlay: std::marker::PhantomData<O>,
}
impl<A, P, M, F, T, O> ServiceData for CarnotConsensus<A, P, M, F, T, O>
impl<A, P, M, F, O> ServiceData for CarnotConsensus<A, P, M, F, O>
where
F: FountainCode,
A: NetworkAdapter,
P: MemPool,
T: Tally,
T::Qc: Clone,
P::Tx: Transaction + Debug,
<P::Tx as Transaction>::Hash: Debug,
M: MempoolAdapter<Tx = P::Tx>,
O: Overlay<A, F, T, <P::Tx as Transaction>::Hash>,
O: Overlay + Debug,
{
const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings<F, T>;
type Settings = CarnotSettings<F>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}
#[async_trait::async_trait]
impl<A, P, M, F, T, O> ServiceCore for CarnotConsensus<A, P, M, F, T, O>
impl<A, P, M, F, O> ServiceCore for CarnotConsensus<A, P, M, F, O>
where
F: FountainCode + Send + Sync + 'static,
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode + Clone + Send + Sync + 'static,
A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
T: Tally + Send + Sync + 'static,
T::Settings: Send + Sync + 'static,
T::Outcome: Send + Sync,
T::Qc: Clone + Send + Sync,
P::Settings: Send + Sync + 'static,
P::Tx: Debug + Clone + serde::de::DeserializeOwned + Send + Sync + 'static,
P::Tx:
Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay<A, F, T, <P::Tx as Transaction>::Hash> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
@ -138,7 +142,6 @@ where
service_state,
network_relay,
_fountain: Default::default(),
_tally: Default::default(),
_overlay: Default::default(),
mempool_relay,
})
@ -160,224 +163,455 @@ where
let CarnotSettings {
private_key,
fountain_settings,
tally_settings,
nodes,
} = self.service_state.settings_reader.get_updated_settings();
let network_adapter = A::new(network_relay).await;
let tip = Tip;
let fountain = F::new(fountain_settings);
let tally = T::new(tally_settings);
let leadership = Leadership::<P::Tx>::new(private_key, mempool_relay.clone());
// FIXME: this should be taken from config
let mut cur_view = View {
seed: [0; 32],
staking_keys: BTreeMap::new(),
view_n: 0,
let overlay = O::new(nodes);
let genesis = consensus_engine::Block {
id: [0; 32],
view: 0,
parent_qc: Qc::Standard(StandardQc::genesis()),
};
loop {
// if we want to process multiple views at the same time this can
// be spawned as a separate future
let mut carnot = Carnot::from_genesis(private_key, genesis, overlay);
let network_adapter = A::new(network_relay).await;
let adapter = &network_adapter;
let _self_committee = carnot.self_committee();
let self_committee = &_self_committee;
let _leader_committee = [carnot.id()].into_iter().collect();
let leader_committee = &_leader_committee;
let fountain = F::new(fountain_settings);
let _tally_settings = CarnotTallySettings {
threshold: carnot.super_majority_threshold(),
participating_nodes: carnot.child_committees().into_iter().flatten().collect(),
};
let tally_settings = &_tally_settings;
let _leader_tally_settings = CarnotTallySettings {
threshold: carnot.leader_super_majority_threshold(),
// TODO: add children of root committee
participating_nodes: carnot.root_committee(),
};
let leader_tally_settings = &_leader_tally_settings;
// FIXME: this should probably have a timer to detect failed rounds
let res = cur_view
.resolve::<A, O, _, _, _>(
private_key,
&tip,
&network_adapter,
&fountain,
&tally,
&leadership,
let mut view_cancel_cache = ViewCancelCache::new();
let events: FuturesUnordered<Pin<Box<dyn Future<Output = Event<P::Tx>> + Send>>> =
FuturesUnordered::new();
let genesis_block = carnot.genesis_block();
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
genesis_block.view + 1,
Self::gather_block(adapter, genesis_block.view + 1),
)));
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
genesis_block.view,
Self::gather_votes(
adapter,
self_committee,
genesis_block.clone(),
tally_settings.clone(),
),
)));
if carnot.is_leader_for_view(genesis_block.view + 1) {
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
genesis_block.view + 1,
async move {
let Event::Approve { qc, .. } = Self::gather_votes(
adapter,
leader_committee,
genesis_block,
leader_tally_settings.clone(),
)
.await;
match res {
Ok((block, view)) => {
// resolved block, mark as verified and possibly update the tip
// not sure what mark as verified means, e.g. if we want an event subscription
// system for this to be used for example by the ledger, storage and mempool
.await else { unreachable!() };
Event::ProposeBlock { qc }
},
)));
}
tokio::pin!(events);
while let Some(event) = events.next().await {
let mut output = None;
let prev_view = carnot.current_view();
match event {
Event::Proposal { block, mut stream } => {
tracing::debug!("received proposal {:?}", block);
let block = block.header().clone();
match carnot.receive_block(block.clone()) {
Ok(new_state) => {
let new_view = new_state.current_view();
if new_view != carnot.current_view() {
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
block.view,
Self::gather_votes(
adapter,
self_committee,
block.clone(),
tally_settings.clone(),
),
)));
} else {
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
block.view,
async move {
if let Some(block) = stream.next().await {
Event::Proposal { block, stream }
} else {
Event::None
}
},
)));
}
carnot = new_state;
}
Err(_) => tracing::debug!("invalid block {:?}", block),
}
if carnot.is_leader_for_view(block.view + 1) {
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
block.view,
async move {
let Event::Approve { qc, .. } = Self::gather_votes(
adapter,
leader_committee,
block,
leader_tally_settings.clone(),
)
.await else { unreachable!() };
Event::ProposeBlock { qc }
},
)));
}
}
Event::Approve { block, .. } => {
tracing::debug!("approving proposal {:?}", block);
let (new_carnot, out) = carnot.approve_block(block.clone());
carnot = new_carnot;
output = Some(Output::Send::<P::Tx>(out));
}
Event::LocalTimeout => {
tracing::debug!("local timeout");
let (new_carnot, out) = carnot.local_timeout();
carnot = new_carnot;
output = out.map(Output::Send);
}
Event::NewView {
timeout_qc,
new_views,
} => {
tracing::debug!("approving new view {:?}", timeout_qc);
let (new_carnot, out) = carnot.approve_new_view(timeout_qc.clone(), new_views);
carnot = new_carnot;
output = Some(Output::Send(out));
let next_view = timeout_qc.view + 2;
if carnot.is_leader_for_view(next_view) {
let high_qc = carnot.high_qc();
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
timeout_qc.view + 1,
async move {
let _votes = Self::gather_new_views(
adapter,
leader_committee,
timeout_qc,
leader_tally_settings.clone(),
)
.await;
Event::ProposeBlock {
qc: Qc::Aggregated(AggregateQc {
high_qc,
view: next_view,
}),
}
},
)));
}
}
Event::TimeoutQc { timeout_qc } => {
tracing::debug!("timeout received {:?}", timeout_qc);
carnot = carnot.receive_timeout_qc(timeout_qc.clone());
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
timeout_qc.view + 1,
Self::gather_new_views(
adapter,
self_committee,
timeout_qc,
tally_settings.clone(),
),
)));
}
Event::RootTimeout { timeouts } => {
tracing::debug!("root timeout {:?}", timeouts);
// timeout detected
}
Event::ProposeBlock { qc } => {
tracing::debug!("proposing block");
let (reply_channel, rx) = tokio::sync::oneshot::channel();
mempool_relay
.send(nomos_mempool::MempoolMsg::MarkInBlock {
ids: block.transactions().cloned().collect(),
block: block.header().id(),
.send(MempoolMsg::View {
ancestor_hint: [0; 32],
reply_channel,
})
.await
.map_err(|(e, _)| {
tracing::error!("Error while sending MarkInBlock message: {}", e);
e
})?;
.unwrap_or_else(|(e, _)| {
eprintln!("Could not get transactions from mempool {e}")
});
match rx.await {
Ok(txs) => {
let proposal = Block::new(qc.view() + 1, qc, txs);
output = Some(Output::BroadcastProposal { proposal });
}
Err(e) => tracing::error!("Could not fetch txs {e}"),
}
}
Event::None => {}
}
cur_view = view;
}
Err(e) => {
tracing::error!("Error while resolving view: {}", e);
let current_view = carnot.current_view();
if current_view != prev_view {
// First we cancel previous processing view tasks
view_cancel_cache.cancel(prev_view);
tracing::debug!("Advanced view from {prev_view} to {current_view}");
// View change!
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
current_view,
async {
tokio::time::sleep(TIMEOUT).await;
Event::LocalTimeout
},
)));
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
current_view + 1,
Self::gather_block(adapter, current_view + 1),
)));
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
current_view,
Self::gather_timeout_qc(adapter, current_view),
)));
if carnot.is_member_of_root_committee() {
let threshold = carnot.leader_super_majority_threshold();
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
current_view,
Self::gather_timeout(adapter, self_committee, current_view, threshold),
)));
}
}
if let Some(output) = output {
handle_output(adapter, &fountain, carnot.id(), output).await;
}
}
unreachable!("carnot exited");
}
}
/// Side effect produced while processing a consensus [`Event`]; converted
/// into actual network traffic by `handle_output`.
#[allow(dead_code)] // TODO: remove this when using broadcasting events
enum Output<Tx: Clone + Eq + Hash> {
    /// Targeted message (vote / timeout / new-view) addressed to a committee.
    Send(consensus_engine::Send),
    /// Broadcast an aggregated timeout QC to the whole network.
    BroadcastTimeoutQc { timeout_qc: TimeoutQc },
    /// Broadcast a freshly built proposal (emitted when this node is leader).
    BroadcastProposal { proposal: Block<Tx> },
}
impl<A, P, M, F, O> CarnotConsensus<A, P, M, F, O>
where
F: FountainCode + Clone + Send + Sync + 'static,
A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
P::Settings: Send + Sync + 'static,
P::Tx:
Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static,
{
async fn gather_timeout_qc(adapter: &A, view: consensus_engine::View) -> Event<P::Tx> {
if let Some(timeout_qc) = adapter
.timeout_qc_stream(view)
.await
.map(|msg| msg.qc)
.next()
.await
{
Event::TimeoutQc { timeout_qc }
} else {
Event::None
}
}
    /// Collect votes for `block` from `committee` and run the Carnot tally
    /// over them.
    ///
    /// Resolves to `Event::Approve` carrying the resulting QC once the
    /// threshold configured in `tally` is reached.
    async fn gather_votes(
        adapter: &A,
        committee: &Committee,
        block: consensus_engine::Block,
        tally: CarnotTallySettings,
    ) -> Event<P::Tx> {
        let tally = CarnotTally::new(tally);
        let votes_stream = adapter.votes_stream(committee, block.view, block.id).await;
        match tally.tally(block.clone(), votes_stream).await {
            Ok((qc, votes)) => Event::Approve { qc, votes, block },
            Err(_e) => {
                // TODO: a failed tally currently aborts the node via todo!().
                todo!("Handle tally error {_e}");
            }
        }
    }
}
#[derive(Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct Approval;
// Consensus round, also aids in guaranteeing synchronization
// between various data structures by means of lifetimes
pub struct View {
seed: Seed,
staking_keys: BTreeMap<NodeId, Stake>,
pub view_n: consensus_engine::View,
}
impl View {
// TODO: might want to encode steps in the type system
pub async fn resolve<'view, A, O, F, T, Tx>(
&'view self,
node_id: NodeId,
tip: &Tip,
async fn gather_new_views(
adapter: &A,
fountain: &F,
tally: &T,
leadership: &Leadership<Tx>,
) -> Result<(Block<T::Qc, Tx::Hash>, View), Box<dyn std::error::Error + Send + Sync + 'static>>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
Tx: Transaction,
Tx::Hash: Debug,
T: Tally + Send + Sync + 'static,
T::Outcome: Send + Sync,
T::Qc: Clone,
O: Overlay<A, F, T, Tx::Hash>,
{
let res = if self.is_leader(node_id) {
let block = self
.resolve_leader::<A, O, F, T, _>(node_id, tip, adapter, fountain, tally, leadership)
.await
.unwrap(); // FIXME: handle sad path
let next_view = self.generate_next_view(&block);
(block, next_view)
} else {
self.resolve_non_leader::<A, O, F, T, Tx>(node_id, adapter, fountain, tally)
.await
.unwrap() // FIXME: handle sad path
};
// Commit phase:
// Upon verifing a block B, if B.parent = B' and B'.parent = B'' and
// B'.view = B''.view + 1, then the node commits B''.
// This happens implicitly at the chain level and does not require any
// explicit action from the node.
Ok(res)
}
async fn resolve_leader<'view, A, O, F, T, Tx>(
&'view self,
node_id: NodeId,
tip: &Tip,
adapter: &A,
fountain: &F,
tally: &T,
leadership: &Leadership<Tx>,
) -> Result<Block<T::Qc, <Tx as Transaction>::Hash>, ()>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
T: Tally + Send + Sync + 'static,
T::Outcome: Send + Sync,
T::Qc: Clone,
Tx: Transaction,
Tx::Hash: Debug,
O: Overlay<A, F, T, Tx::Hash>,
{
let overlay = O::new(self, node_id);
// We need to build the QC for the block we are proposing
let qc = overlay.build_qc(self, adapter, tally).await;
let LeadershipResult::Leader { block, _view } = leadership
.try_propose_block(self, tip, qc)
.await else { panic!("we are leader")};
overlay
.broadcast_block(self, block.clone(), adapter, fountain)
committee: &Committee,
timeout_qc: TimeoutQc,
tally: CarnotTallySettings,
) -> Event<P::Tx> {
let tally = NewViewTally::new(tally);
let stream = adapter
.new_view_stream(committee, timeout_qc.view + 1)
.await;
Ok(block)
}
async fn resolve_non_leader<'view, A, O, F, T, Tx>(
&'view self,
node_id: NodeId,
adapter: &A,
fountain: &F,
tally: &T,
) -> Result<(Block<T::Qc, Tx::Hash>, View), ()>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
T: Tally + Send + Sync + 'static,
T::Qc: Clone,
Tx: Transaction,
Tx::Hash: Debug,
O: Overlay<A, F, T, Tx::Hash>,
{
let overlay = O::new(self, node_id);
// Consensus in Carnot is achieved in 2 steps from the point of view of a node:
// 1) The node receives a block proposal from a leader and verifies it
// 2) The node signals to the network its approval for the block.
// Depending on the overlay, this may require waiting for a certain number
// of other approvals.
// 1) Collect and verify block proposal.
let block = overlay
.reconstruct_proposal_block(self, adapter, fountain)
.await
.unwrap(); // FIXME: handle sad path
// TODO: verify
// TODO: reshare the block?
let next_view = self.generate_next_view(&block);
// 2) Signal approval to the network
// We only consider the happy path for now
if self.pipelined_safe_block(&block) {
overlay
.approve_and_forward(self, &block, adapter, tally, &next_view)
.await
.unwrap(); // FIXME: handle sad path
match tally.tally(timeout_qc.clone(), stream).await {
Ok((_qc, new_views)) => Event::NewView {
timeout_qc,
new_views,
},
Err(_e) => {
todo!("Handle tally error {_e}");
}
}
Ok((block, next_view))
}
pub fn is_leader(&self, _node_id: NodeId) -> bool {
true
    /// Gather `threshold` timeout messages for `view` from `committee` and
    /// surface them as a single `Event::RootTimeout`.
    async fn gather_timeout(
        adapter: &A,
        committee: &Committee,
        view: consensus_engine::View,
        threshold: usize,
    ) -> Event<P::Tx> {
        // NOTE(review): `pending!()` returns `Pending` exactly once on the
        // first poll, then resumes — presumably to defer the subscription
        // until the events queue actually polls this future; confirm intent.
        futures::pending!();
        let timeouts = adapter
            .timeout_stream(committee, view)
            .await
            .take(threshold)
            .map(|msg| msg.vote)
            .collect()
            .await;
        Event::RootTimeout { timeouts }
    }
// TODO: use consensus_engine::View instead
pub fn id(&self) -> u64 {
self.view_n.try_into().unwrap()
}
// Verifies the block is new and the previous leader did not fail
fn pipelined_safe_block<Qc: Clone, TxId: Clone + Eq + Hash>(
&self,
_: &Block<Qc, TxId>,
) -> bool {
// return b.view_n >= self.view_n && b.view_n == b.qc.view_n
true
}
fn generate_next_view<Qc: Clone, TxId: Clone + Eq + Hash>(&self, _b: &Block<Qc, TxId>) -> View {
let mut seed = self.seed;
seed[0] += 1;
View {
seed,
staking_keys: self.staking_keys.clone(),
view_n: self.view_n + 1,
    /// Reassemble the proposal for `view` from incoming proposal-chunk
    /// messages.
    ///
    /// On success the remaining stream is handed back inside
    /// `Event::Proposal` so later proposals for the same view can still be
    /// drained by the caller.
    async fn gather_block(adapter: &A, view: consensus_engine::View) -> Event<P::Tx> {
        let stream = adapter
            .proposal_chunks_stream(view)
            .await
            .filter_map(move |msg| {
                async move {
                    let proposal = Block::from_bytes(&msg.chunk);
                    if proposal.header().id == msg.proposal {
                        Some(proposal)
                    } else {
                        // Decoded id does not match the advertised proposal id.
                        // TODO: Leader is faulty? what should we do?
                        None
                    }
                }
            });
        let mut stream = Box::pin(stream);
        if let Some(block) = stream.next().await {
            Event::Proposal { block, stream }
        } else {
            Event::None
        }
    }
}
/// Translate a consensus [`Output`] into network traffic.
///
/// Targeted payloads (`Output::Send`) are wrapped in their wire message type
/// and sent to the destination committee on a per-payload channel
/// ("votes" / "timeout" / "new-view"); proposals are fountain-encoded and
/// broadcast chunk by chunk; timeout QCs are broadcast directly.
async fn handle_output<A, F, Tx>(adapter: &A, fountain: &F, node_id: NodeId, output: Output<Tx>)
where
    A: NetworkAdapter,
    F: FountainCode,
    Tx: Hash + Eq + Clone + Serialize + DeserializeOwned + Debug,
{
    match output {
        Output::Send(consensus_engine::Send { to, payload }) => match payload {
            Payload::Vote(vote) => {
                adapter
                    .send(
                        &to,
                        vote.view,
                        VoteMsg {
                            voter: node_id,
                            vote,
                            qc: None, // TODO: handle root committee members
                        }
                        .as_bytes(),
                        "votes",
                    )
                    .await;
            }
            Payload::Timeout(timeout) => {
                adapter
                    .send(
                        &to,
                        timeout.view,
                        TimeoutMsg {
                            voter: node_id,
                            vote: timeout,
                        }
                        .as_bytes(),
                        "timeout",
                    )
                    .await;
            }
            Payload::NewView(new_view) => {
                adapter
                    .send(
                        &to,
                        new_view.view,
                        NewViewMsg {
                            voter: node_id,
                            vote: new_view,
                        }
                        .as_bytes(),
                        "new-view",
                    )
                    .await;
            }
        },
        Output::BroadcastProposal { proposal } => {
            // Fountain-encode the serialized proposal and broadcast each chunk
            // tagged with the proposal id so receivers can filter mismatches.
            fountain
                .encode(&proposal.as_bytes())
                .for_each(|chunk| {
                    adapter.broadcast_block_chunk(ProposalChunkMsg {
                        proposal: proposal.header().id,
                        chunk: chunk.to_vec().into_boxed_slice(),
                        view: proposal.header().view,
                    })
                })
                .await;
        }
        Output::BroadcastTimeoutQc { timeout_qc } => {
            adapter
                .broadcast_timeout_qc(TimeoutQcMsg {
                    source: node_id,
                    qc: timeout_qc,
                })
                .await;
        }
    }
}
/// Inputs driving the consensus main loop.
pub(crate) enum Event<Tx: Clone + Hash + Eq> {
    /// A proposal was reconstructed from the network; `stream` yields any
    /// further proposals observed for the same view (see `gather_block`).
    Proposal {
        block: Block<Tx>,
        stream: Pin<Box<dyn Stream<Item = Block<Tx>> + Send>>,
    },
    #[allow(dead_code)]
    /// A vote tally for `block` reached its threshold, producing `qc`.
    Approve {
        qc: Qc,
        block: consensus_engine::Block,
        votes: HashSet<Vote>,
    },
    /// The local view timer fired without progress.
    LocalTimeout,
    /// Enough new-view messages were gathered after `timeout_qc`.
    NewView {
        timeout_qc: TimeoutQc,
        new_views: HashSet<NewView>,
    },
    /// A timeout QC was received from the network.
    TimeoutQc {
        timeout_qc: TimeoutQc,
    },
    /// The threshold of timeout messages was observed (root committee).
    RootTimeout {
        timeouts: HashSet<Timeout>,
    },
    /// This node is leader and must build a proposal on top of `qc`.
    ProposeBlock {
        qc: Qc,
    },
    /// Stream ended / nothing actionable happened.
    None,
}

View File

@ -1,4 +1,3 @@
use bytes::Bytes;
use futures::StreamExt;
use nomos_network::{
backends::mock::{
@ -7,25 +6,21 @@ use nomos_network::{
NetworkMsg, NetworkService,
};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio_stream::{wrappers::BroadcastStream, Stream};
use crate::network::messages::TimeoutQcMsg;
use crate::{
network::{
messages::{ProposalChunkMsg, VoteMsg},
NetworkAdapter,
},
overlay::committees::Committee,
use crate::network::messages::{NewViewMsg, TimeoutMsg, TimeoutQcMsg};
use crate::network::{
messages::{ProposalChunkMsg, VoteMsg},
NetworkAdapter,
};
use consensus_engine::{TimeoutQc, View};
use consensus_engine::{BlockId, Committee, View};
const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic";
const MOCK_BLOCK_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("MockSim", 1, "MockBlock");
const MOCK_APPROVAL_CONTENT_TOPIC: MockContentTopic =
MockContentTopic::new("MockSim", 1, "MockApproval");
#[derive(Clone)]
pub struct MockAdapter {
network_relay: OutboundRelay<<NetworkService<Mock> as ServiceData>::Message>,
}
@ -65,7 +60,7 @@ impl NetworkAdapter for MockAdapter {
async fn proposal_chunks_stream(
&self,
_view: View,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
) -> Box<dyn Stream<Item = ProposalChunkMsg> + Send + Sync + Unpin> {
let stream_channel = self
.message_subscriber_channel()
.await
@ -80,9 +75,7 @@ impl NetworkAdapter for MockAdapter {
== message.content_topic().content_topic_name
{
let payload = message.payload();
Some(Bytes::from(
ProposalChunkMsg::from_bytes(payload.as_bytes()).chunk,
))
Some(ProposalChunkMsg::from_bytes(payload.as_bytes()))
} else {
None
}
@ -117,24 +110,33 @@ impl NetworkAdapter for MockAdapter {
todo!()
}
async fn timeout_qc_stream(
async fn timeout_stream(
&self,
_committee: &Committee,
_view: View,
) -> Box<dyn Stream<Item = TimeoutQc> + Send + Sync + Unpin> {
) -> Box<dyn Stream<Item = TimeoutMsg> + Send + Sync + Unpin> {
todo!()
}
async fn votes_stream<Vote: DeserializeOwned>(
async fn timeout_qc_stream(
&self,
_committee: Committee,
_view: View,
) -> Box<dyn Stream<Item = Vote> + Send> {
) -> Box<dyn Stream<Item = TimeoutQcMsg> + Send + Sync + Unpin> {
todo!()
}
async fn votes_stream(
&self,
_committee: &Committee,
_view: View,
_proposal_id: BlockId,
) -> Box<dyn Stream<Item = VoteMsg> + Send + Unpin> {
let stream_channel = self
.message_subscriber_channel()
.await
.unwrap_or_else(|_e| todo!("handle error"));
Box::new(
BroadcastStream::new(stream_channel).filter_map(|msg| async move {
Box::new(Box::pin(BroadcastStream::new(stream_channel).filter_map(
|msg| async move {
match msg {
Ok(event) => match event {
NetworkEvent::RawMessage(message) => {
@ -142,7 +144,7 @@ impl NetworkAdapter for MockAdapter {
== message.content_topic().content_topic_name
{
let payload = message.payload();
Some(VoteMsg::from_bytes(payload.as_bytes()).vote)
Some(VoteMsg::from_bytes(payload.as_bytes()))
} else {
None
}
@ -150,20 +152,21 @@ impl NetworkAdapter for MockAdapter {
},
Err(_e) => None,
}
}),
)
},
)))
}
async fn send_vote<Vote: Serialize>(
async fn new_view_stream(
&self,
_committee: Committee,
_committee: &Committee,
_view: View,
approval_message: VoteMsg<Vote>,
) where
Vote: Send,
{
) -> Box<dyn Stream<Item = NewViewMsg> + Send + Unpin> {
todo!()
}
async fn send(&self, _committee: &Committee, _view: View, payload: Box<[u8]>, _channel: &str) {
let message = MockMessage::new(
String::from_utf8_lossy(&approval_message.as_bytes()).to_string(),
String::from_utf8_lossy(&payload).to_string(),
MOCK_APPROVAL_CONTENT_TOPIC,
1,
chrono::Utc::now().timestamp_nanos() as usize,

View File

@ -1,24 +1,22 @@
// std
use std::borrow::Cow;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
// crates
use bytes::Bytes;
use futures::{Stream, StreamExt};
use tokio_stream::wrappers::BroadcastStream;
// internal
use crate::network::messages::TimeoutQcMsg;
use crate::network::messages::{NewViewMsg, TimeoutMsg, TimeoutQcMsg};
use crate::network::{
messages::{ProposalChunkMsg, VoteMsg},
NetworkAdapter,
};
use crate::overlay::committees::Committee;
use consensus_engine::{TimeoutQc, View};
use consensus_engine::{BlockId, Committee, View};
use nomos_network::{
backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage},
NetworkMsg, NetworkService,
};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use serde::de::DeserializeOwned;
use serde::Serialize;
use waku_bindings::{
ContentFilter, Encoding, StoreQuery, WakuContentTopic, WakuMessage, WakuPubSubTopic,
};
@ -29,6 +27,7 @@ pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
const APPLICATION_NAME: &str = "CarnotSim";
const VERSION: usize = 1;
#[derive(Clone)]
pub struct WakuAdapter {
network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
}
@ -146,19 +145,16 @@ impl NetworkAdapter for WakuAdapter {
async fn proposal_chunks_stream(
&self,
view: View,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
) -> Box<dyn Stream<Item = ProposalChunkMsg> + Send + Sync + Unpin> {
Box::new(Box::pin(
self.cached_stream_with_content_topic(PROPOSAL_CONTENT_TOPIC.clone())
self.cached_stream_with_content_topic(PROPOSAL_CONTENT_TOPIC)
.await
.filter_map(move |message| {
let payload = message.payload();
let ProposalChunkMsg {
view: msg_view,
chunk,
} = ProposalChunkMsg::from_bytes(payload);
let proposal = ProposalChunkMsg::from_bytes(payload);
async move {
if view == msg_view {
Some(Bytes::from(chunk))
if view == proposal.view {
Some(proposal)
} else {
None
}
@ -168,8 +164,24 @@ impl NetworkAdapter for WakuAdapter {
}
async fn broadcast_block_chunk(&self, chunk_message: ProposalChunkMsg) {
self.broadcast(chunk_message.as_bytes(), PROPOSAL_CONTENT_TOPIC)
let message = WakuMessage::new(
chunk_message.as_bytes(),
PROPOSAL_CONTENT_TOPIC,
1,
chrono::Utc::now().timestamp_nanos() as usize,
[],
false,
);
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message,
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC),
}))
.await
{
todo!("log error");
};
}
async fn broadcast_timeout_qc(&self, timeout_qc_msg: TimeoutQcMsg) {
@ -177,18 +189,41 @@ impl NetworkAdapter for WakuAdapter {
.await
}
async fn timeout_qc_stream(
async fn timeout_stream(
&self,
committee: &Committee,
view: View,
) -> Box<dyn Stream<Item = TimeoutQc> + Send + Sync + Unpin> {
) -> Box<dyn Stream<Item = TimeoutMsg> + Send + Sync + Unpin> {
let content_topic = create_topic("timeout", committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(TIMEOUT_QC_CONTENT_TOPIC.clone())
self.cached_stream_with_content_topic(content_topic)
.await
.filter_map(move |message| {
let payload = message.payload();
let qc = TimeoutQcMsg::from_bytes(payload).qc;
let timeout = TimeoutMsg::from_bytes(payload);
async move {
if qc.view > view {
if timeout.vote.view == view {
Some(timeout)
} else {
None
}
}
}),
))
}
async fn timeout_qc_stream(
&self,
view: View,
) -> Box<dyn Stream<Item = TimeoutQcMsg> + Send + Sync + Unpin> {
Box::new(Box::pin(
self.cached_stream_with_content_topic(TIMEOUT_QC_CONTENT_TOPIC)
.await
.filter_map(move |message| {
let payload = message.payload();
let qc = TimeoutQcMsg::from_bytes(payload);
async move {
if qc.qc.view > view {
Some(qc)
} else {
None
@ -198,32 +233,51 @@ impl NetworkAdapter for WakuAdapter {
))
}
async fn votes_stream<Vote: DeserializeOwned>(
async fn votes_stream(
&self,
committee: Committee,
committee: &Committee,
view: View,
) -> Box<dyn Stream<Item = Vote> + Send> {
let content_topic = votes_topic(committee, view);
proposal_id: BlockId,
) -> Box<dyn Stream<Item = VoteMsg> + Send + Unpin> {
let content_topic = create_topic("votes", committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.filter_map(move |message| {
let payload = message.payload();
let vote = VoteMsg::from_bytes(payload);
async move {
if vote.vote.block == proposal_id {
Some(vote)
} else {
None
}
}
}),
))
}
async fn new_view_stream(
&self,
committee: &Committee,
view: View,
) -> Box<dyn Stream<Item = NewViewMsg> + Send + Unpin> {
let content_topic = create_topic("votes", committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.map(|message| {
let payload = message.payload();
VoteMsg::from_bytes(payload).vote
NewViewMsg::from_bytes(payload)
}),
))
}
async fn send_vote<Vote: Serialize + Send>(
&self,
committee: Committee,
view: View,
approval_message: VoteMsg<Vote>,
) {
let content_topic = votes_topic(committee, view);
async fn send(&self, committee: &Committee, view: View, payload: Box<[u8]>, channel: &str) {
let content_topic = create_topic(channel, committee, view);
let message = WakuMessage::new(
approval_message.as_bytes(),
payload,
content_topic,
1,
chrono::Utc::now().timestamp_nanos() as usize,
@ -234,7 +288,7 @@ impl NetworkAdapter for WakuAdapter {
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message,
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC),
}))
.await
{
@ -243,11 +297,11 @@ impl NetworkAdapter for WakuAdapter {
}
}
fn votes_topic(committee: Committee, view: View) -> WakuContentTopic {
/// Build the Waku content topic for a given channel (`tag`), committee and
/// view. The topic name has the shape `<tag>-<committee hash>-<view>`.
fn create_topic(tag: &str, committee: &Committee, view: View) -> WakuContentTopic {
    let committee_id = hash_set(committee);
    let name = format!("{tag}-{committee_id}-{view}");
    WakuContentTopic {
        application_name: Cow::Borrowed(APPLICATION_NAME),
        version: VERSION,
        content_topic_name: Cow::Owned(name),
        encoding: Encoding::Proto,
    }
}
@ -256,3 +310,12 @@ const PROPOSAL_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new(APPLICATION_NAME, VERSION, "proposal", Encoding::Proto);
const TIMEOUT_QC_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new(APPLICATION_NAME, VERSION, "timeout-qc", Encoding::Proto);
// TODO: Maybe use a secure hasher instead
/// Derive a `u64` identifier for a committee by feeding every member into
/// the std `DefaultHasher`.
// NOTE(review): assumes `Committee::iter()` yields members in a
// deterministic order (e.g. an ordered set); with unordered iteration,
// different nodes could derive different topic names — confirm.
fn hash_set(c: &Committee) -> u64 {
    let mut hasher = DefaultHasher::new();
    c.iter().for_each(|member| member.hash(&mut hasher));
    hasher.finish()
}

View File

@ -1,15 +1,15 @@
// std
// crates
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
// internal
use crate::NodeId;
use consensus_engine::{TimeoutQc, View};
use consensus_engine::{BlockId, NewView, Qc, Timeout, TimeoutQc, View, Vote};
use nomos_core::wire;
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct ProposalChunkMsg {
pub chunk: Box<[u8]>,
pub proposal: BlockId,
pub view: View,
}
@ -23,25 +23,47 @@ impl ProposalChunkMsg {
}
}
#[derive(Serialize, Deserialize)]
pub struct VoteMsg<Vote> {
pub source: NodeId,
#[derive(Eq, PartialEq, Hash, Serialize, Deserialize, Clone)]
pub struct VoteMsg {
pub voter: NodeId,
pub vote: Vote,
pub qc: Option<Qc>,
}
impl<Vote> VoteMsg<Vote>
where
Vote: Serialize,
{
impl VoteMsg {
pub fn as_bytes(&self) -> Box<[u8]> {
wire::serialize(self).unwrap().into_boxed_slice()
}
pub fn from_bytes(data: &[u8]) -> Self {
wire::deserialize(data).unwrap()
}
}
impl<Vote> VoteMsg<Vote>
where
Vote: DeserializeOwned,
{
/// New-view message sent by `voter` to advance past a timed-out view.
#[derive(Serialize, Deserialize)]
pub struct NewViewMsg {
    pub voter: NodeId,
    pub vote: NewView,
}
impl NewViewMsg {
    /// Serialize with the project wire format.
    pub fn as_bytes(&self) -> Box<[u8]> {
        wire::serialize(self).unwrap().into_boxed_slice()
    }
    /// Deserialize from the project wire format. Panics on malformed input.
    pub fn from_bytes(data: &[u8]) -> Self {
        wire::deserialize(data).unwrap()
    }
}
/// Timeout vote emitted by `voter` when its local view timer expires.
#[derive(Serialize, Deserialize)]
pub struct TimeoutMsg {
    pub voter: NodeId,
    pub vote: Timeout,
}
impl TimeoutMsg {
pub fn as_bytes(&self) -> Box<[u8]> {
wire::serialize(self).unwrap().into_boxed_slice()
}
pub fn from_bytes(data: &[u8]) -> Self {
wire::deserialize(data).unwrap()
}
@ -57,7 +79,6 @@ impl TimeoutQcMsg {
pub fn as_bytes(&self) -> Box<[u8]> {
wire::serialize(self).unwrap().into_boxed_slice()
}
pub fn from_bytes(data: &[u8]) -> Self {
wire::deserialize(data).unwrap()
}

View File

@ -2,19 +2,15 @@ pub mod adapters;
pub mod messages;
// std
use bytes::Bytes;
// crates
use futures::Stream;
// internal
use crate::network::messages::{ProposalChunkMsg, TimeoutQcMsg, VoteMsg};
use crate::overlay::committees::Committee;
use consensus_engine::{TimeoutQc, View};
use crate::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg};
use consensus_engine::{BlockId, Committee, View};
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use serde::de::DeserializeOwned;
use serde::Serialize;
#[async_trait::async_trait]
pub trait NetworkAdapter {
@ -25,22 +21,28 @@ pub trait NetworkAdapter {
async fn proposal_chunks_stream(
&self,
view: View,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin>;
) -> Box<dyn Stream<Item = ProposalChunkMsg> + Send + Sync + Unpin>;
async fn broadcast_block_chunk(&self, chunk_msg: ProposalChunkMsg);
async fn broadcast_timeout_qc(&self, timeout_qc_msg: TimeoutQcMsg);
async fn timeout_stream(
&self,
committee: &Committee,
view: View,
) -> Box<dyn Stream<Item = TimeoutMsg> + Send + Sync + Unpin>;
async fn timeout_qc_stream(
&self,
view: View,
) -> Box<dyn Stream<Item = TimeoutQc> + Send + Sync + Unpin>;
async fn votes_stream<Vote: DeserializeOwned>(
) -> Box<dyn Stream<Item = TimeoutQcMsg> + Send + Sync + Unpin>;
async fn votes_stream(
&self,
committee: Committee,
committee: &Committee,
view: View,
) -> Box<dyn Stream<Item = Vote> + Send>;
async fn send_vote<Vote: Serialize + Send>(
proposal_id: BlockId,
) -> Box<dyn Stream<Item = VoteMsg> + Send + Unpin>;
async fn new_view_stream(
&self,
committee: Committee,
committee: &Committee,
view: View,
vote: VoteMsg<Vote>,
);
) -> Box<dyn Stream<Item = NewViewMsg> + Send + Unpin>;
async fn send(&self, committee: &Committee, view: View, payload: Box<[u8]>, channel: &str);
}

View File

@ -0,0 +1,65 @@
use consensus_engine::{Committee, NodeId, Overlay, View};
#[derive(Clone, Debug)]
/// Flat overlay with a single committee and round robin leader selection.
pub struct FlatRoundRobin {
    // Full network membership; leader for view `v` is `nodes[v % nodes.len()]`.
    nodes: Vec<NodeId>,
}
impl Overlay for FlatRoundRobin {
    fn new(nodes: Vec<NodeId>) -> Self {
        Self { nodes }
    }
    /// The single (root) committee contains every node.
    fn root_committee(&self) -> consensus_engine::Committee {
        self.nodes.clone().into_iter().collect()
    }
    // Not implemented yet: the flat overlay has no shape to rebuild.
    fn rebuild(&mut self, _timeout_qc: consensus_engine::TimeoutQc) {
        todo!()
    }
    // With a single committee there are no parent/child relations.
    fn is_member_of_child_committee(&self, _parent: NodeId, _child: NodeId) -> bool {
        false
    }
    /// Every node is a member of the root committee.
    fn is_member_of_root_committee(&self, _id: NodeId) -> bool {
        true
    }
    /// The root committee doubles as the only leaf committee.
    fn is_member_of_leaf_committee(&self, _id: NodeId) -> bool {
        true
    }
    fn is_child_of_root_committee(&self, _id: NodeId) -> bool {
        false
    }
    /// No parent exists, so an empty committee is returned.
    fn parent_committee(&self, _id: NodeId) -> consensus_engine::Committee {
        Committee::new()
    }
    /// Every node's committee is the whole membership.
    fn node_committee(&self, _id: NodeId) -> consensus_engine::Committee {
        self.nodes.clone().into_iter().collect()
    }
    fn child_committees(&self, _id: NodeId) -> Vec<consensus_engine::Committee> {
        vec![]
    }
    fn leaf_committees(&self, _id: NodeId) -> Vec<consensus_engine::Committee> {
        vec![self.root_committee()]
    }
    /// Round-robin leader selection over the fixed node list.
    // NOTE(review): `view as usize` — if `View` is signed, a negative view
    // wraps before the modulo; confirm views are always non-negative here.
    // Also panics (mod by zero) on an empty node list.
    fn leader(&self, view: View) -> NodeId {
        self.nodes[view as usize % self.nodes.len()]
    }
    // 0 here — presumably no intra-committee threshold is needed in the
    // flat overlay; confirm against the engine's use of this value.
    fn super_majority_threshold(&self, _id: NodeId) -> usize {
        0
    }
    /// Classic BFT two-thirds-plus-one quorum over all nodes.
    fn leader_super_majority_threshold(&self, _id: NodeId) -> usize {
        self.nodes.len() * 2 / 3 + 1
    }
}

View File

@ -1,189 +0,0 @@
// std
use std::hash::Hash;
// crates
use futures::StreamExt;
use nomos_core::wire::deserializer;
use rand::{seq::SliceRandom, SeedableRng};
// internal
use super::*;
use crate::network::messages::ProposalChunkMsg;
use crate::network::NetworkAdapter;
/// View of the tree overlay centered around a specific member
pub struct Member<const C: usize> {
// id is not used now, but gonna probably used it for later checking later on
#[allow(dead_code)]
id: NodeId,
committee: Committee,
committees: Committees<C>,
view_n: consensus_engine::View,
}
/// #Just a newtype index to be able to implement parent/children methods
#[derive(Copy, Clone)]
pub struct Committee(usize);
pub struct Committees<const C: usize> {
nodes: Box<[NodeId]>,
}
impl<const C: usize> Committees<C> {
pub fn new(view: &View) -> Self {
let mut nodes = view.staking_keys.keys().cloned().collect::<Box<[NodeId]>>();
let mut rng = rand_chacha::ChaCha20Rng::from_seed(view.seed);
nodes.shuffle(&mut rng);
Self { nodes }
}
pub fn into_member(self, id: NodeId, view: &View) -> Option<Member<C>> {
let member_idx = self.nodes.iter().position(|m| m == &id)?;
Some(Member {
committee: Committee(member_idx / C),
committees: self,
id,
view_n: view.view_n,
})
}
fn get_committee_members(&self, committee: Committee) -> Option<&[NodeId]> {
let leftb = committee.0 * C;
let rightb = std::cmp::min(self.nodes.len(), leftb + C);
if leftb < rightb {
Some(&self.nodes[leftb..rightb])
} else {
None
}
}
}
impl Committee {
    /// The root committee of the overlay tree.
    pub const fn root() -> Self {
        Self(0)
    }

    /// Index of this committee in the tree (root is 0).
    pub fn id(&self) -> usize {
        self.0
    }

    /// Return the left and right children committees.
    ///
    /// Binary-heap layout: the children of node `i` are `2i + 1` and
    /// `2i + 2`, mirroring `parent()`'s `(i - 1) / 2`. The previous right
    /// child (`self.0 + 2 + 2`) was a typo yielding the wrong committee for
    /// every node.
    pub fn children(&self) -> (Committee, Committee) {
        (
            // left child
            Committee(self.0 * 2 + 1),
            // right child
            Committee(self.0 * 2 + 2),
        )
    }

    /// Return the parent committee, if any (the root has none).
    pub fn parent(&self) -> Option<Committee> {
        if self.0 == 0 {
            None
        } else {
            Some(Committee((self.0 - 1) / 2))
        }
    }
}
impl<const C: usize> Member<C> {
    /// Members of this node's own committee (including itself).
    pub fn peers(&self) -> &[NodeId] {
        let own = self.committee;
        self.committees.get_committee_members(own).unwrap()
    }

    /// The parent committee this member should interact with, if any.
    pub fn parent_committee(&self) -> Option<Committee> {
        self.committee.parent()
    }

    /// The children committees this member should interact with; either
    /// side is `None` when that child committee has no members.
    pub fn children_committes(&self) -> (Option<Committee>, Option<Committee>) {
        let (left, right) = self.committee.children();
        let populated = |c: Committee| self.committees.get_committee_members(c).map(|_| c);
        (populated(left), populated(right))
    }
}
#[async_trait::async_trait]
impl<Network, Fountain, VoteTally, TxId, const C: usize> Overlay<Network, Fountain, VoteTally, TxId>
    for Member<C>
where
    Network: NetworkAdapter + Sync,
    Fountain: FountainCode + Sync,
    VoteTally: Tally + Sync,
    VoteTally::Qc: serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
    TxId: serde::de::DeserializeOwned + Clone + Hash + Eq + Send + Sync + 'static,
{
    // We still need the view here to help us initialize the committee layout.
    fn new(view: &View, node: NodeId) -> Self {
        let committees = Committees::new(view);
        // NOTE(review): panics when `node` is not in the view's staking set —
        // confirm callers guarantee membership before constructing the overlay.
        committees.into_member(node, view).unwrap()
    }

    /// Decode the proposal chunks received for this view with the fountain
    /// code, then deserialize the reassembled bytes into a block.
    async fn reconstruct_proposal_block(
        &self,
        view: &View,
        adapter: &Network,
        fountain: &Fountain,
    ) -> Result<Block<VoteTally::Qc, TxId>, FountainError> {
        // This overlay instance is only valid for the view it was built for.
        assert_eq!(view.view_n, self.view_n, "view_n mismatch");
        let message_stream = adapter.proposal_chunks_stream(view.view_n).await;
        fountain.decode(message_stream).await.and_then(|b| {
            deserializer(&b)
                .deserialize::<Block<VoteTally::Qc, TxId>>()
                // Deserialization failures are surfaced as fountain errors.
                .map_err(|e| FountainError::from(e.to_string().as_str()))
        })
    }

    /// Fountain-encode the serialized block and broadcast every chunk,
    /// tagged with this view, over the network adapter.
    async fn broadcast_block(
        &self,
        view: &View,
        block: Block<VoteTally::Qc, TxId>,
        adapter: &Network,
        fountain: &Fountain,
    ) {
        assert_eq!(view.view_n, self.view_n, "view_n mismatch");
        let block_bytes = block.as_bytes();
        let encoded_stream = fountain.encode(&block_bytes);
        // Chunks are sent concurrently with no upper bound on in-flight sends.
        encoded_stream
            .for_each_concurrent(None, |chunk| async move {
                let message = ProposalChunkMsg {
                    chunk: chunk.to_vec().into_boxed_slice(),
                    view: view.view_n,
                };
                adapter.broadcast_block_chunk(message.clone()).await;
            })
            .await;
    }

    /// Not implemented yet for the tree overlay.
    async fn approve_and_forward(
        &self,
        view: &View,
        _block: &Block<VoteTally::Qc, TxId>,
        _adapter: &Network,
        _tally: &VoteTally,
        _next_view: &View,
    ) -> Result<(), Box<dyn Error>> {
        assert_eq!(view.view_n, self.view_n, "view_n mismatch");
        // roughly, we want to do something like this:
        // 1. wait for left and right children committees to approve
        // 2. approve the block
        // 3. forward the approval to the parent committee
        //
        // However this will likely change depending on the position
        // of the committee in the tree
        todo!()
    }

    /// Not implemented yet for the tree overlay.
    async fn build_qc(&self, view: &View, _adapter: &Network, _tally: &VoteTally) -> VoteTally::Qc {
        assert_eq!(view.view_n, self.view_n, "view_n mismatch");
        // maybe the leader publishing the QC?
        todo!()
    }
}

View File

@ -1,128 +0,0 @@
// std
use std::error::Error;
use std::hash::Hash;
// crates
use futures::StreamExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
// internal
use super::*;
use crate::network::messages::{ProposalChunkMsg, VoteMsg};
use crate::network::NetworkAdapter;
use crate::overlay::committees::Committee;
use nomos_core::wire::deserializer;
// The flat overlay has exactly one committee: everyone sits in the root.
const FLAT_COMMITTEE: Committee = Committee::root();

/// A flat overlay, everyone is in the same committee.
/// As far as the API is concerned, this should be equivalent to any other
/// overlay and far simpler to implement.
/// For this reason, this might act as a 'reference' overlay for testing.
pub struct Flat {
    // TODO: this should be a const param, but we can't do that yet
    node_id: NodeId,
    // View this overlay instance was built for; methods assert against it.
    view_n: consensus_engine::View,
}
impl Flat {
    /// Create a flat-overlay instance for `node_id` at view `view_n`.
    pub fn new(view_n: consensus_engine::View, node_id: NodeId) -> Self {
        Self { node_id, view_n }
    }

    /// Produce this node's approval for a block.
    // we still need to define how votes look like, so this is a unit marker
    fn approve<Qc: Clone, TxId: Clone + Eq + Hash>(&self, _block: &Block<Qc, TxId>) -> Approval {
        Approval
    }
}
#[async_trait::async_trait]
impl<Network, Fountain, VoteTally, TxId> Overlay<Network, Fountain, VoteTally, TxId> for Flat
where
    TxId: serde::de::DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static,
    Network: NetworkAdapter + Sync,
    Fountain: FountainCode + Sync,
    VoteTally: Tally + Sync,
    VoteTally::Vote: Serialize + DeserializeOwned + Send,
    VoteTally::Qc: Clone + DeserializeOwned + Send + Sync + 'static,
{
    fn new(view: &View, node: NodeId) -> Self {
        Flat::new(view.view_n, node)
    }

    /// Decode the proposal chunks received for this view with the fountain
    /// code, then deserialize the reassembled bytes into a block.
    async fn reconstruct_proposal_block(
        &self,
        view: &View,
        adapter: &Network,
        fountain: &Fountain,
    ) -> Result<Block<VoteTally::Qc, TxId>, FountainError> {
        // This overlay instance is only valid for the view it was built for.
        assert_eq!(view.view_n, self.view_n, "view_n mismatch");
        let message_stream = adapter.proposal_chunks_stream(view.view_n).await;
        fountain.decode(message_stream).await.and_then(|b| {
            deserializer(&b)
                .deserialize::<Block<VoteTally::Qc, TxId>>()
                // Deserialization failures are surfaced as fountain errors.
                .map_err(|e| FountainError::from(e.to_string().as_str()))
        })
    }

    /// Fountain-encode the serialized block and broadcast every chunk,
    /// tagged with this view, over the network adapter.
    async fn broadcast_block(
        &self,
        view: &View,
        block: Block<VoteTally::Qc, TxId>,
        adapter: &Network,
        fountain: &Fountain,
    ) {
        assert_eq!(view.view_n, self.view_n, "view_n mismatch");
        let block_bytes = block.as_bytes();
        let encoded_stream = fountain.encode(&block_bytes);
        encoded_stream
            .for_each_concurrent(None, |chunk| async move {
                let message = ProposalChunkMsg {
                    chunk: chunk.to_vec().into_boxed_slice(),
                    view: view.view_n,
                };
                adapter.broadcast_block_chunk(message).await;
            })
            .await;
    }

    /// Approve the block and send the vote to the (single) root committee.
    async fn approve_and_forward(
        &self,
        view: &View,
        block: &Block<VoteTally::Qc, TxId>,
        adapter: &Network,
        _tally: &VoteTally,
        _next_view: &View,
    ) -> Result<(), Box<dyn Error>> {
        assert_eq!(view.view_n, self.view_n, "view_n mismatch");
        // in the flat overlay, there's no need to wait for anyone before approving the block
        let approval = self.approve(block);
        adapter
            .send_vote(
                FLAT_COMMITTEE,
                view.view_n,
                VoteMsg {
                    vote: approval,
                    source: self.node_id,
                },
            )
            .await;
        Ok(())
    }

    /// Collect votes from the root committee until the tally succeeds and
    /// return the resulting QC.
    async fn build_qc(&self, view: &View, adapter: &Network, tally: &VoteTally) -> VoteTally::Qc {
        assert_eq!(view.view_n, self.view_n, "view_n mismatch");
        // for now, let's pretend that consensus is reached as soon as the
        // block is approved by a share of the nodes
        let stream = Box::into_pin(adapter.votes_stream(FLAT_COMMITTEE, view.view_n).await);
        // Shadow the original binding so that it can't be directly accessed
        // ever again.
        // TODO: Remove the `try_into` call when tally is refactored to use with latest consensus engine types
        if let Ok((qc, _)) = tally.tally(view.view_n.try_into().unwrap(), stream).await {
            qc
        } else {
            // NOTE(review): a failed tally currently aborts the node; confirm
            // whether this should retry or propagate an error instead.
            unimplemented!("consensus not reached")
        }
    }
}

View File

@ -1,59 +0,0 @@
pub mod committees;
mod flat;
// std
use std::error::Error;
use std::hash::Hash;
// crates
// internal
use super::{Approval, NodeId, View};
use crate::network::NetworkAdapter;
pub use committees::Member;
use nomos_core::block::Block;
use nomos_core::fountain::{FountainCode, FountainError};
use nomos_core::vote::Tally;
/// Dissemination overlay, tied to a specific view.
///
/// Implementations define how blocks are spread, approved and certified for
/// a given committee topology (e.g. flat or tree-shaped).
#[async_trait::async_trait]
pub trait Overlay<
    Network: NetworkAdapter,
    Fountain: FountainCode,
    VoteTally: Tally,
    TxId: Clone + Eq + Hash,
> where
    VoteTally::Qc: Clone,
{
    /// Build the overlay for `node` at the given view.
    fn new(view: &View, node: NodeId) -> Self;

    /// Reassemble the proposed block for `view` from network chunks using
    /// the fountain code.
    async fn reconstruct_proposal_block(
        &self,
        view: &View,
        adapter: &Network,
        fountain: &Fountain,
    ) -> Result<Block<VoteTally::Qc, TxId>, FountainError>;

    /// Fountain-encode `block` and broadcast its chunks for `view`.
    async fn broadcast_block(
        &self,
        view: &View,
        block: Block<VoteTally::Qc, TxId>,
        adapter: &Network,
        fountain: &Fountain,
    );

    /// Different overlays might have different needs or the same overlay might
    /// require different steps depending on the node role
    /// For now let's put this responsibility on the overlay
    async fn approve_and_forward(
        &self,
        view: &View,
        block: &Block<VoteTally::Qc, TxId>,
        adapter: &Network,
        vote_tally: &VoteTally,
        next_view: &View,
    ) -> Result<(), Box<dyn Error>>;

    /// Wait for consensus on a block
    async fn build_qc(
        &self,
        view: &View,
        adapter: &Network,
        vote_tally: &VoteTally,
    ) -> VoteTally::Qc;
}

View File

@ -0,0 +1,85 @@
#![allow(dead_code)]
// TODO: Well, remove this when we actually use the fields from the specification
// std
use std::collections::HashSet;
// crates
use futures::{Stream, StreamExt};
// internal
use super::CarnotTallySettings;
use crate::network::messages::VoteMsg;
use consensus_engine::{Block, Qc, StandardQc, Vote};
use nomos_core::crypto::PublicKey;
use nomos_core::vote::Tally;
/// Node identifiers are the nodes' public keys.
pub type NodeId = PublicKey;

/// Errors the happy-path vote tally can produce.
#[derive(thiserror::Error, Debug)]
pub enum CarnotTallyError {
    #[error("Received invalid vote: {0}")]
    InvalidVote(String),
    #[error("Did not receive enough votes")]
    InsufficientVotes,
}
/// Tally for regular (happy-path) block votes.
#[derive(Clone, Debug)]
pub struct CarnotTally {
    settings: CarnotTallySettings,
}
#[async_trait::async_trait]
impl Tally for CarnotTally {
type Vote = VoteMsg;
type Qc = Qc;
type Subject = Block;
type Outcome = HashSet<Vote>;
type TallyError = CarnotTallyError;
type Settings = CarnotTallySettings;
fn new(settings: Self::Settings) -> Self {
Self { settings }
}
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
block: Block,
mut vote_stream: S,
) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> {
let mut seen = HashSet::new();
let mut outcome = HashSet::new();
// return early for leaf nodes
if self.settings.threshold == 0 {
return Ok((
Qc::Standard(StandardQc {
view: block.view,
id: block.id,
}),
outcome,
));
}
while let Some(vote) = vote_stream.next().await {
// check vote view is valid
if vote.vote.view != block.view || vote.vote.block != block.id {
continue;
}
// check for individual nodes votes
if !self.settings.participating_nodes.contains(&vote.voter) {
continue;
}
seen.insert(vote.voter);
outcome.insert(vote.vote.clone());
if seen.len() >= self.settings.threshold {
return Ok((
Qc::Standard(StandardQc {
view: vote.vote.view,
id: vote.vote.block,
}),
outcome,
));
}
}
unreachable!()
}
}

View File

@ -0,0 +1,18 @@
pub mod happy;
pub mod unhappy;
// std
use std::collections::HashSet;
// crates
use serde::{Deserialize, Serialize};
// internal
use consensus_engine::NodeId;
/// Settings shared by the happy- and unhappy-path tallies.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CarnotTallySettings {
    // Number of distinct valid voters required for the tally to succeed.
    pub threshold: usize,
    // TODO: this probably should be dynamic and should change with the view (?)
    pub participating_nodes: HashSet<NodeId>,
}

View File

@ -0,0 +1,67 @@
// std
use std::collections::HashSet;
// crates
use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
// internal
use super::CarnotTallySettings;
use crate::network::messages::NewViewMsg;
use consensus_engine::{NewView, TimeoutQc};
use nomos_core::vote::Tally;
/// Errors the unhappy-path (new-view) tally can produce.
#[derive(thiserror::Error, Debug)]
pub enum NewViewTallyError {
    #[error("Did not receive enough votes")]
    InsufficientVotes,
}

/// Tally for new-view messages gathered after a timeout (unhappy path).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NewViewTally {
    settings: CarnotTallySettings,
}
#[async_trait::async_trait]
impl Tally for NewViewTally {
    type Vote = NewViewMsg;
    type Qc = ();
    type Subject = TimeoutQc;
    type Outcome = HashSet<NewView>;
    type TallyError = NewViewTallyError;
    type Settings = CarnotTallySettings;

    fn new(settings: Self::Settings) -> Self {
        Self { settings }
    }

    /// Gather new-view messages for the view carried by `timeout_qc` until
    /// enough distinct participating nodes have replied.
    async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
        &self,
        timeout_qc: TimeoutQc,
        mut vote_stream: S,
    ) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> {
        let mut seen = HashSet::new();
        let mut outcome = HashSet::new();
        // return early for leaf nodes, which have no children to wait for
        if self.settings.threshold == 0 {
            return Ok(((), outcome));
        }
        while let Some(vote) = vote_stream.next().await {
            // Ignore votes cast for a different view. This was previously
            // `!vote.vote.view != timeout_qc.view`: the stray `!` is a
            // bitwise NOT on the view number, so the comparison was wrong
            // and discarded virtually every valid vote.
            // NOTE(review): confirm whether new views should match
            // `timeout_qc.view` itself or its successor.
            if vote.vote.view != timeout_qc.view {
                continue;
            }
            // only count nodes participating in this tally
            if !self.settings.participating_nodes.contains(&vote.voter) {
                continue;
            }
            // `seen` deduplicates voters so a node can't be counted twice
            seen.insert(vote.voter);
            outcome.insert(vote.vote.clone());
            if seen.len() >= self.settings.threshold {
                return Ok(((), outcome));
            }
        }
        Err(NewViewTallyError::InsufficientVotes)
    }
}

View File

@ -1,2 +0,0 @@
/// Placeholder for the chain tip: determining which tip to consider is
/// assumed to be an integral part of consensus.
pub struct Tip;

View File

@ -0,0 +1,70 @@
use crate::Event;
use consensus_engine::View;
use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use tokio::select;
use tokio_util::sync::CancellationToken;
pub struct ViewCancel(CancellationToken);
impl ViewCancel {
pub fn new() -> Self {
ViewCancel(CancellationToken::new())
}
pub fn cancel(&self) {
self.0.cancel();
}
pub fn cancel_token(&self) -> CancellationToken {
self.0.clone()
}
}
impl Drop for ViewCancel {
fn drop(&mut self) {
if !self.0.is_cancelled() {
self.cancel();
}
}
}
/// Per-view cancellation tokens, so all event futures belonging to a view
/// can be aborted at once when that view becomes stale.
pub struct ViewCancelCache {
    cancels: HashMap<View, ViewCancel>,
}

impl ViewCancelCache {
    pub fn new() -> Self {
        ViewCancelCache {
            cancels: HashMap::new(),
        }
    }

    /// Cancel (and forget) all tasks registered for `view`.
    pub fn cancel(&mut self, view: View) {
        if let Some(cancel) = self.cancels.remove(&view) {
            cancel.cancel();
        }
    }

    /// Token tied to `view`, creating the entry on first use.
    pub fn cancel_token(&mut self, view: View) -> CancellationToken {
        self.cancels
            .entry(view)
            .or_insert_with(ViewCancel::new)
            .cancel_token()
    }

    /// Wrap `f` so it resolves to `Event::None` as soon as `view` is
    /// cancelled — whichever happens first.
    pub(crate) fn cancelable_event_future<Tx: Clone + Hash + Eq, F: Future<Output = Event<Tx>>>(
        &mut self,
        view: View,
        f: F,
    ) -> impl Future<Output = Event<Tx>> {
        let token = self.cancel_token(view);
        async move {
            select! {
                event = f => event,
                _ = token.cancelled() => Event::None,
            }
        }
    }
}

// `new` takes no arguments, so also provide `Default`
// (clippy::new_without_default).
impl Default for ViewCancelCache {
    fn default() -> Self {
        Self::new()
    }
}