diff --git a/nomos-core/src/lib.rs b/nomos-core/src/lib.rs index 39aec6ca..cb48e37b 100644 --- a/nomos-core/src/lib.rs +++ b/nomos-core/src/lib.rs @@ -4,4 +4,5 @@ pub mod crypto; pub mod fountain; pub mod staking; pub mod tx; +pub mod vote; pub mod wire; diff --git a/nomos-core/src/vote/carnot.rs b/nomos-core/src/vote/carnot.rs new file mode 100644 index 00000000..cebe8d6f --- /dev/null +++ b/nomos-core/src/vote/carnot.rs @@ -0,0 +1,117 @@ +#![allow(dead_code)] +// TODO: Well, remove this when we actually use the fields from the specification +// std + +use std::collections::HashSet; +// crates +use futures::{Stream, StreamExt}; +// internal +use crate::block::BlockId; +use crate::crypto::PublicKey; +use crate::vote::Tally; + +pub type NodeId = PublicKey; + +pub enum QuorumCertificate { + Simple(SimpleQuorumCertificate), + Aggregated(AggregatedQuorumCertificate), +} + +impl QuorumCertificate { + pub fn view(&self) -> u64 { + match self { + QuorumCertificate::Simple(qc) => qc.view, + QuorumCertificate::Aggregated(qc) => qc.view, + } + } +} + +pub struct SimpleQuorumCertificate { + view: u64, + block: BlockId, +} + +pub struct AggregatedQuorumCertificate { + view: u64, + high_qh: Box, +} + +pub struct Vote { + block: BlockId, + view: u64, + voter: NodeId, // TODO: this should be some id, probably the node pk + qc: Option, +} + +impl Vote { + pub fn valid_view(&self, view: u64) -> bool { + self.view == view && self.qc.as_ref().map_or(true, |qc| qc.view() == view - 1) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum CarnotTallyError { + #[error("Received invalid vote: {0}")] + InvalidVote(String), + #[error("Did not receive enough votes")] + InsufficientVotes, +} + +#[derive(Clone)] +pub struct CarnotTallySettings { + threshold: usize, + // TODO: this probably should be dynamic and should change with the view (?) + participating_nodes: HashSet, +} + +pub struct CarnotTally { + settings: CarnotTallySettings, +} + +#[async_trait::async_trait] +impl Tally for CarnotTally { + type Vote = Vote; + type Outcome = QuorumCertificate; + type TallyError = CarnotTallyError; + type Settings = CarnotTallySettings; + + fn new(settings: Self::Settings) -> Self { + Self { settings } + } + + async fn tally + Unpin + Send>( + &self, + view: u64, + mut vote_stream: S, + ) -> Result { + let mut approved = 0usize; + let mut seen = HashSet::new(); + while let Some(vote) = vote_stream.next().await { + // check vote view is valid + if !vote.valid_view(view) { + return Err(CarnotTallyError::InvalidVote("Invalid view".to_string())); + } + // check for duplicated votes + if seen.contains(&vote.voter) { + return Err(CarnotTallyError::InvalidVote( + "Double voted node".to_string(), + )); + } + // check for individual nodes votes + if !self.settings.participating_nodes.contains(&vote.voter) { + return Err(CarnotTallyError::InvalidVote( + "Non-participating node".to_string(), + )); + } + seen.insert(vote.voter); + approved += 1; + if approved >= self.settings.threshold { + return Ok(QuorumCertificate::Simple(SimpleQuorumCertificate { + view, + block: vote.block, + })); + } + } + Err(CarnotTallyError::InsufficientVotes) + } +} diff --git a/nomos-core/src/vote/mock.rs b/nomos-core/src/vote/mock.rs new file mode 100644 index 00000000..5a5d3b12 --- /dev/null +++ b/nomos-core/src/vote/mock.rs @@ -0,0 +1,76 @@ +// std +// crates +use futures::{Stream, StreamExt}; +use serde::{Deserialize, Serialize}; +// internal +use crate::vote::Tally; + +#[derive(Serialize, Deserialize)] +pub struct MockVote { + view: u64, +} + +impl MockVote { + pub fn view(&self) -> u64 { + self.view + } +} + +#[allow(dead_code)] +pub struct MockQc { + count_votes: usize, +} + +pub struct Error(String); + +#[derive(Clone, Debug)] +pub struct MockTallySettings { + pub threshold: usize, +} + +#[derive(Debug)] +pub struct MockTally { + threshold: usize, +} + +impl MockQc { + pub fn new(count_votes: usize) -> Self { + Self { count_votes } + } + + pub fn votes(&self) -> usize { + self.count_votes + } +} + +#[async_trait::async_trait] +impl Tally for MockTally { + type Vote = MockVote; + type Outcome = MockQc; + type TallyError = Error; + type Settings = MockTallySettings; + + fn new(settings: Self::Settings) -> Self { + let Self::Settings { threshold } = settings; + Self { threshold } + } + + async fn tally + Unpin + Send>( + &self, + view: u64, + mut vote_stream: S, + ) -> Result { + let mut count_votes = 0; + while let Some(vote) = vote_stream.next().await { + if vote.view() != view { + return Err(Error("Invalid vote".into())); + } + count_votes += 1; + } + if count_votes > self.threshold { + Ok(MockQc { count_votes }) + } else { + Err(Error("Not enough votes".into())) + } + } +} diff --git a/nomos-core/src/vote/mod.rs b/nomos-core/src/vote/mod.rs new file mode 100644 index 00000000..a8190e18 --- /dev/null +++ b/nomos-core/src/vote/mod.rs @@ -0,0 +1,18 @@ +pub mod carnot; +pub mod mock; + +use futures::Stream; + +#[async_trait::async_trait] +pub trait Tally { + type Vote; + type Outcome; + type TallyError; + type Settings: Clone; + fn new(settings: Self::Settings) -> Self; + async fn tally + Unpin + Send>( + &self, + view: u64, + vote_stream: S, + ) -> Result; +} diff --git a/nomos-core/src/wire.rs b/nomos-core/src/wire.rs index 12077205..917f33d6 100644 --- a/nomos-core/src/wire.rs +++ b/nomos-core/src/wire.rs @@ -11,6 +11,7 @@ use bincode::{ Options, }; use once_cell::sync::Lazy; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; pub type Error = bincode::Error; @@ -105,6 +106,11 @@ pub fn serialize(item: &T) -> Result, Error> { Ok(buf) } +/// Deserialize an object directly +pub fn deserialize(item: &[u8]) -> Result { + deserializer(item).deserialize() +} + #[cfg(test)] mod tests { use super::*; diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index 1665b5e3..f1c1d5ab 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -6,18 +6,19 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1" bytes = "1.3" chrono = "0.4" -rand_chacha = "0.3" -rand = "0.8" -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } -async-trait = "0.1" +futures = "0.3" nomos-network = { path = "../network" } nomos-mempool = { path = "../mempool" } nomos-core = { path = "../../nomos-core" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +rand_chacha = "0.3" +rand = "0.8" +serde = "1.0" tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" -futures = "0.3" waku-bindings = { version = "0.1.0-rc.2", optional = true} tracing = "0.1" diff --git a/nomos-services/consensus/src/leadership.rs b/nomos-services/consensus/src/leadership.rs index 70620563..799991ac 100644 --- a/nomos-services/consensus/src/leadership.rs +++ b/nomos-services/consensus/src/leadership.rs @@ -36,11 +36,11 @@ impl Leadership { } #[allow(unused, clippy::diverging_sub_expression)] - pub async fn try_propose_block<'view>( + pub async fn try_propose_block<'view, Qc>( &self, view: &'view View, tip: &Tip, - qc: Approval, + qc: Qc, ) -> LeadershipResult<'view> { let ancestor_hint = todo!("get the ancestor from the tip"); if view.is_leader(self.key.key) { diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 760adc83..7b03f042 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -16,6 +16,7 @@ use std::collections::BTreeMap; use std::error::Error; use std::fmt::Debug; // crates +use serde::{Deserialize, Serialize}; // internal use crate::network::NetworkAdapter; use leadership::{Leadership, LeadershipResult}; @@ -23,6 +24,7 @@ use nomos_core::block::Block; use nomos_core::crypto::PublicKey; use nomos_core::fountain::FountainCode; use nomos_core::staking::Stake; +use nomos_core::vote::Tally; use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolService}; use nomos_network::NetworkService; use overlay::Overlay; @@ -40,37 +42,45 @@ pub type NodeId = PublicKey; // Random seed for each round provided by the protocol pub type Seed = [u8; 32]; -pub struct CarnotSettings { +pub struct CarnotSettings { private_key: [u8; 32], fountain_settings: Fountain::Settings, + tally_settings: VoteTally::Settings, } -impl Clone for CarnotSettings { +impl Clone for CarnotSettings { fn clone(&self) -> Self { Self { private_key: self.private_key, fountain_settings: self.fountain_settings.clone(), + tally_settings: self.tally_settings.clone(), } } } -impl CarnotSettings { +impl CarnotSettings { #[inline] - pub const fn new(private_key: [u8; 32], fountain_settings: Fountain::Settings) -> Self { + pub const fn new( + private_key: [u8; 32], + fountain_settings: Fountain::Settings, + tally_settings: VoteTally::Settings, + ) -> Self { Self { private_key, fountain_settings, + tally_settings, } } } -pub struct CarnotConsensus +pub struct CarnotConsensus where F: FountainCode, A: NetworkAdapter, M: MempoolAdapter, P: MemPool, - O: Overlay, + T: Tally, + O: Overlay, P::Tx: Debug + 'static, P::Id: Debug + 'static, A::Backend: 'static, @@ -81,37 +91,42 @@ where network_relay: Relay>, mempool_relay: Relay>, _fountain: std::marker::PhantomData, + _tally: std::marker::PhantomData, _overlay: std::marker::PhantomData, } -impl ServiceData for CarnotConsensus +impl ServiceData for CarnotConsensus where F: FountainCode, A: NetworkAdapter, P: MemPool, + T: Tally, P::Tx: Debug, P::Id: Debug, M: MempoolAdapter, - O: Overlay, + O: Overlay, { const SERVICE_ID: ServiceId = "Carnot"; - type Settings = CarnotSettings; + type Settings = CarnotSettings; type State = NoState; type StateOperator = NoOperator; type Message = NoMessage; } #[async_trait::async_trait] -impl ServiceCore for CarnotConsensus +impl ServiceCore for CarnotConsensus where F: FountainCode + Send + Sync + 'static, A: NetworkAdapter + Send + Sync + 'static, P: MemPool + Send + Sync + 'static, + T: Tally + Send + Sync + 'static, + T::Settings: Send + Sync + 'static, + T::Outcome: Send + Sync, P::Settings: Send + Sync + 'static, P::Tx: Debug + Send + Sync + 'static, P::Id: Debug + Send + Sync + 'static, M: MempoolAdapter + Send + Sync + 'static, - O: Overlay + Send + Sync + 'static, + O: Overlay + Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); @@ -120,6 +135,7 @@ where service_state, network_relay, _fountain: Default::default(), + _tally: Default::default(), _overlay: Default::default(), mempool_relay, }) @@ -141,6 +157,7 @@ where let CarnotSettings { private_key, fountain_settings, + tally_settings, } = self.service_state.settings_reader.get_updated_settings(); let network_adapter = A::new(network_relay).await; @@ -148,6 +165,7 @@ where let tip = Tip; let fountain = F::new(fountain_settings); + let tally = T::new(tally_settings); let leadership = Leadership::new(private_key, mempool_relay); // FIXME: this should be taken from config @@ -162,11 +180,12 @@ where // FIXME: this should probably have a timer to detect failed rounds let res = cur_view - .resolve::( + .resolve::( private_key, &tip, &network_adapter, &fountain, + &tally, &leadership, ) .await; @@ -185,7 +204,7 @@ where } } -#[derive(Hash, Eq, PartialEq)] +#[derive(Hash, Eq, PartialEq, Serialize, Deserialize)] pub struct Approval; // Consensus round, also aids in guaranteeing synchronization @@ -198,28 +217,33 @@ pub struct View { impl View { // TODO: might want to encode steps in the type system - pub async fn resolve<'view, A, O, F, Tx, Id>( + pub async fn resolve<'view, A, O, F, T, Tx, Id>( &'view self, node_id: NodeId, tip: &Tip, adapter: &A, fountain: &F, + tally: &T, leadership: &Leadership, ) -> Result<(Block, View), Box> where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, - O: Overlay, + T: Tally + Send + Sync + 'static, + T::Outcome: Send + Sync, + O: Overlay, { let res = if self.is_leader(node_id) { let block = self - .resolve_leader::(node_id, tip, adapter, fountain, leadership) + .resolve_leader::( + node_id, tip, adapter, fountain, tally, leadership, + ) .await .unwrap(); // FIXME: handle sad path let next_view = self.generate_next_view(&block); (block, next_view) } else { - self.resolve_non_leader::(node_id, adapter, fountain) + self.resolve_non_leader::(node_id, adapter, fountain, tally) .await .unwrap() // FIXME: handle sad path }; @@ -233,23 +257,26 @@ impl View { Ok(res) } - async fn resolve_leader<'view, A, O, F, Tx, Id>( + async fn resolve_leader<'view, A, O, F, T, Tx, Id>( &'view self, node_id: NodeId, tip: &Tip, adapter: &A, fountain: &F, + tally: &T, leadership: &Leadership, ) -> Result where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, - O: Overlay, + T: Tally + Send + Sync + 'static, + T::Outcome: Send + Sync, + O: Overlay, { let overlay = O::new(self, node_id); // We need to build the QC for the block we are proposing - let qc = overlay.build_qc(self, adapter).await; + let qc = overlay.build_qc(self, adapter, tally).await; let LeadershipResult::Leader { block, _view } = leadership .try_propose_block(self, tip, qc) @@ -262,16 +289,18 @@ impl View { Ok(block) } - async fn resolve_non_leader<'view, A, O, F>( + async fn resolve_non_leader<'view, A, O, F, T>( &'view self, node_id: NodeId, adapter: &A, fountain: &F, + tally: &T, ) -> Result<(Block, View), ()> where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, - O: Overlay, + T: Tally + Send + Sync + 'static, + O: Overlay, { let overlay = O::new(self, node_id); // Consensus in Carnot is achieved in 2 steps from the point of view of a node: @@ -294,7 +323,7 @@ impl View { // We only consider the happy path for now if self.pipelined_safe_block(&block) { overlay - .approve_and_forward(self, &block, adapter, &next_view) + .approve_and_forward(self, &block, adapter, tally, &next_view) .await .unwrap(); // FIXME: handle sad path } diff --git a/nomos-services/consensus/src/network/adapters/mock.rs b/nomos-services/consensus/src/network/adapters/mock.rs index 2e3deda8..3f33692e 100644 --- a/nomos-services/consensus/src/network/adapters/mock.rs +++ b/nomos-services/consensus/src/network/adapters/mock.rs @@ -7,15 +7,17 @@ use nomos_network::{ NetworkMsg, NetworkService, }; use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; +use serde::de::DeserializeOwned; +use serde::Serialize; use tokio_stream::{wrappers::BroadcastStream, Stream}; use crate::{ network::{ - messages::{ApprovalMsg, ProposalChunkMsg}, + messages::{ProposalChunkMsg, VoteMsg}, NetworkAdapter, }, overlay::committees::Committee, - Approval, View, + View, }; const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic"; @@ -114,11 +116,11 @@ impl NetworkAdapter for MockAdapter { }; } - async fn approvals_stream( + async fn votes_stream( &self, _committee: Committee, _view: &View, - ) -> Box + Send> { + ) -> Box + Send> { let stream_channel = self .message_subscriber_channel() .await @@ -132,7 +134,7 @@ impl NetworkAdapter for MockAdapter { == message.content_topic().content_topic_name { let payload = message.payload(); - Some(ApprovalMsg::from_bytes(payload.as_bytes()).approval) + Some(VoteMsg::from_bytes(payload.as_bytes()).vote) } else { None } @@ -144,12 +146,14 @@ impl NetworkAdapter for MockAdapter { ) } - async fn forward_approval( + async fn forward_approval( &self, _committee: Committee, _view: &View, - approval_message: ApprovalMsg, - ) { + approval_message: VoteMsg, + ) where + Vote: Send, + { let message = MockMessage::new( String::from_utf8_lossy(&approval_message.as_bytes()).to_string(), MOCK_APPROVAL_CONTENT_TOPIC, diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs index aeeaefac..ce988356 100644 --- a/nomos-services/consensus/src/network/adapters/waku.rs +++ b/nomos-services/consensus/src/network/adapters/waku.rs @@ -6,16 +6,18 @@ use futures::{Stream, StreamExt}; use tokio_stream::wrappers::BroadcastStream; // internal use crate::network::{ - messages::{ApprovalMsg, ProposalChunkMsg}, + messages::{ProposalChunkMsg, VoteMsg}, NetworkAdapter, }; use crate::overlay::committees::Committee; -use crate::{Approval, View}; +use crate::View; use nomos_network::{ backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage}, NetworkMsg, NetworkService, }; use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; +use serde::de::DeserializeOwned; +use serde::Serialize; use waku_bindings::{ ContentFilter, Encoding, StoreQuery, WakuContentTopic, WakuMessage, WakuPubSubTopic, }; @@ -161,27 +163,27 @@ impl NetworkAdapter for WakuAdapter { }; } - async fn approvals_stream( + async fn votes_stream( &self, committee: Committee, view: &View, - ) -> Box + Send> { + ) -> Box + Send> { let content_topic = proposal_topic(committee, view); Box::new(Box::pin( self.cached_stream_with_content_topic(content_topic) .await .map(|message| { let payload = message.payload(); - ApprovalMsg::from_bytes(payload).approval + VoteMsg::from_bytes(payload).vote }), )) } - async fn forward_approval( + async fn forward_approval( &self, committee: Committee, view: &View, - approval_message: ApprovalMsg, + approval_message: VoteMsg, ) { let content_topic = approval_topic(committee, view); diff --git a/nomos-services/consensus/src/network/messages.rs b/nomos-services/consensus/src/network/messages.rs index ec4daba3..91b8eaf5 100644 --- a/nomos-services/consensus/src/network/messages.rs +++ b/nomos-services/consensus/src/network/messages.rs @@ -1,8 +1,11 @@ // std // crates use bytes::Bytes; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; // internal -use crate::{Approval, NodeId}; +use crate::NodeId; +use nomos_core::wire; #[derive(Clone)] pub struct ProposalChunkMsg { @@ -21,20 +24,26 @@ impl ProposalChunkMsg { } } -pub struct ApprovalMsg { +#[derive(Serialize, Deserialize)] +pub struct VoteMsg { pub source: NodeId, - pub approval: Approval, + pub vote: Vote, } -impl ApprovalMsg { +impl VoteMsg +where + Vote: Serialize, +{ pub fn as_bytes(&self) -> Box<[u8]> { - self.source.into() - } - - pub fn from_bytes(data: &[u8]) -> Self { - Self { - source: NodeId::try_from(data).unwrap(), - approval: Approval, - } + wire::serialize(self).unwrap().into_boxed_slice() + } +} + +impl VoteMsg +where + Vote: DeserializeOwned, +{ + pub fn from_bytes(data: &[u8]) -> Self { + wire::deserialize(data).unwrap() } } diff --git a/nomos-services/consensus/src/network/mod.rs b/nomos-services/consensus/src/network/mod.rs index cd103794..8f06f1ca 100644 --- a/nomos-services/consensus/src/network/mod.rs +++ b/nomos-services/consensus/src/network/mod.rs @@ -6,13 +6,15 @@ use bytes::Bytes; // crates use futures::Stream; // internal -use crate::network::messages::{ApprovalMsg, ProposalChunkMsg}; +use crate::network::messages::{ProposalChunkMsg, VoteMsg}; use crate::overlay::committees::Committee; -use crate::{Approval, View}; +use crate::View; use nomos_network::backends::NetworkBackend; use nomos_network::NetworkService; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; +use serde::de::DeserializeOwned; +use serde::Serialize; #[async_trait::async_trait] pub trait NetworkAdapter { @@ -31,10 +33,15 @@ pub trait NetworkAdapter { view: &View, chunk_msg: ProposalChunkMsg, ); - async fn approvals_stream( + async fn votes_stream( &self, committee: Committee, view: &View, - ) -> Box + Send>; - async fn forward_approval(&self, committee: Committee, view: &View, approval: ApprovalMsg); + ) -> Box + Send>; + async fn forward_approval( + &self, + committee: Committee, + view: &View, + approval: VoteMsg, + ); } diff --git a/nomos-services/consensus/src/overlay/committees.rs b/nomos-services/consensus/src/overlay/committees.rs index 09ba68a0..7aaf31b0 100644 --- a/nomos-services/consensus/src/overlay/committees.rs +++ b/nomos-services/consensus/src/overlay/committees.rs @@ -109,8 +109,12 @@ impl Member { } #[async_trait::async_trait] -impl - Overlay for Member +impl< + Network: NetworkAdapter + Sync, + Fountain: FountainCode + Sync, + VoteTally: Tally + Sync, + const C: usize, + > Overlay for Member { // we still need view here to help us initialize fn new(view: &View, node: NodeId) -> Self { @@ -164,6 +168,7 @@ impl Result<(), Box> { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); @@ -177,7 +182,12 @@ impl Approval { + async fn build_qc( + &self, + view: &View, + _adapter: &Network, + _tally: &VoteTally, + ) -> VoteTally::Outcome { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); // maybe the leader publishing the QC? todo!() diff --git a/nomos-services/consensus/src/overlay/flat.rs b/nomos-services/consensus/src/overlay/flat.rs index 09cc1c17..b368fd30 100644 --- a/nomos-services/consensus/src/overlay/flat.rs +++ b/nomos-services/consensus/src/overlay/flat.rs @@ -1,29 +1,16 @@ -use std::collections::HashSet; // std use std::error::Error; // crates use futures::StreamExt; +use serde::de::DeserializeOwned; +use serde::Serialize; // internal use super::*; -use crate::network::messages::{ApprovalMsg, ProposalChunkMsg}; +use crate::network::messages::{ProposalChunkMsg, VoteMsg}; use crate::network::NetworkAdapter; use crate::overlay::committees::Committee; -const DEFAULT_THRESHOLD: Threshold = Threshold::new(2, 3); const FLAT_COMMITTEE: Committee = Committee::root(); -/// The share of nodes that need to approve a block for it to be valid -/// expressed as a fraction of the total number of nodes -#[derive(Copy, Clone, Debug)] -pub struct Threshold { - num: u64, - den: u64, -} - -impl Threshold { - pub const fn new(num: u64, den: u64) -> Self { - Self { num, den } - } -} /// A flat overlay, everyone is in the same committee. /// As far as the API is concerned, this should be equivalent to any other @@ -31,18 +18,13 @@ impl Threshold { /// For this reason, this might act as a 'reference' overlay for testing. pub struct Flat { // TODO: this should be a const param, but we can't do that yet - threshold: Threshold, node_id: NodeId, view_n: u64, } impl Flat { pub fn new(view_n: u64, node_id: NodeId) -> Self { - Self { - threshold: DEFAULT_THRESHOLD, - node_id, - view_n, - } + Self { node_id, view_n } } fn approve(&self, _block: &Block) -> Approval { @@ -52,8 +34,12 @@ impl Flat { } #[async_trait::async_trait] -impl Overlay - for Flat +impl Overlay for Flat +where + Network: NetworkAdapter + Sync, + Fountain: FountainCode + Sync, + VoteTally: Tally + Sync, + VoteTally::Vote: Serialize + DeserializeOwned + Send, { fn new(view: &View, node: NodeId) -> Self { Flat::new(view.view_n, node) @@ -95,6 +81,7 @@ impl Overlay Result<(), Box> { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); @@ -104,8 +91,8 @@ impl Overlay Overlay Approval { + async fn build_qc( + &self, + view: &View, + adapter: &Network, + tally: &VoteTally, + ) -> VoteTally::Outcome { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); // for now, let's pretend that consensus is reached as soon as the // block is approved by a share of the nodes - let mut approvals = HashSet::new(); - let mut stream = Box::into_pin(adapter.approvals_stream(FLAT_COMMITTEE, view).await); + let stream = Box::into_pin(adapter.votes_stream(FLAT_COMMITTEE, view).await); // Shadow the original binding so that it can't be directly accessed // ever again. - while let Some(approval) = stream.next().await { - // insert the approval in the map to deduplicate - // TODO: validate approval - approvals.insert(approval); - // ceil(num/den * n) - let threshold = - (self.threshold.num * view.staking_keys.len() as u64 + self.threshold.den - 1) - / self.threshold.den; - if approvals.len() as u64 >= threshold { - // consensus reached - // FIXME: build a real QC - return Approval; - } + if let Ok(qc) = tally.tally(view.view_n, stream).await { + qc + } else { + unimplemented!("consensus not reached") } - unimplemented!("consensus not reached") } } diff --git a/nomos-services/consensus/src/overlay/mod.rs b/nomos-services/consensus/src/overlay/mod.rs index 5989ccd9..277efaa9 100644 --- a/nomos-services/consensus/src/overlay/mod.rs +++ b/nomos-services/consensus/src/overlay/mod.rs @@ -10,10 +10,11 @@ use crate::network::NetworkAdapter; pub use committees::Member; use nomos_core::block::Block; use nomos_core::fountain::{FountainCode, FountainError}; +use nomos_core::vote::Tally; /// Dissemination overlay, tied to a specific view #[async_trait::async_trait] -pub trait Overlay { +pub trait Overlay { fn new(view: &View, node: NodeId) -> Self; async fn reconstruct_proposal_block( @@ -37,8 +38,14 @@ pub trait Overlay { view: &View, block: &Block, adapter: &Network, + vote_tally: &VoteTally, next_view: &View, ) -> Result<(), Box>; /// Wait for consensus on a block - async fn build_qc(&self, view: &View, adapter: &Network) -> Approval; + async fn build_qc( + &self, + view: &View, + adapter: &Network, + vote_tally: &VoteTally, + ) -> VoteTally::Outcome; } diff --git a/nomos-services/consensus/src/test.rs b/nomos-services/consensus/src/test.rs index 9b8f6bfc..2d863225 100644 --- a/nomos-services/consensus/src/test.rs +++ b/nomos-services/consensus/src/test.rs @@ -7,9 +7,11 @@ use bytes::Bytes; use futures::Stream; use nomos_core::fountain::FountainError; use nomos_core::fountain::{mock::MockFountain, FountainCode}; +use nomos_core::vote::mock::{MockQc, MockTally, MockTallySettings}; use nomos_network::backends::NetworkBackend; use nomos_network::NetworkService; use overwatch_rs::services::relay::*; +use serde::de::DeserializeOwned; use tokio::sync::broadcast::Receiver; struct DummyOverlay; @@ -17,7 +19,7 @@ struct DummyAdapter; struct DummyBackend; #[async_trait] -impl Overlay for DummyOverlay { +impl Overlay for DummyOverlay { fn new(_: &View, _: NodeId) -> Self { DummyOverlay } @@ -38,13 +40,15 @@ impl Overlay for DummyOv _view: &View, _block: &Block, _adapter: &N, + _vote_tally: &MockTally, _next_view: &View, ) -> Result<(), Box> { Ok(()) } - async fn build_qc(&self, _view: &View, _: &N) -> Approval { - Approval + async fn build_qc(&self, view: &View, _adapter: &N, _vote_tally: &MockTally) -> MockQc { + // TODO: mock the total votes + MockQc::new(0) } } @@ -66,14 +70,21 @@ impl NetworkAdapter for DummyAdapter { async fn broadcast_block_chunk(&self, _: Committee, _: &View, _: ProposalChunkMsg) { unimplemented!() } - async fn approvals_stream( + async fn votes_stream( &self, - _: Committee, - _: &View, - ) -> Box + Send> { + _committee: Committee, + _view: &View, + ) -> Box + Send> { + unimplemented!() + } + async fn forward_approval( + &self, + _committee: Committee, + _view: &View, + _approval: VoteMsg, + ) { unimplemented!() } - async fn forward_approval(&self, _: Committee, _: &View, _: ApprovalMsg) {} } #[async_trait] @@ -100,11 +111,13 @@ async fn test_single_round_non_leader() { staking_keys: BTreeMap::new(), view_n: 0, }; + let mock_tally = MockTally::new(MockTallySettings { threshold: 0 }); let (_, next_view) = view - .resolve_non_leader::( + .resolve_non_leader::( [0; 32], &DummyAdapter, &MockFountain, + &mock_tally, ) .await .unwrap();