Use commitee and view information for consensus network adapter (#57)

* Add committee and view information to network adapter

* Use committee and view on waku adapter

* Add committee and view info to flat view implementation.

* Clippy happy

* Rename flat -> root

* Split broadcast block iterator into l/r childs

* Extract topics to builder functions
This commit is contained in:
Daniel Sanchez 2023-01-31 12:03:45 +01:00 committed by GitHub
parent f5175c74c0
commit 1e20c3b6cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 157 additions and 72 deletions

View File

@ -262,4 +262,8 @@ impl View {
pub fn is_leader(&self, _node_id: NodeId) -> bool { pub fn is_leader(&self, _node_id: NodeId) -> bool {
todo!() todo!()
} }
pub fn id(&self) -> u64 {
self._view_n
}
} }

View File

@ -1,4 +1,5 @@
// std // std
use std::borrow::Cow;
// crates // crates
use bytes::Bytes; use bytes::Bytes;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
@ -8,6 +9,7 @@ use crate::network::{
messages::{ApprovalMsg, ProposalChunkMsg}, messages::{ApprovalMsg, ProposalChunkMsg},
NetworkAdapter, NetworkAdapter,
}; };
use crate::overlay::committees::Committee;
use crate::{Approval, View}; use crate::{Approval, View};
use nomos_network::{ use nomos_network::{
backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage}, backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage},
@ -19,11 +21,8 @@ use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic};
const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
WakuPubSubTopic::new("CarnotSim", Encoding::Proto); WakuPubSubTopic::new("CarnotSim", Encoding::Proto);
const WAKU_CARNOT_BLOCK_CONTENT_TOPIC: WakuContentTopic = const APPLICATION_NAME: &str = "CarnotSim";
WakuContentTopic::new("CarnotSim", 1, "CarnotBlock", Encoding::Proto); const VERSION: usize = 1;
const WAKU_CARNOT_APPROVAL_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new("CarnotSim", 1, "CarnotApproval", Encoding::Proto);
pub struct WakuAdapter { pub struct WakuAdapter {
network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>, network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
@ -61,40 +60,50 @@ impl NetworkAdapter for WakuAdapter {
Self { network_relay } Self { network_relay }
} }
async fn proposal_chunks_stream(&self) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> { async fn proposal_chunks_stream(
&self,
committee: Committee,
view: &View,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
let stream_channel = self let stream_channel = self
.message_subscriber_channel() .message_subscriber_channel()
.await .await
.unwrap_or_else(|_e| todo!("handle error")); .unwrap_or_else(|_e| todo!("handle error"));
Box::new(BroadcastStream::new(stream_channel).filter_map(|msg| { let content_topic = proposal_topic(committee, view);
Box::pin(async move { Box::new(
match msg { BroadcastStream::new(stream_channel)
Ok(event) => match event { .zip(futures::stream::repeat(content_topic))
NetworkEvent::RawMessage(message) => { .filter_map(|(msg, content_topic)| {
// TODO: this should actually check the whole content topic, Box::pin(async move {
// waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28) match msg {
if WAKU_CARNOT_BLOCK_CONTENT_TOPIC.content_topic_name Ok(event) => match event {
== message.content_topic().content_topic_name NetworkEvent::RawMessage(message) => {
{ if &content_topic == message.content_topic() {
let payload = message.payload(); let payload = message.payload();
Some(ProposalChunkMsg::from_bytes(payload).chunk) Some(ProposalChunkMsg::from_bytes(payload).chunk)
} else { } else {
None None
} }
}
},
Err(_e) => None,
} }
}, })
Err(_e) => None, }),
} )
})
}))
} }
async fn broadcast_block_chunk(&self, _view: &View, chunk_message: ProposalChunkMsg) { async fn broadcast_block_chunk(
// TODO: probably later, depending on the view we should map to different content topics &self,
// but this is an ongoing idea that should/will be discus. committee: Committee,
view: &View,
chunk_message: ProposalChunkMsg,
) {
let content_topic = proposal_topic(committee, view);
let message = WakuMessage::new( let message = WakuMessage::new(
chunk_message.as_bytes(), chunk_message.as_bytes(),
WAKU_CARNOT_BLOCK_CONTENT_TOPIC, content_topic,
1, 1,
chrono::Utc::now().timestamp() as usize, chrono::Utc::now().timestamp() as usize,
); );
@ -110,38 +119,48 @@ impl NetworkAdapter for WakuAdapter {
}; };
} }
async fn approvals_stream(&self) -> Box<dyn Stream<Item = Approval> + Send> { async fn approvals_stream(
&self,
committee: Committee,
view: &View,
) -> Box<dyn Stream<Item = Approval> + Send> {
let content_topic = approval_topic(committee, view);
let stream_channel = self let stream_channel = self
.message_subscriber_channel() .message_subscriber_channel()
.await .await
.unwrap_or_else(|_e| todo!("handle error")); .unwrap_or_else(|_e| todo!("handle error"));
Box::new( Box::new(
BroadcastStream::new(stream_channel).filter_map(|msg| async move { BroadcastStream::new(stream_channel)
match msg { .zip(futures::stream::repeat(content_topic))
Ok(event) => match event { .filter_map(|(msg, content_topic)| async move {
NetworkEvent::RawMessage(message) => { match msg {
// TODO: this should actually check the whole content topic, Ok(event) => match event {
// waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28) NetworkEvent::RawMessage(message) => {
if WAKU_CARNOT_APPROVAL_CONTENT_TOPIC.content_topic_name if &content_topic == message.content_topic() {
== message.content_topic().content_topic_name let payload = message.payload();
{ Some(ApprovalMsg::from_bytes(payload).approval)
let payload = message.payload(); } else {
Some(ApprovalMsg::from_bytes(payload).approval) None
} else { }
None
} }
} },
}, Err(_e) => None,
Err(_e) => None, }
} }),
}),
) )
} }
async fn forward_approval(&self, approval_message: ApprovalMsg) { async fn forward_approval(
&self,
committee: Committee,
view: &View,
approval_message: ApprovalMsg,
) {
let content_topic = approval_topic(committee, view);
let message = WakuMessage::new( let message = WakuMessage::new(
approval_message.as_bytes(), approval_message.as_bytes(),
WAKU_CARNOT_APPROVAL_CONTENT_TOPIC, content_topic,
1, 1,
chrono::Utc::now().timestamp() as usize, chrono::Utc::now().timestamp() as usize,
); );
@ -157,3 +176,21 @@ impl NetworkAdapter for WakuAdapter {
}; };
} }
} }
fn approval_topic(committee: Committee, view: &View) -> WakuContentTopic {
WakuContentTopic {
application_name: Cow::Borrowed(APPLICATION_NAME),
version: VERSION,
content_topic_name: Cow::Owned(format!("approval-{}-{}", committee.id(), view.id())),
encoding: Encoding::Proto,
}
}
fn proposal_topic(committee: Committee, view: &View) -> WakuContentTopic {
WakuContentTopic {
application_name: Cow::Borrowed(APPLICATION_NAME),
version: VERSION,
content_topic_name: Cow::Owned(format!("proposal-{}-{}", committee.id(), view.id())),
encoding: Encoding::Proto,
}
}

View File

@ -4,6 +4,7 @@ use bytes::Bytes;
// internal // internal
use crate::{Approval, NodeId}; use crate::{Approval, NodeId};
#[derive(Clone)]
pub struct ProposalChunkMsg { pub struct ProposalChunkMsg {
pub chunk: Bytes, pub chunk: Bytes,
} }

View File

@ -7,6 +7,7 @@ use bytes::Bytes;
use futures::Stream; use futures::Stream;
// internal // internal
use crate::network::messages::{ApprovalMsg, ProposalChunkMsg}; use crate::network::messages::{ApprovalMsg, ProposalChunkMsg};
use crate::overlay::committees::Committee;
use crate::{Approval, View}; use crate::{Approval, View};
use nomos_network::backends::NetworkBackend; use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService; use nomos_network::NetworkService;
@ -19,8 +20,21 @@ pub trait NetworkAdapter {
async fn new( async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>, network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self; ) -> Self;
async fn proposal_chunks_stream(&self) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin>; async fn proposal_chunks_stream(
async fn broadcast_block_chunk(&self, view: &View, chunk_msg: ProposalChunkMsg); &self,
async fn approvals_stream(&self) -> Box<dyn Stream<Item = Approval> + Send>; committee: Committee,
async fn forward_approval(&self, approval: ApprovalMsg); view: &View,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin>;
async fn broadcast_block_chunk(
&self,
committee: Committee,
view: &View,
chunk_msg: ProposalChunkMsg,
);
async fn approvals_stream(
&self,
committee: Committee,
view: &View,
) -> Box<dyn Stream<Item = Approval> + Send>;
async fn forward_approval(&self, committee: Committee, view: &View, approval: ApprovalMsg);
} }

View File

@ -1,5 +1,4 @@
// std // std
use std::pin::Pin;
// crates // crates
use futures::StreamExt; use futures::StreamExt;
use rand::{seq::SliceRandom, SeedableRng}; use rand::{seq::SliceRandom, SeedableRng};
@ -10,6 +9,8 @@ use crate::network::NetworkAdapter;
/// View of the tree overlay centered around a specific member /// View of the tree overlay centered around a specific member
pub struct Member<'view, const C: usize> { pub struct Member<'view, const C: usize> {
// id is not used now, but gonna probably used it for later checking later on
#[allow(dead_code)]
id: NodeId, id: NodeId,
committee: Committee, committee: Committee,
committees: Committees<'view, C>, committees: Committees<'view, C>,
@ -54,6 +55,14 @@ impl<'view, const C: usize> Committees<'view, C> {
} }
impl Committee { impl Committee {
pub const fn root() -> Self {
Self(0)
}
pub fn id(&self) -> usize {
self.0
}
/// Return the left and right children committee, if any /// Return the left and right children committee, if any
pub fn children(&self) -> (Committee, Committee) { pub fn children(&self) -> (Committee, Committee) {
( (
@ -112,19 +121,31 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const
adapter: &Network, adapter: &Network,
fountain: &Fountain, fountain: &Fountain,
) -> Result<Block, FountainError> { ) -> Result<Block, FountainError> {
let message_stream = adapter.proposal_chunks_stream().await; let committee = self.committee;
let view = self.committees.view;
let message_stream = adapter.proposal_chunks_stream(committee, view).await;
fountain.decode(message_stream).await.map(Block::from_bytes) fountain.decode(message_stream).await.map(Block::from_bytes)
} }
async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain) { async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain) {
let (left_child, right_child) = self.children_committes();
let view = self.committees.view;
let block_bytes = block.as_bytes(); let block_bytes = block.as_bytes();
let encoded_stream = fountain.encode(&block_bytes); let encoded_stream = fountain.encode(&block_bytes);
encoded_stream encoded_stream
.for_each_concurrent(None, |chunk| async move { .for_each_concurrent(None, |chunk| async move {
let message = ProposalChunkMsg { chunk }; let message = ProposalChunkMsg { chunk };
adapter let r_child = right_child
.broadcast_block_chunk(self.committees.view, message) .map(|right_child| {
.await; adapter.broadcast_block_chunk(right_child, view, message.clone())
})
.into_iter();
let l_child = left_child
.map(|left_child| {
adapter.broadcast_block_chunk(left_child, view, message.clone())
})
.into_iter();
futures::future::join_all(r_child.chain(l_child)).await;
}) })
.await; .await;
} }
@ -132,7 +153,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const
async fn approve_and_forward( async fn approve_and_forward(
&self, &self,
_block: &Block, _block: &Block,
adapter: &Network, _adapter: &Network,
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
// roughly, we want to do something like this: // roughly, we want to do something like this:
// 1. wait for left and right children committees to approve // 1. wait for left and right children committees to approve
@ -144,7 +165,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const
todo!() todo!()
} }
async fn wait_for_consensus(&self, _approval: &Block, adapter: &Network) { async fn wait_for_consensus(&self, _approval: &Block, _adapter: &Network) {
// maybe the leader publishing the QC? // maybe the leader publishing the QC?
} }
} }

View File

@ -7,9 +7,10 @@ use futures::StreamExt;
use super::*; use super::*;
use crate::network::messages::{ApprovalMsg, ProposalChunkMsg}; use crate::network::messages::{ApprovalMsg, ProposalChunkMsg};
use crate::network::NetworkAdapter; use crate::network::NetworkAdapter;
use crate::overlay::committees::Committee;
const DEFAULT_THRESHOLD: Threshold = Threshold::new(2, 3); const DEFAULT_THRESHOLD: Threshold = Threshold::new(2, 3);
const FLAT_COMMITTEE: Committee = Committee::root();
/// The share of nodes that need to approve a block for it to be valid /// The share of nodes that need to approve a block for it to be valid
/// expressed as a fraction of the total number of nodes /// expressed as a fraction of the total number of nodes
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
@ -63,7 +64,9 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync>
adapter: &Network, adapter: &Network,
fountain: &Fountain, fountain: &Fountain,
) -> Result<Block, FountainError> { ) -> Result<Block, FountainError> {
let message_stream = adapter.proposal_chunks_stream().await; let message_stream = adapter
.proposal_chunks_stream(FLAT_COMMITTEE, self.view)
.await;
fountain.decode(message_stream).await.map(Block::from_bytes) fountain.decode(message_stream).await.map(Block::from_bytes)
} }
@ -73,7 +76,9 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync>
encoded_stream encoded_stream
.for_each_concurrent(None, |chunk| async move { .for_each_concurrent(None, |chunk| async move {
let message = ProposalChunkMsg { chunk }; let message = ProposalChunkMsg { chunk };
adapter.broadcast_block_chunk(self.view, message).await; adapter
.broadcast_block_chunk(FLAT_COMMITTEE, self.view, message)
.await;
}) })
.await; .await;
} }
@ -86,10 +91,14 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync>
// in the flat overlay, there's no need to wait for anyone before approving the block // in the flat overlay, there's no need to wait for anyone before approving the block
let approval = self.approve(block); let approval = self.approve(block);
adapter adapter
.forward_approval(ApprovalMsg { .forward_approval(
approval, FLAT_COMMITTEE,
source: self.node_id, self.view,
}) ApprovalMsg {
approval,
source: self.node_id,
},
)
.await; .await;
Ok(()) Ok(())
} }
@ -98,7 +107,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync>
// for now, let's pretend that consensus is reached as soon as the // for now, let's pretend that consensus is reached as soon as the
// block is approved by a share of the nodes // block is approved by a share of the nodes
let mut approvals = HashSet::new(); let mut approvals = HashSet::new();
let mut stream = Box::into_pin(adapter.approvals_stream().await); let mut stream = Box::into_pin(adapter.approvals_stream(FLAT_COMMITTEE, self.view).await);
// Shadow the original binding so that it can't be directly accessed // Shadow the original binding so that it can't be directly accessed
// ever again. // ever again.

View File

@ -1,5 +1,4 @@
#[allow(unused)] pub mod committees;
mod committees;
mod flat; mod flat;
// std // std