diff --git a/consensus-engine/Cargo.toml b/consensus-engine/Cargo.toml index c44f2075..bec28a37 100644 --- a/consensus-engine/Cargo.toml +++ b/consensus-engine/Cargo.toml @@ -6,3 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +serde = { version = "1.0", optional = true } + +[features] +default = [] +serde1 = ["serde"] diff --git a/consensus-engine/src/lib.rs b/consensus-engine/src/lib.rs index 3067a225..9cd526f3 100644 --- a/consensus-engine/src/lib.rs +++ b/consensus-engine/src/lib.rs @@ -1,7 +1,7 @@ use std::collections::{HashMap, HashSet}; mod types; -use types::*; +pub use types::*; #[derive(Clone, Debug)] pub struct Carnot { diff --git a/consensus-engine/src/types.rs b/consensus-engine/src/types.rs index efe87970..20cba87b 100644 --- a/consensus-engine/src/types.rs +++ b/consensus-engine/src/types.rs @@ -12,6 +12,7 @@ pub type Committee = HashSet; /// This enum represents the different types of messages that can be sent from the perspective of consensus and /// can't be directly used in the network as they lack things like cryptographic signatures. #[derive(Debug, Clone, Eq, PartialEq)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] pub enum Payload { /// Vote for a block in a view Vote(Vote), @@ -23,11 +24,13 @@ pub enum Payload { /// Returned #[derive(Debug, Clone, Eq, PartialEq)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] pub struct Vote { pub block: BlockId, } #[derive(Debug, Clone, Eq, PartialEq)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] pub struct Timeout { pub view: View, pub sender: NodeId, @@ -36,6 +39,7 @@ pub struct Timeout { } #[derive(Debug, Clone, Eq, PartialEq)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] pub struct NewView { pub view: View, pub sender: NodeId, @@ -44,6 +48,7 @@ pub struct NewView { } #[derive(Debug, Clone, Eq, PartialEq)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] pub struct TimeoutQc { pub view: View, pub high_qc: Qc, @@ -51,6 +56,7 @@ pub struct TimeoutQc { } #[derive(Debug, Clone, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] pub struct Block { pub id: BlockId, pub view: View, @@ -76,6 +82,7 @@ pub enum Output { } #[derive(Debug, Clone, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] pub struct StandardQc { pub view: View, pub id: BlockId, @@ -91,12 +98,14 @@ impl StandardQc { } #[derive(Debug, Clone, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] pub struct AggregateQc { pub high_qc: StandardQc, pub view: View, } #[derive(Debug, Clone, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] pub enum Qc { Standard(StandardQc), Aggregated(AggregateQc), diff --git a/nomos-core/Cargo.toml b/nomos-core/Cargo.toml index 3ef3bed5..842b5eaf 100644 --- a/nomos-core/Cargo.toml +++ b/nomos-core/Cargo.toml @@ -12,6 +12,7 @@ authors = [ async-trait = { version = "0.1" } blake2 = { version = "0.10" } bytes = "1.3" +consensus-engine = { path = "../consensus-engine"} futures = "0.3" nomos-network = { path = "../nomos-services/network", optional = true } raptorq = { version = "1.7", optional = true } diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index 01572677..ba21fc73 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" async-trait = "0.1" bytes = "1.3" chrono = "0.4" +consensus-engine = { path = "../../consensus-engine", features = ["serde1"] } futures = "0.3" nomos-network = { path = "../network" } nomos-mempool = { path = "../mempool" } diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 80eb1358..1b801cac 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -227,7 +227,7 @@ pub struct Approval; pub struct View { seed: Seed, staking_keys: BTreeMap, - pub view_n: u64, + pub view_n: consensus_engine::View, } impl View { @@ -357,8 +357,9 @@ impl View { true } + // TODO: use consensus_engine::View instead pub fn id(&self) -> u64 { - self.view_n + self.view_n.try_into().unwrap() } // Verifies the block is new and the previous leader did not fail diff --git a/nomos-services/consensus/src/network/adapters/mock.rs b/nomos-services/consensus/src/network/adapters/mock.rs index ce2bf810..0df627fd 100644 --- a/nomos-services/consensus/src/network/adapters/mock.rs +++ b/nomos-services/consensus/src/network/adapters/mock.rs @@ -11,14 +11,15 @@ use serde::de::DeserializeOwned; use serde::Serialize; use tokio_stream::{wrappers::BroadcastStream, Stream}; +use crate::network::messages::TimeoutQcMsg; use crate::{ network::{ messages::{ProposalChunkMsg, VoteMsg}, NetworkAdapter, }, overlay::committees::Committee, - View, }; +use consensus_engine::{TimeoutQc, View}; const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic"; const MOCK_BLOCK_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("MockSim", 1, "MockBlock"); @@ -63,8 +64,7 @@ impl NetworkAdapter for MockAdapter { async fn proposal_chunks_stream( &self, - _committee: Committee, - _view: &View, + _view: View, ) -> Box + Send + Sync + Unpin> { let stream_channel = self .message_subscriber_channel() @@ -80,7 +80,9 @@ impl NetworkAdapter for MockAdapter { == message.content_topic().content_topic_name { let payload = message.payload(); - Some(ProposalChunkMsg::from_bytes(payload.as_bytes()).chunk) + Some(Bytes::from( + ProposalChunkMsg::from_bytes(payload.as_bytes()).chunk, + )) } else { None } @@ -92,14 +94,9 @@ impl NetworkAdapter for MockAdapter { })) } - async fn broadcast_block_chunk( - &self, - _committee: Committee, - _view: &View, - chunk_message: ProposalChunkMsg, - ) { + async fn broadcast_block_chunk(&self, chunk_message: ProposalChunkMsg) { let message = MockMessage::new( - String::from_utf8_lossy(chunk_message.as_bytes()).to_string(), + String::from_utf8_lossy(&chunk_message.as_bytes()).to_string(), MOCK_BLOCK_CONTENT_TOPIC, 1, chrono::Utc::now().timestamp() as usize, @@ -116,10 +113,21 @@ impl NetworkAdapter for MockAdapter { }; } + async fn broadcast_timeout_qc(&self, _timeout_qc_msg: TimeoutQcMsg) { + todo!() + } + + async fn timeout_qc_stream( + &self, + _view: View, + ) -> Box + Send + Sync + Unpin> { + todo!() + } + async fn votes_stream( &self, _committee: Committee, - _view: &View, + _view: View, ) -> Box + Send> { let stream_channel = self .message_subscriber_channel() @@ -146,10 +154,10 @@ impl NetworkAdapter for MockAdapter { ) } - async fn forward_approval( + async fn send_vote( &self, _committee: Committee, - _view: &View, + _view: View, approval_message: VoteMsg, ) where Vote: Send, diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs index ce988356..d3758859 100644 --- a/nomos-services/consensus/src/network/adapters/waku.rs +++ b/nomos-services/consensus/src/network/adapters/waku.rs @@ -5,12 +5,13 @@ use bytes::Bytes; use futures::{Stream, StreamExt}; use tokio_stream::wrappers::BroadcastStream; // internal +use crate::network::messages::TimeoutQcMsg; use crate::network::{ messages::{ProposalChunkMsg, VoteMsg}, NetworkAdapter, }; use crate::overlay::committees::Committee; -use crate::View; +use consensus_engine::{TimeoutQc, View}; use nomos_network::{ backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage}, NetworkMsg, NetworkService, @@ -109,6 +110,20 @@ impl WakuAdapter { }); tokio_stream::StreamExt::merge(archive_stream, live_stream) } + + async fn broadcast(&self, bytes: Box<[u8]>, topic: WakuContentTopic) { + let message = WakuMessage::new(bytes, topic, 1, chrono::Utc::now().timestamp() as usize); + if let Err((_, _e)) = self + .network_relay + .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { + message, + topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), + })) + .await + { + todo!("log error"); + }; + } } #[async_trait::async_trait] @@ -123,52 +138,65 @@ impl NetworkAdapter for WakuAdapter { async fn proposal_chunks_stream( &self, - committee: Committee, - view: &View, + view: View, ) -> Box + Send + Sync + Unpin> { - let content_topic = proposal_topic(committee, view); Box::new(Box::pin( - self.cached_stream_with_content_topic(content_topic) + self.cached_stream_with_content_topic(PROPOSAL_CONTENT_TOPIC.clone()) .await - .map(|message| { + .filter_map(move |message| { let payload = message.payload(); - ProposalChunkMsg::from_bytes(payload).chunk + let ProposalChunkMsg { + view: msg_view, + chunk, + } = ProposalChunkMsg::from_bytes(payload); + async move { + if view == msg_view { + Some(Bytes::from(chunk)) + } else { + None + } + } }), )) } - async fn broadcast_block_chunk( - &self, - committee: Committee, - view: &View, - chunk_message: ProposalChunkMsg, - ) { - let content_topic = proposal_topic(committee, view); - - let message = WakuMessage::new( - chunk_message.as_bytes(), - content_topic, - 1, - chrono::Utc::now().timestamp() as usize, - ); - if let Err((_, _e)) = self - .network_relay - .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { - message, - topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), - })) + async fn broadcast_block_chunk(&self, chunk_message: ProposalChunkMsg) { + self.broadcast(chunk_message.as_bytes(), PROPOSAL_CONTENT_TOPIC) .await - { - todo!("log error"); - }; + } + + async fn broadcast_timeout_qc(&self, timeout_qc_msg: TimeoutQcMsg) { + self.broadcast(timeout_qc_msg.as_bytes(), TIMEOUT_QC_CONTENT_TOPIC) + .await + } + + async fn timeout_qc_stream( + &self, + view: View, + ) -> Box + Send + Sync + Unpin> { + Box::new(Box::pin( + self.cached_stream_with_content_topic(TIMEOUT_QC_CONTENT_TOPIC.clone()) + .await + .filter_map(move |message| { + let payload = message.payload(); + let qc = TimeoutQcMsg::from_bytes(payload).qc; + async move { + if qc.view > view { + Some(qc) + } else { + None + } + } + }), + )) } async fn votes_stream( &self, committee: Committee, - view: &View, + view: View, ) -> Box + Send> { - let content_topic = proposal_topic(committee, view); + let content_topic = votes_topic(committee, view); Box::new(Box::pin( self.cached_stream_with_content_topic(content_topic) .await @@ -179,13 +207,13 @@ impl NetworkAdapter for WakuAdapter { )) } - async fn forward_approval( + async fn send_vote( &self, committee: Committee, - view: &View, + view: View, approval_message: VoteMsg, ) { - let content_topic = approval_topic(committee, view); + let content_topic = votes_topic(committee, view); let message = WakuMessage::new( approval_message.as_bytes(), @@ -206,20 +234,16 @@ impl NetworkAdapter for WakuAdapter { } } -fn approval_topic(committee: Committee, view: &View) -> WakuContentTopic { +fn votes_topic(committee: Committee, view: View) -> WakuContentTopic { WakuContentTopic { application_name: Cow::Borrowed(APPLICATION_NAME), version: VERSION, - content_topic_name: Cow::Owned(format!("approval-{}-{}", committee.id(), view.id())), + content_topic_name: Cow::Owned(format!("votes-{}-{}", committee.id(), view)), encoding: Encoding::Proto, } } -fn proposal_topic(committee: Committee, view: &View) -> WakuContentTopic { - WakuContentTopic { - application_name: Cow::Borrowed(APPLICATION_NAME), - version: VERSION, - content_topic_name: Cow::Owned(format!("proposal-{}-{}", committee.id(), view.id())), - encoding: Encoding::Proto, - } -} +const PROPOSAL_CONTENT_TOPIC: WakuContentTopic = + WakuContentTopic::new(APPLICATION_NAME, VERSION, "proposal", Encoding::Proto); +const TIMEOUT_QC_CONTENT_TOPIC: WakuContentTopic = + WakuContentTopic::new(APPLICATION_NAME, VERSION, "timeout-qc", Encoding::Proto); diff --git a/nomos-services/consensus/src/network/messages.rs b/nomos-services/consensus/src/network/messages.rs index 91b8eaf5..8c3afe61 100644 --- a/nomos-services/consensus/src/network/messages.rs +++ b/nomos-services/consensus/src/network/messages.rs @@ -1,26 +1,25 @@ // std // crates -use bytes::Bytes; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; // internal use crate::NodeId; +use consensus_engine::{TimeoutQc, View}; use nomos_core::wire; -#[derive(Clone)] +#[derive(Clone, Serialize, Deserialize)] pub struct ProposalChunkMsg { - pub chunk: Bytes, + pub chunk: Box<[u8]>, + pub view: View, } impl ProposalChunkMsg { - pub fn as_bytes(&self) -> &[u8] { - &self.chunk + pub fn as_bytes(&self) -> Box<[u8]> { + wire::serialize(self).unwrap().into_boxed_slice() } pub fn from_bytes(data: &[u8]) -> Self { - Self { - chunk: Bytes::from(data.to_vec()), - } + wire::deserialize(data).unwrap() } } @@ -47,3 +46,19 @@ where wire::deserialize(data).unwrap() } } + +#[derive(Serialize, Deserialize)] +pub struct TimeoutQcMsg { + pub source: NodeId, + pub qc: TimeoutQc, +} + +impl TimeoutQcMsg { + pub fn as_bytes(&self) -> Box<[u8]> { + wire::serialize(self).unwrap().into_boxed_slice() + } + + pub fn from_bytes(data: &[u8]) -> Self { + wire::deserialize(data).unwrap() + } +} diff --git a/nomos-services/consensus/src/network/mod.rs b/nomos-services/consensus/src/network/mod.rs index 8f06f1ca..e13eaf36 100644 --- a/nomos-services/consensus/src/network/mod.rs +++ b/nomos-services/consensus/src/network/mod.rs @@ -6,9 +6,9 @@ use bytes::Bytes; // crates use futures::Stream; // internal -use crate::network::messages::{ProposalChunkMsg, VoteMsg}; +use crate::network::messages::{ProposalChunkMsg, TimeoutQcMsg, VoteMsg}; use crate::overlay::committees::Committee; -use crate::View; +use consensus_engine::{TimeoutQc, View}; use nomos_network::backends::NetworkBackend; use nomos_network::NetworkService; use overwatch_rs::services::relay::OutboundRelay; @@ -24,24 +24,23 @@ pub trait NetworkAdapter { ) -> Self; async fn proposal_chunks_stream( &self, - committee: Committee, - view: &View, + view: View, ) -> Box + Send + Sync + Unpin>; - async fn broadcast_block_chunk( + async fn broadcast_block_chunk(&self, chunk_msg: ProposalChunkMsg); + async fn broadcast_timeout_qc(&self, timeout_qc_msg: TimeoutQcMsg); + async fn timeout_qc_stream( &self, - committee: Committee, - view: &View, - chunk_msg: ProposalChunkMsg, - ); + view: View, + ) -> Box + Send + Sync + Unpin>; async fn votes_stream( &self, committee: Committee, - view: &View, + view: View, ) -> Box + Send>; - async fn forward_approval( + async fn send_vote( &self, committee: Committee, - view: &View, - approval: VoteMsg, + view: View, + vote: VoteMsg, ); } diff --git a/nomos-services/consensus/src/overlay/committees.rs b/nomos-services/consensus/src/overlay/committees.rs index 4c25b624..95c13bc7 100644 --- a/nomos-services/consensus/src/overlay/committees.rs +++ b/nomos-services/consensus/src/overlay/committees.rs @@ -16,7 +16,7 @@ pub struct Member { id: NodeId, committee: Committee, committees: Committees, - view_n: u64, + view_n: consensus_engine::View, } /// #Just a newtype index to be able to implement parent/children methods @@ -133,8 +133,7 @@ where fountain: &Fountain, ) -> Result, FountainError> { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); - let committee = self.committee; - let message_stream = adapter.proposal_chunks_stream(committee, view).await; + let message_stream = adapter.proposal_chunks_stream(view.view_n).await; fountain.decode(message_stream).await.and_then(|b| { deserializer(&b) .deserialize::>() @@ -150,23 +149,15 @@ where fountain: &Fountain, ) { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); - let (left_child, right_child) = self.children_committes(); let block_bytes = block.as_bytes(); let encoded_stream = fountain.encode(&block_bytes); encoded_stream .for_each_concurrent(None, |chunk| async move { - let message = ProposalChunkMsg { chunk }; - let r_child = right_child - .map(|right_child| { - adapter.broadcast_block_chunk(right_child, view, message.clone()) - }) - .into_iter(); - let l_child = left_child - .map(|left_child| { - adapter.broadcast_block_chunk(left_child, view, message.clone()) - }) - .into_iter(); - futures::future::join_all(r_child.chain(l_child)).await; + let message = ProposalChunkMsg { + chunk: chunk.to_vec().into_boxed_slice(), + view: view.view_n, + }; + adapter.broadcast_block_chunk(message.clone()).await; }) .await; } diff --git a/nomos-services/consensus/src/overlay/flat.rs b/nomos-services/consensus/src/overlay/flat.rs index 8e224ee5..353df9e2 100644 --- a/nomos-services/consensus/src/overlay/flat.rs +++ b/nomos-services/consensus/src/overlay/flat.rs @@ -21,11 +21,11 @@ const FLAT_COMMITTEE: Committee = Committee::root(); pub struct Flat { // TODO: this should be a const param, but we can't do that yet node_id: NodeId, - view_n: u64, + view_n: consensus_engine::View, } impl Flat { - pub fn new(view_n: u64, node_id: NodeId) -> Self { + pub fn new(view_n: consensus_engine::View, node_id: NodeId) -> Self { Self { node_id, view_n } } @@ -56,7 +56,7 @@ where fountain: &Fountain, ) -> Result, FountainError> { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); - let message_stream = adapter.proposal_chunks_stream(FLAT_COMMITTEE, view).await; + let message_stream = adapter.proposal_chunks_stream(view.view_n).await; fountain.decode(message_stream).await.and_then(|b| { deserializer(&b) .deserialize::>() @@ -76,10 +76,11 @@ where let encoded_stream = fountain.encode(&block_bytes); encoded_stream .for_each_concurrent(None, |chunk| async move { - let message = ProposalChunkMsg { chunk }; - adapter - .broadcast_block_chunk(FLAT_COMMITTEE, view, message) - .await; + let message = ProposalChunkMsg { + chunk: chunk.to_vec().into_boxed_slice(), + view: view.view_n, + }; + adapter.broadcast_block_chunk(message).await; }) .await; } @@ -96,9 +97,9 @@ where // in the flat overlay, there's no need to wait for anyone before approving the block let approval = self.approve(block); adapter - .forward_approval( + .send_vote( FLAT_COMMITTEE, - view, + view.view_n, VoteMsg { vote: approval, source: self.node_id, @@ -113,11 +114,12 @@ where // for now, let's pretend that consensus is reached as soon as the // block is approved by a share of the nodes - let stream = Box::into_pin(adapter.votes_stream(FLAT_COMMITTEE, view).await); + let stream = Box::into_pin(adapter.votes_stream(FLAT_COMMITTEE, view.view_n).await); // Shadow the original binding so that it can't be directly accessed // ever again. - if let Ok((qc, _)) = tally.tally(view.view_n, stream).await { + // TODO: Remove the `try_into` call when tally is refactored to use with latest consensus engine types + if let Ok((qc, _)) = tally.tally(view.view_n.try_into().unwrap(), stream).await { qc } else { unimplemented!("consensus not reached")