diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index b75b3024..4d904bac 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -262,4 +262,8 @@ impl View { pub fn is_leader(&self, _node_id: NodeId) -> bool { todo!() } + + pub fn id(&self) -> u64 { + self._view_n + } } diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs index a9849d85..fa15f997 100644 --- a/nomos-services/consensus/src/network/adapters/waku.rs +++ b/nomos-services/consensus/src/network/adapters/waku.rs @@ -1,4 +1,5 @@ // std +use std::borrow::Cow; // crates use bytes::Bytes; use futures::{Stream, StreamExt}; @@ -8,6 +9,7 @@ use crate::network::{ messages::{ApprovalMsg, ProposalChunkMsg}, NetworkAdapter, }; +use crate::overlay::committees::Committee; use crate::{Approval, View}; use nomos_network::{ backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage}, @@ -19,11 +21,8 @@ use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic}; const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = WakuPubSubTopic::new("CarnotSim", Encoding::Proto); -const WAKU_CARNOT_BLOCK_CONTENT_TOPIC: WakuContentTopic = - WakuContentTopic::new("CarnotSim", 1, "CarnotBlock", Encoding::Proto); - -const WAKU_CARNOT_APPROVAL_CONTENT_TOPIC: WakuContentTopic = - WakuContentTopic::new("CarnotSim", 1, "CarnotApproval", Encoding::Proto); +const APPLICATION_NAME: &str = "CarnotSim"; +const VERSION: usize = 1; pub struct WakuAdapter { network_relay: OutboundRelay< as ServiceData>::Message>, @@ -61,40 +60,50 @@ impl NetworkAdapter for WakuAdapter { Self { network_relay } } - async fn proposal_chunks_stream(&self) -> Box + Send + Sync + Unpin> { + async fn proposal_chunks_stream( + &self, + committee: Committee, + view: &View, + ) -> Box + Send + Sync + Unpin> { let stream_channel = self .message_subscriber_channel() .await .unwrap_or_else(|_e| todo!("handle error")); - Box::new(BroadcastStream::new(stream_channel).filter_map(|msg| { - Box::pin(async move { - match msg { - Ok(event) => match event { - NetworkEvent::RawMessage(message) => { - // TODO: this should actually check the whole content topic, - // waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28) - if WAKU_CARNOT_BLOCK_CONTENT_TOPIC.content_topic_name - == message.content_topic().content_topic_name - { - let payload = message.payload(); - Some(ProposalChunkMsg::from_bytes(payload).chunk) - } else { - None - } + let content_topic = proposal_topic(committee, view); + Box::new( + BroadcastStream::new(stream_channel) + .zip(futures::stream::repeat(content_topic)) + .filter_map(|(msg, content_topic)| { + Box::pin(async move { + match msg { + Ok(event) => match event { + NetworkEvent::RawMessage(message) => { + if &content_topic == message.content_topic() { + let payload = message.payload(); + Some(ProposalChunkMsg::from_bytes(payload).chunk) + } else { + None + } + } + }, + Err(_e) => None, } - }, - Err(_e) => None, - } - }) - })) + }) + }), + ) } - async fn broadcast_block_chunk(&self, _view: &View, chunk_message: ProposalChunkMsg) { - // TODO: probably later, depending on the view we should map to different content topics - // but this is an ongoing idea that should/will be discus. + async fn broadcast_block_chunk( + &self, + committee: Committee, + view: &View, + chunk_message: ProposalChunkMsg, + ) { + let content_topic = proposal_topic(committee, view); + let message = WakuMessage::new( chunk_message.as_bytes(), - WAKU_CARNOT_BLOCK_CONTENT_TOPIC, + content_topic, 1, chrono::Utc::now().timestamp() as usize, ); @@ -110,38 +119,48 @@ impl NetworkAdapter for WakuAdapter { }; } - async fn approvals_stream(&self) -> Box + Send> { + async fn approvals_stream( + &self, + committee: Committee, + view: &View, + ) -> Box + Send> { + let content_topic = approval_topic(committee, view); let stream_channel = self .message_subscriber_channel() .await .unwrap_or_else(|_e| todo!("handle error")); Box::new( - BroadcastStream::new(stream_channel).filter_map(|msg| async move { - match msg { - Ok(event) => match event { - NetworkEvent::RawMessage(message) => { - // TODO: this should actually check the whole content topic, - // waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28) - if WAKU_CARNOT_APPROVAL_CONTENT_TOPIC.content_topic_name - == message.content_topic().content_topic_name - { - let payload = message.payload(); - Some(ApprovalMsg::from_bytes(payload).approval) - } else { - None + BroadcastStream::new(stream_channel) + .zip(futures::stream::repeat(content_topic)) + .filter_map(|(msg, content_topic)| async move { + match msg { + Ok(event) => match event { + NetworkEvent::RawMessage(message) => { + if &content_topic == message.content_topic() { + let payload = message.payload(); + Some(ApprovalMsg::from_bytes(payload).approval) + } else { + None + } } - } - }, - Err(_e) => None, - } - }), + }, + Err(_e) => None, + } + }), ) } - async fn forward_approval(&self, approval_message: ApprovalMsg) { + async fn forward_approval( + &self, + committee: Committee, + view: &View, + approval_message: ApprovalMsg, + ) { + let content_topic = approval_topic(committee, view); + let message = WakuMessage::new( approval_message.as_bytes(), - WAKU_CARNOT_APPROVAL_CONTENT_TOPIC, + content_topic, 1, chrono::Utc::now().timestamp() as usize, ); @@ -157,3 +176,21 @@ impl NetworkAdapter for WakuAdapter { }; } } + +fn approval_topic(committee: Committee, view: &View) -> WakuContentTopic { + WakuContentTopic { + application_name: Cow::Borrowed(APPLICATION_NAME), + version: VERSION, + content_topic_name: Cow::Owned(format!("approval-{}-{}", committee.id(), view.id())), + encoding: Encoding::Proto, + } +} + +fn proposal_topic(committee: Committee, view: &View) -> WakuContentTopic { + WakuContentTopic { + application_name: Cow::Borrowed(APPLICATION_NAME), + version: VERSION, + content_topic_name: Cow::Owned(format!("proposal-{}-{}", committee.id(), view.id())), + encoding: Encoding::Proto, + } +} diff --git a/nomos-services/consensus/src/network/messages.rs b/nomos-services/consensus/src/network/messages.rs index 96811071..ec4daba3 100644 --- a/nomos-services/consensus/src/network/messages.rs +++ b/nomos-services/consensus/src/network/messages.rs @@ -4,6 +4,7 @@ use bytes::Bytes; // internal use crate::{Approval, NodeId}; +#[derive(Clone)] pub struct ProposalChunkMsg { pub chunk: Bytes, } diff --git a/nomos-services/consensus/src/network/mod.rs b/nomos-services/consensus/src/network/mod.rs index 684e0f13..cd103794 100644 --- a/nomos-services/consensus/src/network/mod.rs +++ b/nomos-services/consensus/src/network/mod.rs @@ -7,6 +7,7 @@ use bytes::Bytes; use futures::Stream; // internal use crate::network::messages::{ApprovalMsg, ProposalChunkMsg}; +use crate::overlay::committees::Committee; use crate::{Approval, View}; use nomos_network::backends::NetworkBackend; use nomos_network::NetworkService; @@ -19,8 +20,21 @@ pub trait NetworkAdapter { async fn new( network_relay: OutboundRelay< as ServiceData>::Message>, ) -> Self; - async fn proposal_chunks_stream(&self) -> Box + Send + Sync + Unpin>; - async fn broadcast_block_chunk(&self, view: &View, chunk_msg: ProposalChunkMsg); - async fn approvals_stream(&self) -> Box + Send>; - async fn forward_approval(&self, approval: ApprovalMsg); + async fn proposal_chunks_stream( + &self, + committee: Committee, + view: &View, + ) -> Box + Send + Sync + Unpin>; + async fn broadcast_block_chunk( + &self, + committee: Committee, + view: &View, + chunk_msg: ProposalChunkMsg, + ); + async fn approvals_stream( + &self, + committee: Committee, + view: &View, + ) -> Box + Send>; + async fn forward_approval(&self, committee: Committee, view: &View, approval: ApprovalMsg); } diff --git a/nomos-services/consensus/src/overlay/committees.rs b/nomos-services/consensus/src/overlay/committees.rs index 96b42511..9e576f68 100644 --- a/nomos-services/consensus/src/overlay/committees.rs +++ b/nomos-services/consensus/src/overlay/committees.rs @@ -1,5 +1,4 @@ // std -use std::pin::Pin; // crates use futures::StreamExt; use rand::{seq::SliceRandom, SeedableRng}; @@ -10,6 +9,8 @@ use crate::network::NetworkAdapter; /// View of the tree overlay centered around a specific member pub struct Member<'view, const C: usize> { + // id is not used now, but gonna probably used it for later checking later on + #[allow(dead_code)] id: NodeId, committee: Committee, committees: Committees<'view, C>, @@ -54,6 +55,14 @@ impl<'view, const C: usize> Committees<'view, C> { } impl Committee { + pub const fn root() -> Self { + Self(0) + } + + pub fn id(&self) -> usize { + self.0 + } + /// Return the left and right children committee, if any pub fn children(&self) -> (Committee, Committee) { ( @@ -112,19 +121,31 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const adapter: &Network, fountain: &Fountain, ) -> Result { - let message_stream = adapter.proposal_chunks_stream().await; + let committee = self.committee; + let view = self.committees.view; + let message_stream = adapter.proposal_chunks_stream(committee, view).await; fountain.decode(message_stream).await.map(Block::from_bytes) } async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain) { + let (left_child, right_child) = self.children_committes(); + let view = self.committees.view; let block_bytes = block.as_bytes(); let encoded_stream = fountain.encode(&block_bytes); encoded_stream .for_each_concurrent(None, |chunk| async move { let message = ProposalChunkMsg { chunk }; - adapter - .broadcast_block_chunk(self.committees.view, message) - .await; + let r_child = right_child + .map(|right_child| { + adapter.broadcast_block_chunk(right_child, view, message.clone()) + }) + .into_iter(); + let l_child = left_child + .map(|left_child| { + adapter.broadcast_block_chunk(left_child, view, message.clone()) + }) + .into_iter(); + futures::future::join_all(r_child.chain(l_child)).await; }) .await; } @@ -132,7 +153,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const async fn approve_and_forward( &self, _block: &Block, - adapter: &Network, + _adapter: &Network, ) -> Result<(), Box> { // roughly, we want to do something like this: // 1. wait for left and right children committees to approve @@ -144,7 +165,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const todo!() } - async fn wait_for_consensus(&self, _approval: &Block, adapter: &Network) { + async fn wait_for_consensus(&self, _approval: &Block, _adapter: &Network) { // maybe the leader publishing the QC? } } diff --git a/nomos-services/consensus/src/overlay/flat.rs b/nomos-services/consensus/src/overlay/flat.rs index 0e87d7a1..e727f9c5 100644 --- a/nomos-services/consensus/src/overlay/flat.rs +++ b/nomos-services/consensus/src/overlay/flat.rs @@ -7,9 +7,10 @@ use futures::StreamExt; use super::*; use crate::network::messages::{ApprovalMsg, ProposalChunkMsg}; use crate::network::NetworkAdapter; +use crate::overlay::committees::Committee; const DEFAULT_THRESHOLD: Threshold = Threshold::new(2, 3); - +const FLAT_COMMITTEE: Committee = Committee::root(); /// The share of nodes that need to approve a block for it to be valid /// expressed as a fraction of the total number of nodes #[derive(Copy, Clone, Debug)] @@ -63,7 +64,9 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> adapter: &Network, fountain: &Fountain, ) -> Result { - let message_stream = adapter.proposal_chunks_stream().await; + let message_stream = adapter + .proposal_chunks_stream(FLAT_COMMITTEE, self.view) + .await; fountain.decode(message_stream).await.map(Block::from_bytes) } @@ -73,7 +76,9 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> encoded_stream .for_each_concurrent(None, |chunk| async move { let message = ProposalChunkMsg { chunk }; - adapter.broadcast_block_chunk(self.view, message).await; + adapter + .broadcast_block_chunk(FLAT_COMMITTEE, self.view, message) + .await; }) .await; } @@ -86,10 +91,14 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> // in the flat overlay, there's no need to wait for anyone before approving the block let approval = self.approve(block); adapter - .forward_approval(ApprovalMsg { - approval, - source: self.node_id, - }) + .forward_approval( + FLAT_COMMITTEE, + self.view, + ApprovalMsg { + approval, + source: self.node_id, + }, + ) .await; Ok(()) } @@ -98,7 +107,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> // for now, let's pretend that consensus is reached as soon as the // block is approved by a share of the nodes let mut approvals = HashSet::new(); - let mut stream = Box::into_pin(adapter.approvals_stream().await); + let mut stream = Box::into_pin(adapter.approvals_stream(FLAT_COMMITTEE, self.view).await); // Shadow the original binding so that it can't be directly accessed // ever again. diff --git a/nomos-services/consensus/src/overlay/mod.rs b/nomos-services/consensus/src/overlay/mod.rs index dab6a871..037c159d 100644 --- a/nomos-services/consensus/src/overlay/mod.rs +++ b/nomos-services/consensus/src/overlay/mod.rs @@ -1,5 +1,4 @@ -#[allow(unused)] -mod committees; +pub mod committees; mod flat; // std