diff --git a/consensus-engine/Cargo.toml b/consensus-engine/Cargo.toml index bec28a37..5710bb3e 100644 --- a/consensus-engine/Cargo.toml +++ b/consensus-engine/Cargo.toml @@ -6,8 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = { version = "1.0", optional = true } +serde = { version = "1.0", features = ["derive"], optional = true } [features] default = [] -serde1 = ["serde"] diff --git a/consensus-engine/src/lib.rs b/consensus-engine/src/lib.rs index 9cd526f3..1d8acf88 100644 --- a/consensus-engine/src/lib.rs +++ b/consensus-engine/src/lib.rs @@ -23,9 +23,13 @@ impl Carnot { highest_voted_view: -1, last_view_timeout_qc: None, overlay, - safe_blocks: [(id, genesis_block)].into(), + safe_blocks: [(genesis_block.id, genesis_block)].into(), } } + + pub fn current_view(&self) -> View { + self.current_view + } /// Upon reception of a block /// /// Preconditions: @@ -40,12 +44,10 @@ impl Carnot { self.safe_blocks.contains_key(&block.parent()), "out of order view not supported, missing parent block for {block:?}", ); - // if the block has already been processed, return early if self.safe_blocks.contains_key(&block.id) { return Ok(self.clone()); } - if self.blocks_in_view(block.view).contains(&block) || block.view <= self.latest_committed_view() { @@ -56,7 +58,6 @@ impl Carnot { // By rejecting any other blocks except the first one received for a view this code does NOT do that. return Err(()); } - let mut new_state = self.clone(); if new_state.block_is_safe(block.clone()) { @@ -66,7 +67,6 @@ impl Carnot { // Non safe block, not necessarily an error return Err(()); } - Ok(new_state) } @@ -94,8 +94,13 @@ impl Carnot { /// Preconditions: /// * `receive_block(b)` must have been called successfully before trying to approve a block b. /// * A node should not attempt to vote for a block in a view earlier than the latest one it actively participated in. - pub fn approve_block(&self, block: Block) -> (Self, Output) { - assert!(self.safe_blocks.contains_key(&block.id)); + pub fn approve_block(&self, block: Block) -> (Self, Send) { + assert!( + self.safe_blocks.contains_key(&block.id), + "{:?} not in {:?}", + block, + self.safe_blocks + ); assert!( self.highest_voted_view < block.view, "can't vote for a block in the past" @@ -114,9 +119,12 @@ impl Carnot { }; ( new_state, - Output::Send { + Send { to, - payload: Payload::Vote(Vote { block: block.id }), + payload: Payload::Vote(Vote { + block: block.id, + view: block.view, + }), }, ) } @@ -132,7 +140,7 @@ impl Carnot { &self, timeout_qc: TimeoutQc, new_views: HashSet, - ) -> (Self, Output) { + ) -> (Self, Send) { let new_view = timeout_qc.view + 1; assert!( new_view @@ -180,7 +188,7 @@ impl Carnot { }; ( new_state, - Output::Send { + Send { to, payload: Payload::NewView(new_view_msg), }, @@ -192,7 +200,7 @@ impl Carnot { /// Preconditions: none! /// Just notice that the timer only reset after a view change, i.e. 
a node can't timeout /// more than once for the same view - pub fn local_timeout(&self) -> (Self, Option) { + pub fn local_timeout(&self) -> (Self, Option) { let mut new_state = self.clone(); new_state.highest_voted_view = new_state.current_view; @@ -208,7 +216,7 @@ impl Carnot { let to = new_state.overlay.root_committee(); return ( new_state, - Some(Output::Send { + Some(Send { to, payload: Payload::Timeout(timeout_msg), }), @@ -305,6 +313,50 @@ impl Carnot { } res } + + pub fn last_view_timeout_qc(&self) -> Option { + self.last_view_timeout_qc.clone() + } + + pub fn high_qc(&self) -> StandardQc { + self.local_high_qc.clone() + } + + pub fn is_leader_for_view(&self, view: View) -> bool { + self.overlay.leader(view) == self.id + } + + pub fn super_majority_threshold(&self) -> usize { + self.overlay.super_majority_threshold(self.id) + } + + pub fn leader_super_majority_threshold(&self) -> usize { + self.overlay.leader_super_majority_threshold(self.id) + } + + pub fn id(&self) -> NodeId { + self.id + } + + pub fn self_committee(&self) -> Committee { + self.overlay.node_committee(self.id) + } + + pub fn child_committees(&self) -> Vec { + self.overlay.child_committees(self.id) + } + + pub fn parent_committee(&self) -> Committee { + self.overlay.parent_committee(self.id) + } + + pub fn root_committee(&self) -> Committee { + self.overlay.root_committee() + } + + pub fn is_member_of_root_committee(&self) -> bool { + self.overlay.is_member_of_root_committee(self.id) + } } #[cfg(test)] @@ -315,6 +367,10 @@ mod test { struct NoOverlay; impl Overlay for NoOverlay { + fn new(_nodes: Vec) -> Self { + Self + } + fn root_committee(&self) -> Committee { todo!() } @@ -339,11 +395,19 @@ mod test { todo!() } + fn node_committee(&self, _id: NodeId) -> Committee { + todo!() + } + fn parent_committee(&self, _id: NodeId) -> Committee { todo!() } - fn leaf_committees(&self, _id: NodeId) -> HashSet { + fn child_committees(&self, _id: NodeId) -> Vec { + todo!() + } + + fn leaf_committees(&self, _id: NodeId) -> Vec { todo!() } @@ -355,7 +419,7 @@ mod test { todo!() } - fn leader_super_majority_threshold(&self, _view: View) -> usize { + fn leader_super_majority_threshold(&self, _id: NodeId) -> usize { todo!() } } diff --git a/consensus-engine/src/types.rs b/consensus-engine/src/types.rs index 20cba87b..2bed6dff 100644 --- a/consensus-engine/src/types.rs +++ b/consensus-engine/src/types.rs @@ -1,5 +1,9 @@ +// std use std::collections::HashSet; use std::hash::Hash; +// crates +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; pub type View = i64; pub type NodeId = [u8; 32]; @@ -12,7 +16,7 @@ pub type Committee = HashSet; /// This enum represents the different types of messages that can be sent from the perspective of consensus and /// can't be directly used in the network as they lack things like cryptographic signatures. 
 #[derive(Debug, Clone, Eq, PartialEq)]
-#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub enum Payload {
     /// Vote for a block in a view
     Vote(Vote),
@@ -23,14 +27,15 @@
 }

 /// Returned
-#[derive(Debug, Clone, Eq, PartialEq)]
-#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct Vote {
+    pub view: View,
     pub block: BlockId,
 }

-#[derive(Debug, Clone, Eq, PartialEq)]
-#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct Timeout {
     pub view: View,
     pub sender: NodeId,
@@ -38,8 +43,10 @@
     pub timeout_qc: Option,
 }

-#[derive(Debug, Clone, Eq, PartialEq)]
-#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
+// TODO: We are making it "mandatory" to have received the timeout_qc before the new_view votes.
+// We should consider removing the TimeoutQc from the NewView message and using a hash or id instead.
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct NewView {
     pub view: View,
     pub sender: NodeId,
@@ -47,8 +54,8 @@
     pub high_qc: Qc,
 }

-#[derive(Debug, Clone, Eq, PartialEq)]
-#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct TimeoutQc {
     pub view: View,
     pub high_qc: Qc,
@@ -56,8 +63,9 @@
 }

 #[derive(Debug, Clone, Eq, PartialEq, Hash)]
-#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct Block {
+    #[cfg_attr(feature = "serde", serde(skip))]
     pub id: BlockId,
     pub view: View,
     pub parent_qc: Qc,
@@ -71,25 +79,20 @@ impl Block {
 /// Possible output events.
 #[derive(Debug, Clone, Eq, PartialEq)]
-pub enum Output {
-    Send {
-        to: HashSet,
-        payload: Payload,
-    },
-    Broadcast {
-        payload: Payload,
-    },
+pub struct Send {
+    pub to: HashSet,
+    pub payload: Payload,
 }

 #[derive(Debug, Clone, Eq, PartialEq, Hash)]
-#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct StandardQc {
     pub view: View,
     pub id: BlockId,
 }

 impl StandardQc {
-    pub(crate) fn genesis() -> Self {
+    pub fn genesis() -> Self {
         Self {
             view: -1,
             id: [0; 32],
@@ -98,14 +101,14 @@
 }

 #[derive(Debug, Clone, Eq, PartialEq, Hash)]
-#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct AggregateQc {
     pub high_qc: StandardQc,
     pub view: View,
 }

 #[derive(Debug, Clone, Eq, PartialEq, Hash)]
-#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub enum Qc {
     Standard(StandardQc),
     Aggregated(AggregateQc),
@@ -136,9 +139,17 @@
             Qc::Aggregated(AggregateQc { high_qc, .. }) => high_qc.id,
         }
     }
+
+    pub fn high_qc(&self) -> StandardQc {
+        match self {
+            Qc::Standard(qc) => qc.clone(),
+            Qc::Aggregated(AggregateQc { high_qc, ..
}) => high_qc.clone(), + } + } } pub trait Overlay: Clone { + fn new(nodes: Vec) -> Self; fn root_committee(&self) -> Committee; fn rebuild(&mut self, timeout_qc: TimeoutQc); fn is_member_of_child_committee(&self, parent: NodeId, child: NodeId) -> bool; @@ -146,8 +157,10 @@ pub trait Overlay: Clone { fn is_member_of_leaf_committee(&self, id: NodeId) -> bool; fn is_child_of_root_committee(&self, id: NodeId) -> bool; fn parent_committee(&self, id: NodeId) -> Committee; - fn leaf_committees(&self, id: NodeId) -> HashSet; + fn child_committees(&self, id: NodeId) -> Vec; + fn leaf_committees(&self, id: NodeId) -> Vec; + fn node_committee(&self, id: NodeId) -> Committee; fn leader(&self, view: View) -> NodeId; fn super_majority_threshold(&self, id: NodeId) -> usize; - fn leader_super_majority_threshold(&self, view: View) -> usize; + fn leader_super_majority_threshold(&self, id: NodeId) -> usize; } diff --git a/nodes/mockpool-node/Cargo.toml b/nodes/mockpool-node/Cargo.toml index e50779ba..9f128e55 100644 --- a/nodes/mockpool-node/Cargo.toml +++ b/nodes/mockpool-node/Cargo.toml @@ -22,6 +22,7 @@ nomos-network = { path = "../../nomos-services/network", features = ["waku"] } nomos-log = { path = "../../nomos-services/log" } nomos-mempool = { path = "../../nomos-services/mempool", features = ["waku", "mock"] } nomos-http = { path = "../../nomos-services/http", features = ["http"] } +nomos-consensus = { path = "../../nomos-services/consensus", features = ["waku"] } tracing-subscriber = "0.3" tokio = {version = "1.24", features = ["sync"] } serde_json = "1.0" diff --git a/nodes/mockpool-node/src/main.rs b/nodes/mockpool-node/src/main.rs index 786b0c1b..5de599c4 100644 --- a/nodes/mockpool-node/src/main.rs +++ b/nodes/mockpool-node/src/main.rs @@ -3,13 +3,19 @@ mod tx; use clap::Parser; use color_eyre::eyre::{eyre, Result}; +use nomos_consensus::{ + network::adapters::waku::WakuAdapter as ConsensusWakuAdapter, overlay::FlatRoundRobin, + CarnotConsensus, +}; +use nomos_core::fountain::mock::MockFountain; use nomos_http::backends::axum::AxumBackend; use nomos_http::bridge::{HttpBridge, HttpBridgeService, HttpBridgeSettings}; use nomos_http::http::HttpService; use nomos_log::Logger; -use nomos_mempool::backend::mockpool::MockPool; -use nomos_mempool::network::adapters::waku::WakuAdapter; -use nomos_mempool::MempoolService; +use nomos_mempool::{ + backend::mockpool::MockPool, network::adapters::waku::WakuAdapter as MempoolWakuAdapter, + MempoolService, +}; use nomos_network::{backends::waku::Waku, NetworkService}; use overwatch_derive::*; use overwatch_rs::{ @@ -28,18 +34,28 @@ struct Args { config: std::path::PathBuf, } +type Carnot = CarnotConsensus< + ConsensusWakuAdapter, + MockPool, + MempoolWakuAdapter, + MockFountain, + FlatRoundRobin, +>; + #[derive(Deserialize)] struct Config { log: ::Settings, network: as ServiceData>::Settings, http: as ServiceData>::Settings, + consensus: ::Settings, } #[derive(Services)] struct MockPoolNode { logging: ServiceHandle, network: ServiceHandle>, - mockpool: ServiceHandle, MockPool>>, + mockpool: ServiceHandle, MockPool>>, + consensus: ServiceHandle, http: ServiceHandle>, bridges: ServiceHandle, } @@ -80,6 +96,7 @@ fn main() -> Result<()> { logging: config.log, http: config.http, mockpool: (), + consensus: config.consensus, bridges: HttpBridgeSettings { bridges }, }, None, diff --git a/nodes/mockpool-node/src/tx.rs b/nodes/mockpool-node/src/tx.rs index 542de0a0..26c5f989 100644 --- a/nodes/mockpool-node/src/tx.rs +++ b/nodes/mockpool-node/src/tx.rs @@ -3,7 +3,7 @@ 
use nomos_core::tx::{Transaction, TransactionHasher}; use serde::{Deserialize, Serialize}; use std::hash::Hash; -#[derive(Clone, Debug, Serialize, Deserialize, Hash)] +#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)] pub struct Tx(pub String); fn hash_tx(tx: &Tx) -> String { diff --git a/nomos-core/Cargo.toml b/nomos-core/Cargo.toml index 842b5eaf..e9df9d15 100644 --- a/nomos-core/Cargo.toml +++ b/nomos-core/Cargo.toml @@ -12,7 +12,7 @@ authors = [ async-trait = { version = "0.1" } blake2 = { version = "0.10" } bytes = "1.3" -consensus-engine = { path = "../consensus-engine"} +consensus-engine = { path = "../consensus-engine", features = ["serde"]} futures = "0.3" nomos-network = { path = "../nomos-services/network", optional = true } raptorq = { version = "1.7", optional = true } @@ -20,7 +20,7 @@ serde = { version = "1.0", features = ["derive"] } thiserror = "1.0" bincode = "1.3" once_cell = "1.0" -indexmap = { version = "1.9", features = ["serde-1"] } +indexmap = { version = "1.9", features = ["serde"] } serde_json = { version = "1", optional = true } [dev-dependencies] diff --git a/nomos-core/src/account.rs b/nomos-core/src/account.rs index b2dacf92..1f00593d 100644 --- a/nomos-core/src/account.rs +++ b/nomos-core/src/account.rs @@ -1,4 +1,4 @@ use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct AccountId; diff --git a/nomos-core/src/block.rs b/nomos-core/src/block.rs index 7514707d..19a8bdc9 100644 --- a/nomos-core/src/block.rs +++ b/nomos-core/src/block.rs @@ -2,7 +2,10 @@ use indexmap::IndexSet; // std use core::hash::Hash; // crates +use crate::wire; use bytes::Bytes; +use consensus_engine::{Qc, View}; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; // internal @@ -10,39 +13,34 @@ pub type TxHash = [u8; 32]; /// A block #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Block { - header: BlockHeader, +pub struct Block { + header: consensus_engine::Block, transactions: IndexSet, } -/// A block header -#[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)] -pub struct BlockHeader { - id: BlockId, - qc: Qc, -} - /// Identifier of a block pub type BlockId = [u8; 32]; -impl Block { - pub fn new(qc: Qc, txs: impl Iterator) -> Self { +impl Block { + pub fn new(view: View, parent_qc: Qc, txs: impl Iterator) -> Self { let transactions = txs.collect(); - // FIXME: Calculate header Id - let header = BlockHeader { id: [0; 32], qc }; - Self { + let header = consensus_engine::Block { + id: [view as u8; 32], + view, + parent_qc, + }; + + let mut s = Self { header, transactions, - } + }; + let id = id_from_wire_content(&s.as_bytes()); + s.header.id = id; + s } - /// Encode block into bytes - pub fn as_bytes(&self) -> Bytes { - Bytes::new() - } - - pub fn header(&self) -> BlockHeader { - self.header.clone() + pub fn header(&self) -> &consensus_engine::Block { + &self.header } pub fn transactions(&self) -> impl Iterator + '_ { @@ -50,12 +48,23 @@ impl Block { } } -impl BlockHeader { - pub fn id(&self) -> BlockId { - self.id +fn id_from_wire_content(bytes: &[u8]) -> consensus_engine::BlockId { + use blake2::digest::{consts::U32, Digest}; + use blake2::Blake2b; + let mut hasher = Blake2b::::new(); + hasher.update(bytes); + hasher.finalize().into() +} + +impl Block { + /// Encode block into bytes + pub fn as_bytes(&self) -> Bytes { + wire::serialize(self).unwrap().into() } - pub fn qc(&self) -> &Qc { - &self.qc + pub fn 
from_bytes(bytes: &[u8]) -> Self { + let mut result: Self = wire::deserialize(bytes).unwrap(); + result.header.id = id_from_wire_content(bytes); + result } } diff --git a/nomos-core/src/fountain/mock.rs b/nomos-core/src/fountain/mock.rs index a6ec9b46..5017168d 100644 --- a/nomos-core/src/fountain/mock.rs +++ b/nomos-core/src/fountain/mock.rs @@ -8,7 +8,7 @@ use crate::fountain::{FountainCode, FountainError}; /// Fountain code that does no protocol at all. /// Just bypasses the raw bytes into a single chunk and reconstruct from it. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MockFountain; #[async_trait] diff --git a/nomos-core/src/tx/carnot/mod.rs b/nomos-core/src/tx/carnot/mod.rs index 63d6e319..a68be323 100644 --- a/nomos-core/src/tx/carnot/mod.rs +++ b/nomos-core/src/tx/carnot/mod.rs @@ -8,7 +8,7 @@ use crate::tx::{Transaction, TransactionHasher}; mod transaction; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum Tx { Transfer(TransferTransaction), } diff --git a/nomos-core/src/tx/carnot/transaction.rs b/nomos-core/src/tx/carnot/transaction.rs index 516e9ceb..16a049db 100644 --- a/nomos-core/src/tx/carnot/transaction.rs +++ b/nomos-core/src/tx/carnot/transaction.rs @@ -6,7 +6,7 @@ use crate::crypto::Signature; /// Can only be constructed if the signature is valid, /// but does not imply that it can be successfully applied /// to the ledger. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct TransferTransaction { pub from: AccountId, pub to: AccountId, diff --git a/nomos-core/src/vote/carnot.rs b/nomos-core/src/vote/carnot.rs deleted file mode 100644 index be4a336d..00000000 --- a/nomos-core/src/vote/carnot.rs +++ /dev/null @@ -1,121 +0,0 @@ -#![allow(dead_code)] -// TODO: Well, remove this when we actually use the fields from the specification -// std - -use std::collections::HashSet; -// crates -use futures::{Stream, StreamExt}; -// internal -use crate::block::BlockId; -use crate::crypto::PublicKey; -use crate::vote::Tally; - -pub type NodeId = PublicKey; - -pub enum QuorumCertificate { - Simple(SimpleQuorumCertificate), - Aggregated(AggregatedQuorumCertificate), -} - -impl QuorumCertificate { - pub fn view(&self) -> u64 { - match self { - QuorumCertificate::Simple(qc) => qc.view, - QuorumCertificate::Aggregated(qc) => qc.view, - } - } -} - -pub struct SimpleQuorumCertificate { - view: u64, - block: BlockId, -} - -pub struct AggregatedQuorumCertificate { - view: u64, - high_qh: Box, -} - -pub struct Vote { - block: BlockId, - view: u64, - voter: NodeId, // TODO: this should be some id, probably the node pk - qc: Option, -} - -impl Vote { - pub fn valid_view(&self, view: u64) -> bool { - self.view == view && self.qc.as_ref().map_or(true, |qc| qc.view() == view - 1) - } -} - -#[derive(thiserror::Error, Debug)] -pub enum CarnotTallyError { - #[error("Received invalid vote: {0}")] - InvalidVote(String), - #[error("Did not receive enough votes")] - InsufficientVotes, -} - -#[derive(Clone)] -pub struct CarnotTallySettings { - threshold: usize, - // TODO: this probably should be dynamic and should change with the view (?) 
- participating_nodes: HashSet, -} - -pub struct CarnotTally { - settings: CarnotTallySettings, -} - -#[async_trait::async_trait] -impl Tally for CarnotTally { - type Vote = Vote; - type Qc = QuorumCertificate; - type Outcome = (); - type TallyError = CarnotTallyError; - type Settings = CarnotTallySettings; - - fn new(settings: Self::Settings) -> Self { - Self { settings } - } - - async fn tally + Unpin + Send>( - &self, - view: u64, - mut vote_stream: S, - ) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> { - let mut approved = 0usize; - let mut seen = HashSet::new(); - while let Some(vote) = vote_stream.next().await { - // check vote view is valid - if !vote.valid_view(view) { - return Err(CarnotTallyError::InvalidVote("Invalid view".to_string())); - } - // check for duplicated votes - if seen.contains(&vote.voter) { - return Err(CarnotTallyError::InvalidVote( - "Double voted node".to_string(), - )); - } - // check for individual nodes votes - if !self.settings.participating_nodes.contains(&vote.voter) { - return Err(CarnotTallyError::InvalidVote( - "Non-participating node".to_string(), - )); - } - seen.insert(vote.voter); - approved += 1; - if approved >= self.settings.threshold { - return Ok(( - QuorumCertificate::Simple(SimpleQuorumCertificate { - view, - block: vote.block, - }), - (), - )); - } - } - Err(CarnotTallyError::InsufficientVotes) - } -} diff --git a/nomos-core/src/vote/mock.rs b/nomos-core/src/vote/mock.rs index 68d1c328..4ae4f9d0 100644 --- a/nomos-core/src/vote/mock.rs +++ b/nomos-core/src/vote/mock.rs @@ -1,5 +1,6 @@ // std // crates +use consensus_engine::{Block, View}; use futures::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; // internal @@ -7,11 +8,11 @@ use crate::vote::Tally; #[derive(Serialize, Deserialize)] pub struct MockVote { - view: u64, + view: View, } impl MockVote { - pub fn view(&self) -> u64 { + pub fn view(&self) -> View { self.view } } @@ -48,6 +49,7 @@ impl Tally for MockTally { type Vote = MockVote; type Qc = MockQc; type Outcome = (); + type Subject = Block; type TallyError = Error; type Settings = MockTallySettings; @@ -58,12 +60,12 @@ impl Tally for MockTally { async fn tally + Unpin + Send>( &self, - view: u64, + block: Block, mut vote_stream: S, ) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> { let mut count_votes = 0; while let Some(vote) = vote_stream.next().await { - if vote.view() != view { + if vote.view() != block.view { return Err(Error("Invalid vote".into())); } count_votes += 1; diff --git a/nomos-core/src/vote/mod.rs b/nomos-core/src/vote/mod.rs index cb0e2b58..bbf79f2f 100644 --- a/nomos-core/src/vote/mod.rs +++ b/nomos-core/src/vote/mod.rs @@ -1,6 +1,4 @@ -pub mod carnot; pub mod mock; - use futures::Stream; #[async_trait::async_trait] @@ -8,12 +6,13 @@ pub trait Tally { type Vote; type Qc; type Outcome; + type Subject; type TallyError; type Settings: Clone; fn new(settings: Self::Settings) -> Self; async fn tally + Unpin + Send>( &self, - view: u64, + subject: Self::Subject, vote_stream: S, ) -> Result<(Self::Qc, Self::Outcome), Self::TallyError>; } diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index ba21fc73..47492b5b 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -7,9 +7,10 @@ edition = "2021" [dependencies] async-trait = "0.1" +async-stream = "0.3" bytes = "1.3" chrono = "0.4" -consensus-engine = { path = "../../consensus-engine", features = ["serde1"] } +consensus-engine = { path = "../../consensus-engine", 
features = ["serde"] } futures = "0.3" nomos-network = { path = "../network" } nomos-mempool = { path = "../mempool" } @@ -18,10 +19,12 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" rand_chacha = "0.3" rand = "0.8" serde = { version = "1", features = ["derive"] } +thiserror = "1.0" tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" -waku-bindings = { version = "0.1.0-rc.2", optional = true} +tokio-util = "0.7" tracing = "0.1" +waku-bindings = { version = "0.1.0-rc.2", optional = true} [features] default = [] diff --git a/nomos-services/consensus/src/leadership.rs b/nomos-services/consensus/src/leadership.rs deleted file mode 100644 index b64e0bad..00000000 --- a/nomos-services/consensus/src/leadership.rs +++ /dev/null @@ -1,69 +0,0 @@ -// std -use std::marker::PhantomData; -// crates -// internal -use nomos_core::crypto::PrivateKey; -use nomos_core::tx::Transaction; -use nomos_mempool::MempoolMsg; - -use super::*; - -// TODO: take care of sensitve material -struct Enclave { - key: PrivateKey, -} - -pub struct Leadership { - key: Enclave, - mempool: OutboundRelay>, -} - -pub enum LeadershipResult<'view, Qc: Clone, TxId: Clone + Eq + core::hash::Hash> { - Leader { - block: Block, - _view: PhantomData<&'view u8>, - }, - NotLeader { - _view: PhantomData<&'view u8>, - }, -} - -impl Leadership -where - Tx: Transaction, - Tx::Hash: Debug, -{ - pub fn new(key: PrivateKey, mempool: OutboundRelay>) -> Self { - Self { - key: Enclave { key }, - mempool, - } - } - - #[allow(unused, clippy::diverging_sub_expression)] - pub async fn try_propose_block<'view, Qc: Clone>( - &self, - view: &'view View, - tip: &Tip, - qc: Qc, - ) -> LeadershipResult<'view, Qc, Tx::Hash> { - // TODO: get the correct ancestor for the tip - // let ancestor_hint = todo!("get the ancestor from the tip"); - let ancestor_hint = [0; 32]; - if view.is_leader(self.key.key) { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.mempool.send(MempoolMsg::View { - ancestor_hint, - reply_channel: tx, - }); - let iter = rx.await.unwrap(); - - LeadershipResult::Leader { - _view: PhantomData, - block: Block::new(qc, iter.map(|ref tx| ::hash(tx))), - } - } else { - LeadershipResult::NotLeader { _view: PhantomData } - } - } -} diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 1b801cac..7e19201e 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -4,29 +4,39 @@ //! are always synchronized (i.e. it cannot happen that we accidentally use committees from different views). //! It's obviously extremely important that the information contained in `View` is synchronized across different //! nodes, but that has to be achieved through different means. 
-mod leadership;
-mod network;
+pub mod network;
 pub mod overlay;
-mod tip;
+mod tally;
+mod view_cancel;
 // std
-use std::collections::BTreeMap;
+use std::collections::HashSet;
 use std::fmt::Debug;
 use std::hash::Hash;
+use std::pin::Pin;
+use std::time::Duration;
 // crates
-use serde::{Deserialize, Serialize};
+use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
+use serde::Deserialize;
+use serde::{de::DeserializeOwned, Serialize};
 // internal
+use crate::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg};
 use crate::network::NetworkAdapter;
-use leadership::{Leadership, LeadershipResult};
+use crate::tally::{happy::CarnotTally, unhappy::NewViewTally, CarnotTallySettings};
+use crate::view_cancel::ViewCancelCache;
+use consensus_engine::{
+    AggregateQc, Carnot, Committee, NewView, Overlay, Payload, Qc, StandardQc, Timeout, TimeoutQc,
+    Vote,
+};
 use nomos_core::block::Block;
 use nomos_core::crypto::PublicKey;
 use nomos_core::fountain::FountainCode;
-use nomos_core::staking::Stake;
 use nomos_core::tx::Transaction;
 use nomos_core::vote::Tally;
-use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolService};
+use nomos_mempool::{
+    backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolMsg, MempoolService,
+};
 use nomos_network::NetworkService;
-use overlay::Overlay;
 use overwatch_rs::services::relay::{OutboundRelay, Relay};
 use overwatch_rs::services::{
     handle::ServiceStateHandle,
@@ -34,54 +44,54 @@
     state::{NoOperator, NoState},
     ServiceCore, ServiceData, ServiceId,
 };
-use tip::Tip;
+
+// TODO: take this from config
+const TIMEOUT: Duration = Duration::from_secs(60);

 // Raw bytes for now, could be a ed25519 public key
 pub type NodeId = PublicKey;
 // Random seed for each round provided by the protocol
 pub type Seed = [u8; 32];

-#[derive(Debug)]
-pub struct CarnotSettings {
+#[derive(Debug, Deserialize, Serialize)]
+pub struct CarnotSettings {
     private_key: [u8; 32],
     fountain_settings: Fountain::Settings,
-    tally_settings: VoteTally::Settings,
+    nodes: Vec,
 }

-impl Clone for CarnotSettings {
+impl Clone for CarnotSettings {
     fn clone(&self) -> Self {
         Self {
             private_key: self.private_key,
             fountain_settings: self.fountain_settings.clone(),
-            tally_settings: self.tally_settings.clone(),
+            nodes: self.nodes.clone(),
         }
     }
 }

-impl CarnotSettings {
+impl CarnotSettings {
     #[inline]
     pub const fn new(
         private_key: [u8; 32],
         fountain_settings: Fountain::Settings,
-        tally_settings: VoteTally::Settings,
+        nodes: Vec,
     ) -> Self {
         Self {
             private_key,
             fountain_settings,
-            tally_settings,
+            nodes,
         }
     }
 }

-pub struct CarnotConsensus
+pub struct CarnotConsensus
 where
     F: FountainCode,
     A: NetworkAdapter,
     M: MempoolAdapter,
     P: MemPool,
-    T: Tally,
-    T::Qc: Clone,
-    O: Overlay::Hash>,
+    O: Overlay + Debug,
     P::Tx: Transaction + Debug + 'static,
     ::Hash: Debug,
     A::Backend: 'static,
@@ -92,44 +102,38 @@
     network_relay: Relay>,
     mempool_relay: Relay>,
     _fountain: std::marker::PhantomData,
-    _tally: std::marker::PhantomData,
     _overlay: std::marker::PhantomData,
 }

-impl ServiceData for CarnotConsensus
+impl ServiceData for CarnotConsensus
 where
     F: FountainCode,
     A: NetworkAdapter,
     P: MemPool,
-    T: Tally,
-    T::Qc: Clone,
     P::Tx: Transaction + Debug,
     ::Hash: Debug,
     M: MempoolAdapter,
-    O: Overlay::Hash>,
+    O: Overlay + Debug,
 {
     const SERVICE_ID: ServiceId = "Carnot";
-    type Settings = CarnotSettings;
+    type Settings = CarnotSettings;
     type State = NoState;
     type StateOperator = NoOperator;
     type Message
= NoMessage; } #[async_trait::async_trait] -impl ServiceCore for CarnotConsensus +impl ServiceCore for CarnotConsensus where - F: FountainCode + Send + Sync + 'static, - A: NetworkAdapter + Send + Sync + 'static, + F: FountainCode + Clone + Send + Sync + 'static, + A: NetworkAdapter + Clone + Send + Sync + 'static, P: MemPool + Send + Sync + 'static, - T: Tally + Send + Sync + 'static, - T::Settings: Send + Sync + 'static, - T::Outcome: Send + Sync, - T::Qc: Clone + Send + Sync, P::Settings: Send + Sync + 'static, - P::Tx: Debug + Clone + serde::de::DeserializeOwned + Send + Sync + 'static, + P::Tx: + Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, ::Hash: Debug + Send + Sync, M: MempoolAdapter + Send + Sync + 'static, - O: Overlay::Hash> + Send + Sync + 'static, + O: Overlay + Debug + Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); @@ -138,7 +142,6 @@ where service_state, network_relay, _fountain: Default::default(), - _tally: Default::default(), _overlay: Default::default(), mempool_relay, }) @@ -160,224 +163,455 @@ where let CarnotSettings { private_key, fountain_settings, - tally_settings, + nodes, } = self.service_state.settings_reader.get_updated_settings(); - let network_adapter = A::new(network_relay).await; - - let tip = Tip; - - let fountain = F::new(fountain_settings); - let tally = T::new(tally_settings); - - let leadership = Leadership::::new(private_key, mempool_relay.clone()); - // FIXME: this should be taken from config - let mut cur_view = View { - seed: [0; 32], - staking_keys: BTreeMap::new(), - view_n: 0, + let overlay = O::new(nodes); + let genesis = consensus_engine::Block { + id: [0; 32], + view: 0, + parent_qc: Qc::Standard(StandardQc::genesis()), }; - loop { - // if we want to process multiple views at the same time this can - // be spawned as a separate future + let mut carnot = Carnot::from_genesis(private_key, genesis, overlay); + let network_adapter = A::new(network_relay).await; + let adapter = &network_adapter; + let _self_committee = carnot.self_committee(); + let self_committee = &_self_committee; + let _leader_committee = [carnot.id()].into_iter().collect(); + let leader_committee = &_leader_committee; + let fountain = F::new(fountain_settings); + let _tally_settings = CarnotTallySettings { + threshold: carnot.super_majority_threshold(), + participating_nodes: carnot.child_committees().into_iter().flatten().collect(), + }; + let tally_settings = &_tally_settings; + let _leader_tally_settings = CarnotTallySettings { + threshold: carnot.leader_super_majority_threshold(), + // TODO: add children of root committee + participating_nodes: carnot.root_committee(), + }; + let leader_tally_settings = &_leader_tally_settings; - // FIXME: this should probably have a timer to detect failed rounds - let res = cur_view - .resolve::( - private_key, - &tip, - &network_adapter, - &fountain, - &tally, - &leadership, + let mut view_cancel_cache = ViewCancelCache::new(); + + let events: FuturesUnordered> + Send>>> = + FuturesUnordered::new(); + let genesis_block = carnot.genesis_block(); + events.push(Box::pin(view_cancel_cache.cancelable_event_future( + genesis_block.view + 1, + Self::gather_block(adapter, genesis_block.view + 1), + ))); + events.push(Box::pin(view_cancel_cache.cancelable_event_future( + genesis_block.view, + Self::gather_votes( + adapter, + self_committee, + genesis_block.clone(), + tally_settings.clone(), + ), + ))); + if 
carnot.is_leader_for_view(genesis_block.view + 1) { + events.push(Box::pin(view_cancel_cache.cancelable_event_future( + genesis_block.view + 1, + async move { + let Event::Approve { qc, .. } = Self::gather_votes( + adapter, + leader_committee, + genesis_block, + leader_tally_settings.clone(), ) - .await; - match res { - Ok((block, view)) => { - // resolved block, mark as verified and possibly update the tip - // not sure what mark as verified means, e.g. if we want an event subscription - // system for this to be used for example by the ledger, storage and mempool + .await else { unreachable!() }; + Event::ProposeBlock { qc } + }, + ))); + } + tokio::pin!(events); + + while let Some(event) = events.next().await { + let mut output = None; + let prev_view = carnot.current_view(); + match event { + Event::Proposal { block, mut stream } => { + tracing::debug!("received proposal {:?}", block); + let block = block.header().clone(); + match carnot.receive_block(block.clone()) { + Ok(new_state) => { + let new_view = new_state.current_view(); + if new_view != carnot.current_view() { + events.push(Box::pin(view_cancel_cache.cancelable_event_future( + block.view, + Self::gather_votes( + adapter, + self_committee, + block.clone(), + tally_settings.clone(), + ), + ))); + } else { + events.push(Box::pin(view_cancel_cache.cancelable_event_future( + block.view, + async move { + if let Some(block) = stream.next().await { + Event::Proposal { block, stream } + } else { + Event::None + } + }, + ))); + } + carnot = new_state; + } + Err(_) => tracing::debug!("invalid block {:?}", block), + } + if carnot.is_leader_for_view(block.view + 1) { + events.push(Box::pin(view_cancel_cache.cancelable_event_future( + block.view, + async move { + let Event::Approve { qc, .. } = Self::gather_votes( + adapter, + leader_committee, + block, + leader_tally_settings.clone(), + ) + .await else { unreachable!() }; + Event::ProposeBlock { qc } + }, + ))); + } + } + Event::Approve { block, .. 
} => { + tracing::debug!("approving proposal {:?}", block); + let (new_carnot, out) = carnot.approve_block(block.clone()); + carnot = new_carnot; + output = Some(Output::Send::(out)); + } + Event::LocalTimeout => { + tracing::debug!("local timeout"); + let (new_carnot, out) = carnot.local_timeout(); + carnot = new_carnot; + output = out.map(Output::Send); + } + Event::NewView { + timeout_qc, + new_views, + } => { + tracing::debug!("approving new view {:?}", timeout_qc); + let (new_carnot, out) = carnot.approve_new_view(timeout_qc.clone(), new_views); + carnot = new_carnot; + output = Some(Output::Send(out)); + let next_view = timeout_qc.view + 2; + if carnot.is_leader_for_view(next_view) { + let high_qc = carnot.high_qc(); + events.push(Box::pin(view_cancel_cache.cancelable_event_future( + timeout_qc.view + 1, + async move { + let _votes = Self::gather_new_views( + adapter, + leader_committee, + timeout_qc, + leader_tally_settings.clone(), + ) + .await; + Event::ProposeBlock { + qc: Qc::Aggregated(AggregateQc { + high_qc, + view: next_view, + }), + } + }, + ))); + } + } + Event::TimeoutQc { timeout_qc } => { + tracing::debug!("timeout received {:?}", timeout_qc); + carnot = carnot.receive_timeout_qc(timeout_qc.clone()); + events.push(Box::pin(view_cancel_cache.cancelable_event_future( + timeout_qc.view + 1, + Self::gather_new_views( + adapter, + self_committee, + timeout_qc, + tally_settings.clone(), + ), + ))); + } + Event::RootTimeout { timeouts } => { + tracing::debug!("root timeout {:?}", timeouts); + // timeout detected + } + Event::ProposeBlock { qc } => { + tracing::debug!("proposing block"); + let (reply_channel, rx) = tokio::sync::oneshot::channel(); mempool_relay - .send(nomos_mempool::MempoolMsg::MarkInBlock { - ids: block.transactions().cloned().collect(), - block: block.header().id(), + .send(MempoolMsg::View { + ancestor_hint: [0; 32], + reply_channel, }) .await - .map_err(|(e, _)| { - tracing::error!("Error while sending MarkInBlock message: {}", e); - e - })?; + .unwrap_or_else(|(e, _)| { + eprintln!("Could not get transactions from mempool {e}") + }); + match rx.await { + Ok(txs) => { + let proposal = Block::new(qc.view() + 1, qc, txs); + output = Some(Output::BroadcastProposal { proposal }); + } + Err(e) => tracing::error!("Could not fetch txs {e}"), + } + } + Event::None => {} + } - cur_view = view; - } - Err(e) => { - tracing::error!("Error while resolving view: {}", e); + let current_view = carnot.current_view(); + if current_view != prev_view { + // First we cancel previous processing view tasks + view_cancel_cache.cancel(prev_view); + tracing::debug!("Advanced view from {prev_view} to {current_view}"); + // View change! 
+ events.push(Box::pin(view_cancel_cache.cancelable_event_future( + current_view, + async { + tokio::time::sleep(TIMEOUT).await; + Event::LocalTimeout + }, + ))); + events.push(Box::pin(view_cancel_cache.cancelable_event_future( + current_view + 1, + Self::gather_block(adapter, current_view + 1), + ))); + events.push(Box::pin(view_cancel_cache.cancelable_event_future( + current_view, + Self::gather_timeout_qc(adapter, current_view), + ))); + if carnot.is_member_of_root_committee() { + let threshold = carnot.leader_super_majority_threshold(); + events.push(Box::pin(view_cancel_cache.cancelable_event_future( + current_view, + Self::gather_timeout(adapter, self_committee, current_view, threshold), + ))); } } + + if let Some(output) = output { + handle_output(adapter, &fountain, carnot.id(), output).await; + } + } + + unreachable!("carnot exited"); + } +} + +#[allow(dead_code)] // TODO: remove this when using broadcasting events +enum Output { + Send(consensus_engine::Send), + BroadcastTimeoutQc { timeout_qc: TimeoutQc }, + BroadcastProposal { proposal: Block }, +} + +impl CarnotConsensus +where + F: FountainCode + Clone + Send + Sync + 'static, + A: NetworkAdapter + Clone + Send + Sync + 'static, + P: MemPool + Send + Sync + 'static, + P::Settings: Send + Sync + 'static, + P::Tx: + Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, + ::Hash: Debug + Send + Sync, + M: MempoolAdapter + Send + Sync + 'static, + O: Overlay + Debug + Send + Sync + 'static, +{ + async fn gather_timeout_qc(adapter: &A, view: consensus_engine::View) -> Event { + if let Some(timeout_qc) = adapter + .timeout_qc_stream(view) + .await + .map(|msg| msg.qc) + .next() + .await + { + Event::TimeoutQc { timeout_qc } + } else { + Event::None + } + } + + async fn gather_votes( + adapter: &A, + committee: &Committee, + block: consensus_engine::Block, + tally: CarnotTallySettings, + ) -> Event { + let tally = CarnotTally::new(tally); + let votes_stream = adapter.votes_stream(committee, block.view, block.id).await; + match tally.tally(block.clone(), votes_stream).await { + Ok((qc, votes)) => Event::Approve { qc, votes, block }, + Err(_e) => { + todo!("Handle tally error {_e}"); + } } } -} -#[derive(Hash, Eq, PartialEq, Serialize, Deserialize)] -pub struct Approval; - -// Consensus round, also aids in guaranteeing synchronization -// between various data structures by means of lifetimes -pub struct View { - seed: Seed, - staking_keys: BTreeMap, - pub view_n: consensus_engine::View, -} - -impl View { - // TODO: might want to encode steps in the type system - pub async fn resolve<'view, A, O, F, T, Tx>( - &'view self, - node_id: NodeId, - tip: &Tip, + async fn gather_new_views( adapter: &A, - fountain: &F, - tally: &T, - leadership: &Leadership, - ) -> Result<(Block, View), Box> - where - A: NetworkAdapter + Send + Sync + 'static, - F: FountainCode, - Tx: Transaction, - Tx::Hash: Debug, - T: Tally + Send + Sync + 'static, - T::Outcome: Send + Sync, - T::Qc: Clone, - O: Overlay, - { - let res = if self.is_leader(node_id) { - let block = self - .resolve_leader::(node_id, tip, adapter, fountain, tally, leadership) - .await - .unwrap(); // FIXME: handle sad path - let next_view = self.generate_next_view(&block); - (block, next_view) - } else { - self.resolve_non_leader::(node_id, adapter, fountain, tally) - .await - .unwrap() // FIXME: handle sad path - }; - - // Commit phase: - // Upon verifing a block B, if B.parent = B' and B'.parent = B'' and - // B'.view = B''.view + 1, then the node commits 
B''. - // This happens implicitly at the chain level and does not require any - // explicit action from the node. - - Ok(res) - } - - async fn resolve_leader<'view, A, O, F, T, Tx>( - &'view self, - node_id: NodeId, - tip: &Tip, - adapter: &A, - fountain: &F, - tally: &T, - leadership: &Leadership, - ) -> Result::Hash>, ()> - where - A: NetworkAdapter + Send + Sync + 'static, - F: FountainCode, - T: Tally + Send + Sync + 'static, - T::Outcome: Send + Sync, - T::Qc: Clone, - Tx: Transaction, - Tx::Hash: Debug, - O: Overlay, - { - let overlay = O::new(self, node_id); - - // We need to build the QC for the block we are proposing - let qc = overlay.build_qc(self, adapter, tally).await; - - let LeadershipResult::Leader { block, _view } = leadership - .try_propose_block(self, tip, qc) - .await else { panic!("we are leader")}; - - overlay - .broadcast_block(self, block.clone(), adapter, fountain) + committee: &Committee, + timeout_qc: TimeoutQc, + tally: CarnotTallySettings, + ) -> Event { + let tally = NewViewTally::new(tally); + let stream = adapter + .new_view_stream(committee, timeout_qc.view + 1) .await; - - Ok(block) - } - - async fn resolve_non_leader<'view, A, O, F, T, Tx>( - &'view self, - node_id: NodeId, - adapter: &A, - fountain: &F, - tally: &T, - ) -> Result<(Block, View), ()> - where - A: NetworkAdapter + Send + Sync + 'static, - F: FountainCode, - T: Tally + Send + Sync + 'static, - T::Qc: Clone, - Tx: Transaction, - Tx::Hash: Debug, - O: Overlay, - { - let overlay = O::new(self, node_id); - // Consensus in Carnot is achieved in 2 steps from the point of view of a node: - // 1) The node receives a block proposal from a leader and verifies it - // 2) The node signals to the network its approval for the block. - // Depending on the overlay, this may require waiting for a certain number - // of other approvals. - - // 1) Collect and verify block proposal. - let block = overlay - .reconstruct_proposal_block(self, adapter, fountain) - .await - .unwrap(); // FIXME: handle sad path - - // TODO: verify - // TODO: reshare the block? 
- let next_view = self.generate_next_view(&block); - - // 2) Signal approval to the network - // We only consider the happy path for now - if self.pipelined_safe_block(&block) { - overlay - .approve_and_forward(self, &block, adapter, tally, &next_view) - .await - .unwrap(); // FIXME: handle sad path + match tally.tally(timeout_qc.clone(), stream).await { + Ok((_qc, new_views)) => Event::NewView { + timeout_qc, + new_views, + }, + Err(_e) => { + todo!("Handle tally error {_e}"); + } } - - Ok((block, next_view)) } - pub fn is_leader(&self, _node_id: NodeId) -> bool { - true + async fn gather_timeout( + adapter: &A, + committee: &Committee, + view: consensus_engine::View, + threshold: usize, + ) -> Event { + futures::pending!(); + let timeouts = adapter + .timeout_stream(committee, view) + .await + .take(threshold) + .map(|msg| msg.vote) + .collect() + .await; + Event::RootTimeout { timeouts } } - // TODO: use consensus_engine::View instead - pub fn id(&self) -> u64 { - self.view_n.try_into().unwrap() - } - - // Verifies the block is new and the previous leader did not fail - fn pipelined_safe_block( - &self, - _: &Block, - ) -> bool { - // return b.view_n >= self.view_n && b.view_n == b.qc.view_n - true - } - - fn generate_next_view(&self, _b: &Block) -> View { - let mut seed = self.seed; - seed[0] += 1; - View { - seed, - staking_keys: self.staking_keys.clone(), - view_n: self.view_n + 1, + async fn gather_block(adapter: &A, view: consensus_engine::View) -> Event { + let stream = adapter + .proposal_chunks_stream(view) + .await + .filter_map(move |msg| { + async move { + let proposal = Block::from_bytes(&msg.chunk); + if proposal.header().id == msg.proposal { + // TODO: Leader is faulty? what should we do? + Some(proposal) + } else { + None + } + } + }); + let mut stream = Box::pin(stream); + if let Some(block) = stream.next().await { + Event::Proposal { block, stream } + } else { + Event::None } } } + +async fn handle_output(adapter: &A, fountain: &F, node_id: NodeId, output: Output) +where + A: NetworkAdapter, + F: FountainCode, + Tx: Hash + Eq + Clone + Serialize + DeserializeOwned + Debug, +{ + match output { + Output::Send(consensus_engine::Send { to, payload }) => match payload { + Payload::Vote(vote) => { + adapter + .send( + &to, + vote.view, + VoteMsg { + voter: node_id, + vote, + qc: None, // TODO: handle root commmittee members + } + .as_bytes(), + "votes", + ) + .await; + } + Payload::Timeout(timeout) => { + adapter + .send( + &to, + timeout.view, + TimeoutMsg { + voter: node_id, + vote: timeout, + } + .as_bytes(), + "timeout", + ) + .await; + } + Payload::NewView(new_view) => { + adapter + .send( + &to, + new_view.view, + NewViewMsg { + voter: node_id, + vote: new_view, + } + .as_bytes(), + "new-view", + ) + .await; + } + }, + Output::BroadcastProposal { proposal } => { + fountain + .encode(&proposal.as_bytes()) + .for_each(|chunk| { + adapter.broadcast_block_chunk(ProposalChunkMsg { + proposal: proposal.header().id, + chunk: chunk.to_vec().into_boxed_slice(), + view: proposal.header().view, + }) + }) + .await; + } + Output::BroadcastTimeoutQc { timeout_qc } => { + adapter + .broadcast_timeout_qc(TimeoutQcMsg { + source: node_id, + qc: timeout_qc, + }) + .await; + } + } +} + +pub(crate) enum Event { + Proposal { + block: Block, + stream: Pin> + Send>>, + }, + #[allow(dead_code)] + Approve { + qc: Qc, + block: consensus_engine::Block, + votes: HashSet, + }, + LocalTimeout, + NewView { + timeout_qc: TimeoutQc, + new_views: HashSet, + }, + TimeoutQc { + timeout_qc: TimeoutQc, + }, 
+ RootTimeout { + timeouts: HashSet, + }, + ProposeBlock { + qc: Qc, + }, + None, +} diff --git a/nomos-services/consensus/src/network/adapters/mock.rs b/nomos-services/consensus/src/network/adapters/mock.rs index dc5cfcb2..86a91196 100644 --- a/nomos-services/consensus/src/network/adapters/mock.rs +++ b/nomos-services/consensus/src/network/adapters/mock.rs @@ -1,4 +1,3 @@ -use bytes::Bytes; use futures::StreamExt; use nomos_network::{ backends::mock::{ @@ -7,25 +6,21 @@ use nomos_network::{ NetworkMsg, NetworkService, }; use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; -use serde::de::DeserializeOwned; -use serde::Serialize; use tokio_stream::{wrappers::BroadcastStream, Stream}; -use crate::network::messages::TimeoutQcMsg; -use crate::{ - network::{ - messages::{ProposalChunkMsg, VoteMsg}, - NetworkAdapter, - }, - overlay::committees::Committee, +use crate::network::messages::{NewViewMsg, TimeoutMsg, TimeoutQcMsg}; +use crate::network::{ + messages::{ProposalChunkMsg, VoteMsg}, + NetworkAdapter, }; -use consensus_engine::{TimeoutQc, View}; +use consensus_engine::{BlockId, Committee, View}; const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic"; const MOCK_BLOCK_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("MockSim", 1, "MockBlock"); const MOCK_APPROVAL_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("MockSim", 1, "MockApproval"); +#[derive(Clone)] pub struct MockAdapter { network_relay: OutboundRelay< as ServiceData>::Message>, } @@ -65,7 +60,7 @@ impl NetworkAdapter for MockAdapter { async fn proposal_chunks_stream( &self, _view: View, - ) -> Box + Send + Sync + Unpin> { + ) -> Box + Send + Sync + Unpin> { let stream_channel = self .message_subscriber_channel() .await @@ -80,9 +75,7 @@ impl NetworkAdapter for MockAdapter { == message.content_topic().content_topic_name { let payload = message.payload(); - Some(Bytes::from( - ProposalChunkMsg::from_bytes(payload.as_bytes()).chunk, - )) + Some(ProposalChunkMsg::from_bytes(payload.as_bytes())) } else { None } @@ -117,24 +110,33 @@ impl NetworkAdapter for MockAdapter { todo!() } - async fn timeout_qc_stream( + async fn timeout_stream( &self, + _committee: &Committee, _view: View, - ) -> Box + Send + Sync + Unpin> { + ) -> Box + Send + Sync + Unpin> { todo!() } - async fn votes_stream( + async fn timeout_qc_stream( &self, - _committee: Committee, _view: View, - ) -> Box + Send> { + ) -> Box + Send + Sync + Unpin> { + todo!() + } + + async fn votes_stream( + &self, + _committee: &Committee, + _view: View, + _proposal_id: BlockId, + ) -> Box + Send + Unpin> { let stream_channel = self .message_subscriber_channel() .await .unwrap_or_else(|_e| todo!("handle error")); - Box::new( - BroadcastStream::new(stream_channel).filter_map(|msg| async move { + Box::new(Box::pin(BroadcastStream::new(stream_channel).filter_map( + |msg| async move { match msg { Ok(event) => match event { NetworkEvent::RawMessage(message) => { @@ -142,7 +144,7 @@ impl NetworkAdapter for MockAdapter { == message.content_topic().content_topic_name { let payload = message.payload(); - Some(VoteMsg::from_bytes(payload.as_bytes()).vote) + Some(VoteMsg::from_bytes(payload.as_bytes())) } else { None } @@ -150,20 +152,21 @@ impl NetworkAdapter for MockAdapter { }, Err(_e) => None, } - }), - ) + }, + ))) } - async fn send_vote( + async fn new_view_stream( &self, - _committee: Committee, + _committee: &Committee, _view: View, - approval_message: VoteMsg, - ) where - Vote: Send, - { + ) -> Box + Send + Unpin> { + todo!() + } + + async fn send(&self, 
_committee: &Committee, _view: View, payload: Box<[u8]>, _channel: &str) { let message = MockMessage::new( - String::from_utf8_lossy(&approval_message.as_bytes()).to_string(), + String::from_utf8_lossy(&payload).to_string(), MOCK_APPROVAL_CONTENT_TOPIC, 1, chrono::Utc::now().timestamp_nanos() as usize, diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs index 040fdf8b..52947394 100644 --- a/nomos-services/consensus/src/network/adapters/waku.rs +++ b/nomos-services/consensus/src/network/adapters/waku.rs @@ -1,24 +1,22 @@ // std use std::borrow::Cow; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; // crates -use bytes::Bytes; use futures::{Stream, StreamExt}; use tokio_stream::wrappers::BroadcastStream; // internal -use crate::network::messages::TimeoutQcMsg; +use crate::network::messages::{NewViewMsg, TimeoutMsg, TimeoutQcMsg}; use crate::network::{ messages::{ProposalChunkMsg, VoteMsg}, NetworkAdapter, }; -use crate::overlay::committees::Committee; -use consensus_engine::{TimeoutQc, View}; +use consensus_engine::{BlockId, Committee, View}; use nomos_network::{ backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage}, NetworkMsg, NetworkService, }; use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; -use serde::de::DeserializeOwned; -use serde::Serialize; use waku_bindings::{ ContentFilter, Encoding, StoreQuery, WakuContentTopic, WakuMessage, WakuPubSubTopic, }; @@ -29,6 +27,7 @@ pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = const APPLICATION_NAME: &str = "CarnotSim"; const VERSION: usize = 1; +#[derive(Clone)] pub struct WakuAdapter { network_relay: OutboundRelay< as ServiceData>::Message>, } @@ -146,19 +145,16 @@ impl NetworkAdapter for WakuAdapter { async fn proposal_chunks_stream( &self, view: View, - ) -> Box + Send + Sync + Unpin> { + ) -> Box + Send + Sync + Unpin> { Box::new(Box::pin( - self.cached_stream_with_content_topic(PROPOSAL_CONTENT_TOPIC.clone()) + self.cached_stream_with_content_topic(PROPOSAL_CONTENT_TOPIC) .await .filter_map(move |message| { let payload = message.payload(); - let ProposalChunkMsg { - view: msg_view, - chunk, - } = ProposalChunkMsg::from_bytes(payload); + let proposal = ProposalChunkMsg::from_bytes(payload); async move { - if view == msg_view { - Some(Bytes::from(chunk)) + if view == proposal.view { + Some(proposal) } else { None } @@ -168,8 +164,24 @@ impl NetworkAdapter for WakuAdapter { } async fn broadcast_block_chunk(&self, chunk_message: ProposalChunkMsg) { - self.broadcast(chunk_message.as_bytes(), PROPOSAL_CONTENT_TOPIC) + let message = WakuMessage::new( + chunk_message.as_bytes(), + PROPOSAL_CONTENT_TOPIC, + 1, + chrono::Utc::now().timestamp_nanos() as usize, + [], + false, + ); + if let Err((_, _e)) = self + .network_relay + .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { + message, + topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC), + })) .await + { + todo!("log error"); + }; } async fn broadcast_timeout_qc(&self, timeout_qc_msg: TimeoutQcMsg) { @@ -177,18 +189,41 @@ impl NetworkAdapter for WakuAdapter { .await } - async fn timeout_qc_stream( + async fn timeout_stream( &self, + committee: &Committee, view: View, - ) -> Box + Send + Sync + Unpin> { + ) -> Box + Send + Sync + Unpin> { + let content_topic = create_topic("timeout", committee, view); Box::new(Box::pin( - self.cached_stream_with_content_topic(TIMEOUT_QC_CONTENT_TOPIC.clone()) + self.cached_stream_with_content_topic(content_topic) .await 
.filter_map(move |message| { let payload = message.payload(); - let qc = TimeoutQcMsg::from_bytes(payload).qc; + let timeout = TimeoutMsg::from_bytes(payload); async move { - if qc.view > view { + if timeout.vote.view == view { + Some(timeout) + } else { + None + } + } + }), + )) + } + + async fn timeout_qc_stream( + &self, + view: View, + ) -> Box + Send + Sync + Unpin> { + Box::new(Box::pin( + self.cached_stream_with_content_topic(TIMEOUT_QC_CONTENT_TOPIC) + .await + .filter_map(move |message| { + let payload = message.payload(); + let qc = TimeoutQcMsg::from_bytes(payload); + async move { + if qc.qc.view > view { Some(qc) } else { None @@ -198,32 +233,51 @@ impl NetworkAdapter for WakuAdapter { )) } - async fn votes_stream( + async fn votes_stream( &self, - committee: Committee, + committee: &Committee, view: View, - ) -> Box + Send> { - let content_topic = votes_topic(committee, view); + proposal_id: BlockId, + ) -> Box + Send + Unpin> { + let content_topic = create_topic("votes", committee, view); + Box::new(Box::pin( + self.cached_stream_with_content_topic(content_topic) + .await + .filter_map(move |message| { + let payload = message.payload(); + let vote = VoteMsg::from_bytes(payload); + async move { + if vote.vote.block == proposal_id { + Some(vote) + } else { + None + } + } + }), + )) + } + + async fn new_view_stream( + &self, + committee: &Committee, + view: View, + ) -> Box + Send + Unpin> { + let content_topic = create_topic("votes", committee, view); Box::new(Box::pin( self.cached_stream_with_content_topic(content_topic) .await .map(|message| { let payload = message.payload(); - VoteMsg::from_bytes(payload).vote + NewViewMsg::from_bytes(payload) }), )) } - async fn send_vote( - &self, - committee: Committee, - view: View, - approval_message: VoteMsg, - ) { - let content_topic = votes_topic(committee, view); + async fn send(&self, committee: &Committee, view: View, payload: Box<[u8]>, channel: &str) { + let content_topic = create_topic(channel, committee, view); let message = WakuMessage::new( - approval_message.as_bytes(), + payload, content_topic, 1, chrono::Utc::now().timestamp_nanos() as usize, @@ -234,7 +288,7 @@ impl NetworkAdapter for WakuAdapter { .network_relay .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { message, - topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), + topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC), })) .await { @@ -243,11 +297,11 @@ impl NetworkAdapter for WakuAdapter { } } -fn votes_topic(committee: Committee, view: View) -> WakuContentTopic { +fn create_topic(tag: &str, committee: &Committee, view: View) -> WakuContentTopic { WakuContentTopic { application_name: Cow::Borrowed(APPLICATION_NAME), version: VERSION, - content_topic_name: Cow::Owned(format!("votes-{}-{}", committee.id(), view)), + content_topic_name: Cow::Owned(format!("{}-{}-{}", tag, hash_set(committee), view)), encoding: Encoding::Proto, } } @@ -256,3 +310,12 @@ const PROPOSAL_CONTENT_TOPIC: WakuContentTopic = WakuContentTopic::new(APPLICATION_NAME, VERSION, "proposal", Encoding::Proto); const TIMEOUT_QC_CONTENT_TOPIC: WakuContentTopic = WakuContentTopic::new(APPLICATION_NAME, VERSION, "timeout-qc", Encoding::Proto); + +// TODO: Maybe use a secure hasher instead +fn hash_set(c: &Committee) -> u64 { + let mut s = DefaultHasher::new(); + for e in c.iter() { + e.hash(&mut s); + } + s.finish() +} diff --git a/nomos-services/consensus/src/network/messages.rs b/nomos-services/consensus/src/network/messages.rs index 8c3afe61..73a1c9ff 100644 --- 
a/nomos-services/consensus/src/network/messages.rs +++ b/nomos-services/consensus/src/network/messages.rs @@ -1,15 +1,15 @@ // std // crates -use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; // internal use crate::NodeId; -use consensus_engine::{TimeoutQc, View}; +use consensus_engine::{BlockId, NewView, Qc, Timeout, TimeoutQc, View, Vote}; use nomos_core::wire; -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize, Debug)] pub struct ProposalChunkMsg { pub chunk: Box<[u8]>, + pub proposal: BlockId, pub view: View, } @@ -23,25 +23,47 @@ impl ProposalChunkMsg { } } -#[derive(Serialize, Deserialize)] -pub struct VoteMsg { - pub source: NodeId, +#[derive(Eq, PartialEq, Hash, Serialize, Deserialize, Clone)] +pub struct VoteMsg { + pub voter: NodeId, pub vote: Vote, + pub qc: Option, } -impl VoteMsg -where - Vote: Serialize, -{ +impl VoteMsg { pub fn as_bytes(&self) -> Box<[u8]> { wire::serialize(self).unwrap().into_boxed_slice() } + pub fn from_bytes(data: &[u8]) -> Self { + wire::deserialize(data).unwrap() + } } -impl VoteMsg -where - Vote: DeserializeOwned, -{ +#[derive(Serialize, Deserialize)] +pub struct NewViewMsg { + pub voter: NodeId, + pub vote: NewView, +} + +impl NewViewMsg { + pub fn as_bytes(&self) -> Box<[u8]> { + wire::serialize(self).unwrap().into_boxed_slice() + } + pub fn from_bytes(data: &[u8]) -> Self { + wire::deserialize(data).unwrap() + } +} + +#[derive(Serialize, Deserialize)] +pub struct TimeoutMsg { + pub voter: NodeId, + pub vote: Timeout, +} + +impl TimeoutMsg { + pub fn as_bytes(&self) -> Box<[u8]> { + wire::serialize(self).unwrap().into_boxed_slice() + } pub fn from_bytes(data: &[u8]) -> Self { wire::deserialize(data).unwrap() } @@ -57,7 +79,6 @@ impl TimeoutQcMsg { pub fn as_bytes(&self) -> Box<[u8]> { wire::serialize(self).unwrap().into_boxed_slice() } - pub fn from_bytes(data: &[u8]) -> Self { wire::deserialize(data).unwrap() } diff --git a/nomos-services/consensus/src/network/mod.rs b/nomos-services/consensus/src/network/mod.rs index e13eaf36..366cf8a8 100644 --- a/nomos-services/consensus/src/network/mod.rs +++ b/nomos-services/consensus/src/network/mod.rs @@ -2,19 +2,15 @@ pub mod adapters; pub mod messages; // std -use bytes::Bytes; // crates use futures::Stream; // internal -use crate::network::messages::{ProposalChunkMsg, TimeoutQcMsg, VoteMsg}; -use crate::overlay::committees::Committee; -use consensus_engine::{TimeoutQc, View}; +use crate::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg}; +use consensus_engine::{BlockId, Committee, View}; use nomos_network::backends::NetworkBackend; use nomos_network::NetworkService; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; -use serde::de::DeserializeOwned; -use serde::Serialize; #[async_trait::async_trait] pub trait NetworkAdapter { @@ -25,22 +21,28 @@ pub trait NetworkAdapter { async fn proposal_chunks_stream( &self, view: View, - ) -> Box + Send + Sync + Unpin>; + ) -> Box + Send + Sync + Unpin>; async fn broadcast_block_chunk(&self, chunk_msg: ProposalChunkMsg); async fn broadcast_timeout_qc(&self, timeout_qc_msg: TimeoutQcMsg); + async fn timeout_stream( + &self, + committee: &Committee, + view: View, + ) -> Box + Send + Sync + Unpin>; async fn timeout_qc_stream( &self, view: View, - ) -> Box + Send + Sync + Unpin>; - async fn votes_stream( + ) -> Box + Send + Sync + Unpin>; + async fn votes_stream( &self, - committee: Committee, + committee: &Committee, view: View, - ) -> Box + 
Send>; - async fn send_vote( + proposal_id: BlockId, + ) -> Box + Send + Unpin>; + async fn new_view_stream( &self, - committee: Committee, + committee: &Committee, view: View, - vote: VoteMsg, - ); + ) -> Box + Send + Unpin>; + async fn send(&self, committee: &Committee, view: View, payload: Box<[u8]>, channel: &str); } diff --git a/nomos-services/consensus/src/overlay.rs b/nomos-services/consensus/src/overlay.rs new file mode 100644 index 00000000..18b112ff --- /dev/null +++ b/nomos-services/consensus/src/overlay.rs @@ -0,0 +1,65 @@ +use consensus_engine::{Committee, NodeId, Overlay, View}; + +#[derive(Clone, Debug)] +/// Flat overlay with a single committee and round robin leader selection. +pub struct FlatRoundRobin { + nodes: Vec, +} + +impl Overlay for FlatRoundRobin { + fn new(nodes: Vec) -> Self { + Self { nodes } + } + + fn root_committee(&self) -> consensus_engine::Committee { + self.nodes.clone().into_iter().collect() + } + + fn rebuild(&mut self, _timeout_qc: consensus_engine::TimeoutQc) { + todo!() + } + + fn is_member_of_child_committee(&self, _parent: NodeId, _child: NodeId) -> bool { + false + } + + fn is_member_of_root_committee(&self, _id: NodeId) -> bool { + true + } + + fn is_member_of_leaf_committee(&self, _id: NodeId) -> bool { + true + } + + fn is_child_of_root_committee(&self, _id: NodeId) -> bool { + false + } + + fn parent_committee(&self, _id: NodeId) -> consensus_engine::Committee { + Committee::new() + } + + fn node_committee(&self, _id: NodeId) -> consensus_engine::Committee { + self.nodes.clone().into_iter().collect() + } + + fn child_committees(&self, _id: NodeId) -> Vec { + vec![] + } + + fn leaf_committees(&self, _id: NodeId) -> Vec { + vec![self.root_committee()] + } + + fn leader(&self, view: View) -> NodeId { + self.nodes[view as usize % self.nodes.len()] + } + + fn super_majority_threshold(&self, _id: NodeId) -> usize { + 0 + } + + fn leader_super_majority_threshold(&self, _id: NodeId) -> usize { + self.nodes.len() * 2 / 3 + 1 + } +} diff --git a/nomos-services/consensus/src/overlay/committees.rs b/nomos-services/consensus/src/overlay/committees.rs deleted file mode 100644 index 95c13bc7..00000000 --- a/nomos-services/consensus/src/overlay/committees.rs +++ /dev/null @@ -1,189 +0,0 @@ -// std -use std::hash::Hash; -// crates -use futures::StreamExt; -use nomos_core::wire::deserializer; -use rand::{seq::SliceRandom, SeedableRng}; -// internal -use super::*; -use crate::network::messages::ProposalChunkMsg; -use crate::network::NetworkAdapter; - -/// View of the tree overlay centered around a specific member -pub struct Member { - // id is not used now, but gonna probably used it for later checking later on - #[allow(dead_code)] - id: NodeId, - committee: Committee, - committees: Committees, - view_n: consensus_engine::View, -} - -/// #Just a newtype index to be able to implement parent/children methods -#[derive(Copy, Clone)] -pub struct Committee(usize); - -pub struct Committees { - nodes: Box<[NodeId]>, -} - -impl Committees { - pub fn new(view: &View) -> Self { - let mut nodes = view.staking_keys.keys().cloned().collect::>(); - let mut rng = rand_chacha::ChaCha20Rng::from_seed(view.seed); - nodes.shuffle(&mut rng); - Self { nodes } - } - - pub fn into_member(self, id: NodeId, view: &View) -> Option> { - let member_idx = self.nodes.iter().position(|m| m == &id)?; - Some(Member { - committee: Committee(member_idx / C), - committees: self, - id, - view_n: view.view_n, - }) - } - - fn get_committee_members(&self, committee: Committee) -> Option<&[NodeId]> 
{ - let leftb = committee.0 * C; - let rightb = std::cmp::min(self.nodes.len(), leftb + C); - - if leftb < rightb { - Some(&self.nodes[leftb..rightb]) - } else { - None - } - } -} - -impl Committee { - pub const fn root() -> Self { - Self(0) - } - - pub fn id(&self) -> usize { - self.0 - } - - /// Return the left and right children committee, if any - pub fn children(&self) -> (Committee, Committee) { - ( - // left child - Committee(self.0 * 2 + 1), - // right child - Committee(self.0 + 2 + 2), - ) - } - - /// Return the parent committee, if any - pub fn parent(&self) -> Option { - if self.0 == 0 { - None - } else { - Some(Committee((self.0 - 1) / 2)) - } - } -} - -impl Member { - /// Return other members of this committee - pub fn peers(&self) -> &[NodeId] { - self.committees - .get_committee_members(self.committee) - .unwrap() - } - - /// Return the participant in the parent committee this member should interact - /// with - pub fn parent_committee(&self) -> Option { - self.committee.parent() - } - - // Return participants in the children committees this member should interact with - pub fn children_committes(&self) -> (Option, Option) { - let (left, right) = self.committee.children(); - ( - self.committees.get_committee_members(left).map(|_| left), - self.committees.get_committee_members(right).map(|_| right), - ) - } -} - -#[async_trait::async_trait] -impl Overlay - for Member -where - Network: NetworkAdapter + Sync, - Fountain: FountainCode + Sync, - VoteTally: Tally + Sync, - VoteTally::Qc: serde::de::DeserializeOwned + Clone + Send + Sync + 'static, - TxId: serde::de::DeserializeOwned + Clone + Hash + Eq + Send + Sync + 'static, -{ - // we still need view here to help us initialize - fn new(view: &View, node: NodeId) -> Self { - let committees = Committees::new(view); - committees.into_member(node, view).unwrap() - } - - async fn reconstruct_proposal_block( - &self, - view: &View, - adapter: &Network, - fountain: &Fountain, - ) -> Result, FountainError> { - assert_eq!(view.view_n, self.view_n, "view_n mismatch"); - let message_stream = adapter.proposal_chunks_stream(view.view_n).await; - fountain.decode(message_stream).await.and_then(|b| { - deserializer(&b) - .deserialize::>() - .map_err(|e| FountainError::from(e.to_string().as_str())) - }) - } - - async fn broadcast_block( - &self, - view: &View, - block: Block, - adapter: &Network, - fountain: &Fountain, - ) { - assert_eq!(view.view_n, self.view_n, "view_n mismatch"); - let block_bytes = block.as_bytes(); - let encoded_stream = fountain.encode(&block_bytes); - encoded_stream - .for_each_concurrent(None, |chunk| async move { - let message = ProposalChunkMsg { - chunk: chunk.to_vec().into_boxed_slice(), - view: view.view_n, - }; - adapter.broadcast_block_chunk(message.clone()).await; - }) - .await; - } - - async fn approve_and_forward( - &self, - view: &View, - _block: &Block, - _adapter: &Network, - _tally: &VoteTally, - _next_view: &View, - ) -> Result<(), Box> { - assert_eq!(view.view_n, self.view_n, "view_n mismatch"); - // roughly, we want to do something like this: - // 1. wait for left and right children committees to approve - // 2. approve the block - // 3. forward the approval to the parent committee - // - // However this will likely change depending on the position - // of the committee in the tree - todo!() - } - - async fn build_qc(&self, view: &View, _adapter: &Network, _tally: &VoteTally) -> VoteTally::Qc { - assert_eq!(view.view_n, self.view_n, "view_n mismatch"); - // maybe the leader publishing the QC? 
- todo!() - } -} diff --git a/nomos-services/consensus/src/overlay/flat.rs b/nomos-services/consensus/src/overlay/flat.rs deleted file mode 100644 index 353df9e2..00000000 --- a/nomos-services/consensus/src/overlay/flat.rs +++ /dev/null @@ -1,128 +0,0 @@ -// std -use std::error::Error; -use std::hash::Hash; -// crates -use futures::StreamExt; -use serde::de::DeserializeOwned; -use serde::Serialize; -// internal -use super::*; -use crate::network::messages::{ProposalChunkMsg, VoteMsg}; -use crate::network::NetworkAdapter; -use crate::overlay::committees::Committee; -use nomos_core::wire::deserializer; - -const FLAT_COMMITTEE: Committee = Committee::root(); - -/// A flat overlay, everyone is in the same committee. -/// As far as the API is concerned, this should be equivalent to any other -/// overlay and far simpler to implement. -/// For this reason, this might act as a 'reference' overlay for testing. -pub struct Flat { - // TODO: this should be a const param, but we can't do that yet - node_id: NodeId, - view_n: consensus_engine::View, -} - -impl Flat { - pub fn new(view_n: consensus_engine::View, node_id: NodeId) -> Self { - Self { node_id, view_n } - } - - fn approve(&self, _block: &Block) -> Approval { - // we still need to define how votes look like - Approval - } -} - -#[async_trait::async_trait] -impl Overlay for Flat -where - TxId: serde::de::DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static, - Network: NetworkAdapter + Sync, - Fountain: FountainCode + Sync, - VoteTally: Tally + Sync, - VoteTally::Vote: Serialize + DeserializeOwned + Send, - VoteTally::Qc: Clone + DeserializeOwned + Send + Sync + 'static, -{ - fn new(view: &View, node: NodeId) -> Self { - Flat::new(view.view_n, node) - } - - async fn reconstruct_proposal_block( - &self, - view: &View, - adapter: &Network, - fountain: &Fountain, - ) -> Result, FountainError> { - assert_eq!(view.view_n, self.view_n, "view_n mismatch"); - let message_stream = adapter.proposal_chunks_stream(view.view_n).await; - fountain.decode(message_stream).await.and_then(|b| { - deserializer(&b) - .deserialize::>() - .map_err(|e| FountainError::from(e.to_string().as_str())) - }) - } - - async fn broadcast_block( - &self, - view: &View, - block: Block, - adapter: &Network, - fountain: &Fountain, - ) { - assert_eq!(view.view_n, self.view_n, "view_n mismatch"); - let block_bytes = block.as_bytes(); - let encoded_stream = fountain.encode(&block_bytes); - encoded_stream - .for_each_concurrent(None, |chunk| async move { - let message = ProposalChunkMsg { - chunk: chunk.to_vec().into_boxed_slice(), - view: view.view_n, - }; - adapter.broadcast_block_chunk(message).await; - }) - .await; - } - - async fn approve_and_forward( - &self, - view: &View, - block: &Block, - adapter: &Network, - _tally: &VoteTally, - _next_view: &View, - ) -> Result<(), Box> { - assert_eq!(view.view_n, self.view_n, "view_n mismatch"); - // in the flat overlay, there's no need to wait for anyone before approving the block - let approval = self.approve(block); - adapter - .send_vote( - FLAT_COMMITTEE, - view.view_n, - VoteMsg { - vote: approval, - source: self.node_id, - }, - ) - .await; - Ok(()) - } - - async fn build_qc(&self, view: &View, adapter: &Network, tally: &VoteTally) -> VoteTally::Qc { - assert_eq!(view.view_n, self.view_n, "view_n mismatch"); - - // for now, let's pretend that consensus is reached as soon as the - // block is approved by a share of the nodes - let stream = Box::into_pin(adapter.votes_stream(FLAT_COMMITTEE, view.view_n).await); - - // Shadow 
the original binding so that it can't be directly accessed - // ever again. - // TODO: Remove the `try_into` call when tally is refactored to use with latest consensus engine types - if let Ok((qc, _)) = tally.tally(view.view_n.try_into().unwrap(), stream).await { - qc - } else { - unimplemented!("consensus not reached") - } - } -} diff --git a/nomos-services/consensus/src/overlay/mod.rs b/nomos-services/consensus/src/overlay/mod.rs deleted file mode 100644 index 502ea4c5..00000000 --- a/nomos-services/consensus/src/overlay/mod.rs +++ /dev/null @@ -1,59 +0,0 @@ -pub mod committees; -mod flat; - -// std -use std::error::Error; -use std::hash::Hash; -// crates -// internal -use super::{Approval, NodeId, View}; -use crate::network::NetworkAdapter; -pub use committees::Member; -use nomos_core::block::Block; -use nomos_core::fountain::{FountainCode, FountainError}; -use nomos_core::vote::Tally; - -/// Dissemination overlay, tied to a specific view -#[async_trait::async_trait] -pub trait Overlay< - Network: NetworkAdapter, - Fountain: FountainCode, - VoteTally: Tally, - TxId: Clone + Eq + Hash, -> where - VoteTally::Qc: Clone, -{ - fn new(view: &View, node: NodeId) -> Self; - - async fn reconstruct_proposal_block( - &self, - view: &View, - adapter: &Network, - fountain: &Fountain, - ) -> Result, FountainError>; - async fn broadcast_block( - &self, - view: &View, - block: Block, - adapter: &Network, - fountain: &Fountain, - ); - /// Different overlays might have different needs or the same overlay might - /// require different steps depending on the node role - /// For now let's put this responsibility on the overlay - async fn approve_and_forward( - &self, - view: &View, - block: &Block, - adapter: &Network, - vote_tally: &VoteTally, - next_view: &View, - ) -> Result<(), Box>; - /// Wait for consensus on a block - async fn build_qc( - &self, - view: &View, - adapter: &Network, - vote_tally: &VoteTally, - ) -> VoteTally::Qc; -} diff --git a/nomos-services/consensus/src/tally/happy.rs b/nomos-services/consensus/src/tally/happy.rs new file mode 100644 index 00000000..fb7c3c4a --- /dev/null +++ b/nomos-services/consensus/src/tally/happy.rs @@ -0,0 +1,85 @@ +#![allow(dead_code)] +// TODO: Well, remove this when we actually use the fields from the specification +// std +use std::collections::HashSet; +// crates +use futures::{Stream, StreamExt}; + +// internal +use super::CarnotTallySettings; +use crate::network::messages::VoteMsg; +use consensus_engine::{Block, Qc, StandardQc, Vote}; +use nomos_core::crypto::PublicKey; +use nomos_core::vote::Tally; + +pub type NodeId = PublicKey; + +#[derive(thiserror::Error, Debug)] +pub enum CarnotTallyError { + #[error("Received invalid vote: {0}")] + InvalidVote(String), + #[error("Did not receive enough votes")] + InsufficientVotes, +} + +#[derive(Clone, Debug)] +pub struct CarnotTally { + settings: CarnotTallySettings, +} + +#[async_trait::async_trait] +impl Tally for CarnotTally { + type Vote = VoteMsg; + type Qc = Qc; + type Subject = Block; + type Outcome = HashSet; + type TallyError = CarnotTallyError; + type Settings = CarnotTallySettings; + + fn new(settings: Self::Settings) -> Self { + Self { settings } + } + + async fn tally + Unpin + Send>( + &self, + block: Block, + mut vote_stream: S, + ) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> { + let mut seen = HashSet::new(); + let mut outcome = HashSet::new(); + // return early for leaf nodes + if self.settings.threshold == 0 { + return Ok(( + Qc::Standard(StandardQc { + view: block.view, + id: 
block.id, + }), + outcome, + )); + } + while let Some(vote) = vote_stream.next().await { + // check vote view is valid + if vote.vote.view != block.view || vote.vote.block != block.id { + continue; + } + + // check for individual nodes votes + if !self.settings.participating_nodes.contains(&vote.voter) { + continue; + } + + seen.insert(vote.voter); + outcome.insert(vote.vote.clone()); + if seen.len() >= self.settings.threshold { + return Ok(( + Qc::Standard(StandardQc { + view: vote.vote.view, + id: vote.vote.block, + }), + outcome, + )); + } + } + Err(CarnotTallyError::InsufficientVotes) + } +} diff --git a/nomos-services/consensus/src/tally/mod.rs b/nomos-services/consensus/src/tally/mod.rs new file mode 100644 index 00000000..bcf73ef6 --- /dev/null +++ b/nomos-services/consensus/src/tally/mod.rs @@ -0,0 +1,18 @@ +pub mod happy; +pub mod unhappy; + +// std +use std::collections::HashSet; + +// crates +use serde::{Deserialize, Serialize}; + +// internal +use consensus_engine::NodeId; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CarnotTallySettings { + pub threshold: usize, + // TODO: this should probably be dynamic and change with the view + pub participating_nodes: HashSet<NodeId>, +} diff --git a/nomos-services/consensus/src/tally/unhappy.rs b/nomos-services/consensus/src/tally/unhappy.rs new file mode 100644 index 00000000..68500489 --- /dev/null +++ b/nomos-services/consensus/src/tally/unhappy.rs @@ -0,0 +1,67 @@ +// std +use std::collections::HashSet; +// crates +use futures::{Stream, StreamExt}; +use serde::{Deserialize, Serialize}; +// internal +use super::CarnotTallySettings; +use crate::network::messages::NewViewMsg; +use consensus_engine::{NewView, TimeoutQc}; +use nomos_core::vote::Tally; + +#[derive(thiserror::Error, Debug)] +pub enum NewViewTallyError { + #[error("Did not receive enough votes")] + InsufficientVotes, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NewViewTally { + settings: CarnotTallySettings, +} + +#[async_trait::async_trait] +impl Tally for NewViewTally { + type Vote = NewViewMsg; + type Qc = (); + type Subject = TimeoutQc; + type Outcome = HashSet<NewView>; + type TallyError = NewViewTallyError; + type Settings = CarnotTallySettings; + + fn new(settings: Self::Settings) -> Self { + Self { settings } + } + + async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>( + &self, + timeout_qc: TimeoutQc, + mut vote_stream: S, + ) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> { + let mut seen = HashSet::new(); + let mut outcome = HashSet::new(); + + // return early for leaf nodes + if self.settings.threshold == 0 { + return Ok(((), outcome)); + } + + while let Some(vote) = vote_stream.next().await { + // check vote view is valid + if vote.vote.view != timeout_qc.view { + continue; + } + + // check for individual nodes votes + if !self.settings.participating_nodes.contains(&vote.voter) { + continue; + } + seen.insert(vote.voter); + outcome.insert(vote.vote.clone()); + if seen.len() >= self.settings.threshold { + return Ok(((), outcome)); + } + } + Err(NewViewTallyError::InsufficientVotes) + } +} diff --git a/nomos-services/consensus/src/tip.rs b/nomos-services/consensus/src/tip.rs deleted file mode 100644 index 2bdd57a9..00000000 --- a/nomos-services/consensus/src/tip.rs +++ /dev/null @@ -1,2 +0,0 @@ -/// Assuming determining which tip to consider is integral part of consensus -pub struct Tip; diff --git a/nomos-services/consensus/src/view_cancel.rs b/nomos-services/consensus/src/view_cancel.rs new file mode 100644 index 00000000..a94524ba --- /dev/null +++
b/nomos-services/consensus/src/view_cancel.rs @@ -0,0 +1,70 @@ +use crate::Event; +use consensus_engine::View; +use std::collections::HashMap; +use std::future::Future; +use std::hash::Hash; +use tokio::select; +use tokio_util::sync::CancellationToken; + +pub struct ViewCancel(CancellationToken); + +impl ViewCancel { + pub fn new() -> Self { + ViewCancel(CancellationToken::new()) + } + + pub fn cancel(&self) { + self.0.cancel(); + } + + pub fn cancel_token(&self) -> CancellationToken { + self.0.clone() + } +} + +impl Drop for ViewCancel { + fn drop(&mut self) { + if !self.0.is_cancelled() { + self.cancel(); + } + } +} + +pub struct ViewCancelCache { + cancels: HashMap, +} + +impl ViewCancelCache { + pub fn new() -> Self { + ViewCancelCache { + cancels: HashMap::new(), + } + } + + pub fn cancel(&mut self, view: View) { + if let Some(cancel) = self.cancels.remove(&view) { + cancel.cancel(); + } + } + + pub fn cancel_token(&mut self, view: View) -> CancellationToken { + self.cancels + .entry(view) + .or_insert_with(ViewCancel::new) + .cancel_token() + } + + pub(crate) fn cancelable_event_future>>( + &mut self, + view: View, + f: F, + ) -> impl Future> { + let token = self.cancel_token(view); + async move { + select! { + event = f => event, + _ = token.cancelled() => Event::None, + } + } + } +}
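
The sketches below are editorial illustrations of the main mechanisms this patch introduces; they are standalone approximations written against assumed type aliases, not code from the tree. First, how create_topic and hash_set in the Waku adapter derive a committee- and view-scoped content-topic name: every committee member is fed into a std DefaultHasher and the finished value is combined with the channel tag and the view. A minimal std-only sketch, assuming Committee is a HashSet of 32-byte node ids as in consensus_engine:

use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

type NodeId = [u8; 32];
type Committee = HashSet<NodeId>;
type View = i64;

// Same construction as the adapter's hash_set: feed every committee member
// into a DefaultHasher and use the finished value as a compact tag.
fn hash_set(c: &Committee) -> u64 {
    let mut s = DefaultHasher::new();
    for e in c.iter() {
        e.hash(&mut s);
    }
    s.finish()
}

// The content-topic *name* built by create_topic; the full WakuContentTopic
// additionally carries the application name, version and encoding.
fn topic_name(tag: &str, committee: &Committee, view: View) -> String {
    format!("{}-{}-{}", tag, hash_set(committee), view)
}

fn main() {
    let committee: Committee = [[0u8; 32], [1u8; 32]].into();
    println!("{}", topic_name("votes", &committee, 3)); // e.g. "votes-<u64 tag>-3"
}

One caveat worth keeping in mind: HashSet iteration order is unspecified, so the tag is only stable across nodes if they iterate the committee in the same order; hashing members in a canonical (e.g. sorted) order would make it fully deterministic, in the spirit of the "maybe use a secure hasher" TODO.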
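
FlatRoundRobin puts every node into a single root committee, rotates the leader by view number, and sets the leader quorum at more than two thirds of the nodes. A standalone sketch of just those two rules, written as plain functions rather than the Overlay trait:

type NodeId = [u8; 32];
type View = i64;

// Round-robin leader: view number modulo the fixed node list.
fn leader(nodes: &[NodeId], view: View) -> NodeId {
    nodes[view as usize % nodes.len()]
}

// Leader quorum: more than two thirds of all nodes, as in
// leader_super_majority_threshold.
fn leader_super_majority_threshold(n: usize) -> usize {
    n * 2 / 3 + 1
}

fn main() {
    let nodes: Vec<NodeId> = (0..4u8).map(|i| [i; 32]).collect();
    assert_eq!(leader(&nodes, 5), nodes[1]);
    assert_eq!(leader_super_majority_threshold(nodes.len()), 3);
}

With four nodes, view 5 maps to the second node and a leader needs three distinct votes before it can build a QC.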
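
The happy-path tally in tally/happy.rs collects votes that match the proposed block, come from known participants, and originate from distinct voters, succeeding once the threshold is reached. A simplified synchronous model of that loop; SimpleVote, Settings and the u8 ids are illustrative stand-ins, not the crate's types:

use std::collections::HashSet;

type NodeId = u8; // stand-in for the real 32-byte id
type BlockId = u8; // stand-in for the real block id

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct SimpleVote {
    voter: NodeId,
    block: BlockId,
}

struct Settings {
    threshold: usize,
    participating_nodes: HashSet<NodeId>,
}

// Mirrors the shape of the happy-path loop: skip votes for other blocks or
// from unknown voters, count each voter once, succeed at `threshold`.
fn tally(
    settings: &Settings,
    block: BlockId,
    votes: &[SimpleVote],
) -> Result<HashSet<SimpleVote>, &'static str> {
    let mut seen = HashSet::new();
    let mut outcome = HashSet::new();
    // leaf nodes have no children to wait for
    if settings.threshold == 0 {
        return Ok(outcome);
    }
    for vote in votes {
        if vote.block != block || !settings.participating_nodes.contains(&vote.voter) {
            continue;
        }
        seen.insert(vote.voter);
        outcome.insert(*vote);
        if seen.len() >= settings.threshold {
            return Ok(outcome);
        }
    }
    Err("insufficient votes")
}

fn main() {
    let settings = Settings {
        threshold: 2,
        participating_nodes: [1, 2, 3].into(),
    };
    let votes = [
        SimpleVote { voter: 1, block: 7 },
        SimpleVote { voter: 9, block: 7 }, // not a participant, ignored
        SimpleVote { voter: 2, block: 7 },
    ];
    assert!(tally(&settings, 7, &votes).is_ok());
}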
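
Finally, view_cancel.rs races every per-view future against a CancellationToken so that work for a stale view is dropped as soon as the view advances. A minimal usage sketch of that pattern, assuming tokio (rt, macros and time features) and tokio-util, with Event reduced to a two-variant stand-in:

use tokio::select;
use tokio_util::sync::CancellationToken;

// Reduced stand-in for the consensus Event type used in view_cancel.rs.
#[derive(Debug)]
enum Event {
    Timeout,
    None,
}

// The core of cancelable_event_future: race the event future against the
// view's cancellation token and fall back to Event::None when cancelled.
async fn cancelable(token: CancellationToken, fut: impl std::future::Future<Output = Event>) -> Event {
    select! {
        event = fut => event,
        _ = token.cancelled() => Event::None,
    }
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let child = token.clone();
    // Spawn a long timer for the current view, then cancel it as if the
    // view had advanced before the timeout fired.
    let handle = tokio::spawn(cancelable(child, async {
        tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        Event::Timeout
    }));
    token.cancel();
    assert!(matches!(handle.await.unwrap(), Event::None));
}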