Adapt carnot network adapter interfaces and implementations to latest spec (#121)

* Add happy-path consensus engine

* tmp

* Fit types from spec (#124)

* Match types to spec

* Remove Output import

* Consensus engine rework (#126)

* rework

* fix test

* clippy happy

---------

Co-authored-by: Giacomo Pasini <Zeegomo@users.noreply.github.com>

* Adapt carnot network adapter interfaces and implementations

* Fix errors

* Update network with engine types

* Fit types yet again

---------

Co-authored-by: Al Liu <scygliu1@gmail.com>
Co-authored-by: Giacomo Pasini <g.pasini98@gmail.com>
Co-authored-by: Giacomo Pasini <Zeegomo@users.noreply.github.com>
This commit is contained in:
Daniel Sanchez 2023-05-01 17:47:56 +02:00 committed by GitHub
parent f8617d7331
commit 26d10856ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 166 additions and 110 deletions

View File

@ -6,3 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = { version = "1.0", optional = true }
[features]
default = []
serde1 = ["serde"]

View File

@ -1,7 +1,7 @@
use std::collections::{HashMap, HashSet};
mod types;
use types::*;
pub use types::*;
#[derive(Clone, Debug)]
pub struct Carnot<O: Overlay> {

View File

@ -12,6 +12,7 @@ pub type Committee = HashSet<NodeId>;
/// This enum represents the different types of messages that can be sent from the perspective of consensus and
/// can't be directly used in the network as they lack things like cryptographic signatures.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum Payload {
/// Vote for a block in a view
Vote(Vote),
@ -23,11 +24,13 @@ pub enum Payload {
/// Returned
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Vote {
pub block: BlockId,
}
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Timeout {
pub view: View,
pub sender: NodeId,
@ -36,6 +39,7 @@ pub struct Timeout {
}
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct NewView {
pub view: View,
pub sender: NodeId,
@ -44,6 +48,7 @@ pub struct NewView {
}
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct TimeoutQc {
pub view: View,
pub high_qc: Qc,
@ -51,6 +56,7 @@ pub struct TimeoutQc {
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct Block {
pub id: BlockId,
pub view: View,
@ -76,6 +82,7 @@ pub enum Output {
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct StandardQc {
pub view: View,
pub id: BlockId,
@ -91,12 +98,14 @@ impl StandardQc {
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub struct AggregateQc {
pub high_qc: StandardQc,
pub view: View,
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum Qc {
Standard(StandardQc),
Aggregated(AggregateQc),

View File

@ -12,6 +12,7 @@ authors = [
async-trait = { version = "0.1" }
blake2 = { version = "0.10" }
bytes = "1.3"
consensus-engine = { path = "../consensus-engine"}
futures = "0.3"
nomos-network = { path = "../nomos-services/network", optional = true }
raptorq = { version = "1.7", optional = true }

View File

@ -9,6 +9,7 @@ edition = "2021"
async-trait = "0.1"
bytes = "1.3"
chrono = "0.4"
consensus-engine = { path = "../../consensus-engine", features = ["serde1"] }
futures = "0.3"
nomos-network = { path = "../network" }
nomos-mempool = { path = "../mempool" }

View File

@ -227,7 +227,7 @@ pub struct Approval;
pub struct View {
seed: Seed,
staking_keys: BTreeMap<NodeId, Stake>,
pub view_n: u64,
pub view_n: consensus_engine::View,
}
impl View {
@ -357,8 +357,9 @@ impl View {
true
}
// TODO: use consensus_engine::View instead
pub fn id(&self) -> u64 {
self.view_n
self.view_n.try_into().unwrap()
}
// Verifies the block is new and the previous leader did not fail

View File

@ -11,14 +11,15 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio_stream::{wrappers::BroadcastStream, Stream};
use crate::network::messages::TimeoutQcMsg;
use crate::{
network::{
messages::{ProposalChunkMsg, VoteMsg},
NetworkAdapter,
},
overlay::committees::Committee,
View,
};
use consensus_engine::{TimeoutQc, View};
const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic";
const MOCK_BLOCK_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("MockSim", 1, "MockBlock");
@ -63,8 +64,7 @@ impl NetworkAdapter for MockAdapter {
async fn proposal_chunks_stream(
&self,
_committee: Committee,
_view: &View,
_view: View,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
let stream_channel = self
.message_subscriber_channel()
@ -80,7 +80,9 @@ impl NetworkAdapter for MockAdapter {
== message.content_topic().content_topic_name
{
let payload = message.payload();
Some(ProposalChunkMsg::from_bytes(payload.as_bytes()).chunk)
Some(Bytes::from(
ProposalChunkMsg::from_bytes(payload.as_bytes()).chunk,
))
} else {
None
}
@ -92,14 +94,9 @@ impl NetworkAdapter for MockAdapter {
}))
}
async fn broadcast_block_chunk(
&self,
_committee: Committee,
_view: &View,
chunk_message: ProposalChunkMsg,
) {
async fn broadcast_block_chunk(&self, chunk_message: ProposalChunkMsg) {
let message = MockMessage::new(
String::from_utf8_lossy(chunk_message.as_bytes()).to_string(),
String::from_utf8_lossy(&chunk_message.as_bytes()).to_string(),
MOCK_BLOCK_CONTENT_TOPIC,
1,
chrono::Utc::now().timestamp() as usize,
@ -116,10 +113,21 @@ impl NetworkAdapter for MockAdapter {
};
}
async fn broadcast_timeout_qc(&self, _timeout_qc_msg: TimeoutQcMsg) {
todo!()
}
async fn timeout_qc_stream(
&self,
_view: View,
) -> Box<dyn Stream<Item = TimeoutQc> + Send + Sync + Unpin> {
todo!()
}
async fn votes_stream<Vote: DeserializeOwned>(
&self,
_committee: Committee,
_view: &View,
_view: View,
) -> Box<dyn Stream<Item = Vote> + Send> {
let stream_channel = self
.message_subscriber_channel()
@ -146,10 +154,10 @@ impl NetworkAdapter for MockAdapter {
)
}
async fn forward_approval<Vote: Serialize>(
async fn send_vote<Vote: Serialize>(
&self,
_committee: Committee,
_view: &View,
_view: View,
approval_message: VoteMsg<Vote>,
) where
Vote: Send,

View File

@ -5,12 +5,13 @@ use bytes::Bytes;
use futures::{Stream, StreamExt};
use tokio_stream::wrappers::BroadcastStream;
// internal
use crate::network::messages::TimeoutQcMsg;
use crate::network::{
messages::{ProposalChunkMsg, VoteMsg},
NetworkAdapter,
};
use crate::overlay::committees::Committee;
use crate::View;
use consensus_engine::{TimeoutQc, View};
use nomos_network::{
backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage},
NetworkMsg, NetworkService,
@ -109,6 +110,20 @@ impl WakuAdapter {
});
tokio_stream::StreamExt::merge(archive_stream, live_stream)
}
async fn broadcast(&self, bytes: Box<[u8]>, topic: WakuContentTopic) {
let message = WakuMessage::new(bytes, topic, 1, chrono::Utc::now().timestamp() as usize);
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message,
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
}))
.await
{
todo!("log error");
};
}
}
#[async_trait::async_trait]
@ -123,52 +138,65 @@ impl NetworkAdapter for WakuAdapter {
async fn proposal_chunks_stream(
&self,
committee: Committee,
view: &View,
view: View,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
let content_topic = proposal_topic(committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
self.cached_stream_with_content_topic(PROPOSAL_CONTENT_TOPIC.clone())
.await
.map(|message| {
.filter_map(move |message| {
let payload = message.payload();
ProposalChunkMsg::from_bytes(payload).chunk
let ProposalChunkMsg {
view: msg_view,
chunk,
} = ProposalChunkMsg::from_bytes(payload);
async move {
if view == msg_view {
Some(Bytes::from(chunk))
} else {
None
}
}
}),
))
}
async fn broadcast_block_chunk(
&self,
committee: Committee,
view: &View,
chunk_message: ProposalChunkMsg,
) {
let content_topic = proposal_topic(committee, view);
let message = WakuMessage::new(
chunk_message.as_bytes(),
content_topic,
1,
chrono::Utc::now().timestamp() as usize,
);
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message,
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
}))
async fn broadcast_block_chunk(&self, chunk_message: ProposalChunkMsg) {
self.broadcast(chunk_message.as_bytes(), PROPOSAL_CONTENT_TOPIC)
.await
{
todo!("log error");
};
}
async fn broadcast_timeout_qc(&self, timeout_qc_msg: TimeoutQcMsg) {
self.broadcast(timeout_qc_msg.as_bytes(), TIMEOUT_QC_CONTENT_TOPIC)
.await
}
async fn timeout_qc_stream(
&self,
view: View,
) -> Box<dyn Stream<Item = TimeoutQc> + Send + Sync + Unpin> {
Box::new(Box::pin(
self.cached_stream_with_content_topic(TIMEOUT_QC_CONTENT_TOPIC.clone())
.await
.filter_map(move |message| {
let payload = message.payload();
let qc = TimeoutQcMsg::from_bytes(payload).qc;
async move {
if qc.view > view {
Some(qc)
} else {
None
}
}
}),
))
}
async fn votes_stream<Vote: DeserializeOwned>(
&self,
committee: Committee,
view: &View,
view: View,
) -> Box<dyn Stream<Item = Vote> + Send> {
let content_topic = proposal_topic(committee, view);
let content_topic = votes_topic(committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
@ -179,13 +207,13 @@ impl NetworkAdapter for WakuAdapter {
))
}
async fn forward_approval<Vote: Serialize + Send>(
async fn send_vote<Vote: Serialize + Send>(
&self,
committee: Committee,
view: &View,
view: View,
approval_message: VoteMsg<Vote>,
) {
let content_topic = approval_topic(committee, view);
let content_topic = votes_topic(committee, view);
let message = WakuMessage::new(
approval_message.as_bytes(),
@ -206,20 +234,16 @@ impl NetworkAdapter for WakuAdapter {
}
}
fn approval_topic(committee: Committee, view: &View) -> WakuContentTopic {
fn votes_topic(committee: Committee, view: View) -> WakuContentTopic {
WakuContentTopic {
application_name: Cow::Borrowed(APPLICATION_NAME),
version: VERSION,
content_topic_name: Cow::Owned(format!("approval-{}-{}", committee.id(), view.id())),
content_topic_name: Cow::Owned(format!("votes-{}-{}", committee.id(), view)),
encoding: Encoding::Proto,
}
}
fn proposal_topic(committee: Committee, view: &View) -> WakuContentTopic {
WakuContentTopic {
application_name: Cow::Borrowed(APPLICATION_NAME),
version: VERSION,
content_topic_name: Cow::Owned(format!("proposal-{}-{}", committee.id(), view.id())),
encoding: Encoding::Proto,
}
}
const PROPOSAL_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new(APPLICATION_NAME, VERSION, "proposal", Encoding::Proto);
const TIMEOUT_QC_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new(APPLICATION_NAME, VERSION, "timeout-qc", Encoding::Proto);

View File

@ -1,26 +1,25 @@
// std
// crates
use bytes::Bytes;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
// internal
use crate::NodeId;
use consensus_engine::{TimeoutQc, View};
use nomos_core::wire;
#[derive(Clone)]
#[derive(Clone, Serialize, Deserialize)]
pub struct ProposalChunkMsg {
pub chunk: Bytes,
pub chunk: Box<[u8]>,
pub view: View,
}
impl ProposalChunkMsg {
pub fn as_bytes(&self) -> &[u8] {
&self.chunk
pub fn as_bytes(&self) -> Box<[u8]> {
wire::serialize(self).unwrap().into_boxed_slice()
}
pub fn from_bytes(data: &[u8]) -> Self {
Self {
chunk: Bytes::from(data.to_vec()),
}
wire::deserialize(data).unwrap()
}
}
@ -47,3 +46,19 @@ where
wire::deserialize(data).unwrap()
}
}
#[derive(Serialize, Deserialize)]
pub struct TimeoutQcMsg {
pub source: NodeId,
pub qc: TimeoutQc,
}
impl TimeoutQcMsg {
pub fn as_bytes(&self) -> Box<[u8]> {
wire::serialize(self).unwrap().into_boxed_slice()
}
pub fn from_bytes(data: &[u8]) -> Self {
wire::deserialize(data).unwrap()
}
}

View File

@ -6,9 +6,9 @@ use bytes::Bytes;
// crates
use futures::Stream;
// internal
use crate::network::messages::{ProposalChunkMsg, VoteMsg};
use crate::network::messages::{ProposalChunkMsg, TimeoutQcMsg, VoteMsg};
use crate::overlay::committees::Committee;
use crate::View;
use consensus_engine::{TimeoutQc, View};
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;
@ -24,24 +24,23 @@ pub trait NetworkAdapter {
) -> Self;
async fn proposal_chunks_stream(
&self,
committee: Committee,
view: &View,
view: View,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin>;
async fn broadcast_block_chunk(
async fn broadcast_block_chunk(&self, chunk_msg: ProposalChunkMsg);
async fn broadcast_timeout_qc(&self, timeout_qc_msg: TimeoutQcMsg);
async fn timeout_qc_stream(
&self,
committee: Committee,
view: &View,
chunk_msg: ProposalChunkMsg,
);
view: View,
) -> Box<dyn Stream<Item = TimeoutQc> + Send + Sync + Unpin>;
async fn votes_stream<Vote: DeserializeOwned>(
&self,
committee: Committee,
view: &View,
view: View,
) -> Box<dyn Stream<Item = Vote> + Send>;
async fn forward_approval<Vote: Serialize + Send>(
async fn send_vote<Vote: Serialize + Send>(
&self,
committee: Committee,
view: &View,
approval: VoteMsg<Vote>,
view: View,
vote: VoteMsg<Vote>,
);
}

View File

@ -16,7 +16,7 @@ pub struct Member<const C: usize> {
id: NodeId,
committee: Committee,
committees: Committees<C>,
view_n: u64,
view_n: consensus_engine::View,
}
/// #Just a newtype index to be able to implement parent/children methods
@ -133,8 +133,7 @@ where
fountain: &Fountain,
) -> Result<Block<VoteTally::Qc, TxId>, FountainError> {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
let committee = self.committee;
let message_stream = adapter.proposal_chunks_stream(committee, view).await;
let message_stream = adapter.proposal_chunks_stream(view.view_n).await;
fountain.decode(message_stream).await.and_then(|b| {
deserializer(&b)
.deserialize::<Block<VoteTally::Qc, TxId>>()
@ -150,23 +149,15 @@ where
fountain: &Fountain,
) {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
let (left_child, right_child) = self.children_committes();
let block_bytes = block.as_bytes();
let encoded_stream = fountain.encode(&block_bytes);
encoded_stream
.for_each_concurrent(None, |chunk| async move {
let message = ProposalChunkMsg { chunk };
let r_child = right_child
.map(|right_child| {
adapter.broadcast_block_chunk(right_child, view, message.clone())
})
.into_iter();
let l_child = left_child
.map(|left_child| {
adapter.broadcast_block_chunk(left_child, view, message.clone())
})
.into_iter();
futures::future::join_all(r_child.chain(l_child)).await;
let message = ProposalChunkMsg {
chunk: chunk.to_vec().into_boxed_slice(),
view: view.view_n,
};
adapter.broadcast_block_chunk(message.clone()).await;
})
.await;
}

View File

@ -21,11 +21,11 @@ const FLAT_COMMITTEE: Committee = Committee::root();
pub struct Flat {
// TODO: this should be a const param, but we can't do that yet
node_id: NodeId,
view_n: u64,
view_n: consensus_engine::View,
}
impl Flat {
pub fn new(view_n: u64, node_id: NodeId) -> Self {
pub fn new(view_n: consensus_engine::View, node_id: NodeId) -> Self {
Self { node_id, view_n }
}
@ -56,7 +56,7 @@ where
fountain: &Fountain,
) -> Result<Block<VoteTally::Qc, TxId>, FountainError> {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
let message_stream = adapter.proposal_chunks_stream(FLAT_COMMITTEE, view).await;
let message_stream = adapter.proposal_chunks_stream(view.view_n).await;
fountain.decode(message_stream).await.and_then(|b| {
deserializer(&b)
.deserialize::<Block<VoteTally::Qc, TxId>>()
@ -76,10 +76,11 @@ where
let encoded_stream = fountain.encode(&block_bytes);
encoded_stream
.for_each_concurrent(None, |chunk| async move {
let message = ProposalChunkMsg { chunk };
adapter
.broadcast_block_chunk(FLAT_COMMITTEE, view, message)
.await;
let message = ProposalChunkMsg {
chunk: chunk.to_vec().into_boxed_slice(),
view: view.view_n,
};
adapter.broadcast_block_chunk(message).await;
})
.await;
}
@ -96,9 +97,9 @@ where
// in the flat overlay, there's no need to wait for anyone before approving the block
let approval = self.approve(block);
adapter
.forward_approval(
.send_vote(
FLAT_COMMITTEE,
view,
view.view_n,
VoteMsg {
vote: approval,
source: self.node_id,
@ -113,11 +114,12 @@ where
// for now, let's pretend that consensus is reached as soon as the
// block is approved by a share of the nodes
let stream = Box::into_pin(adapter.votes_stream(FLAT_COMMITTEE, view).await);
let stream = Box::into_pin(adapter.votes_stream(FLAT_COMMITTEE, view.view_n).await);
// Shadow the original binding so that it can't be directly accessed
// ever again.
if let Ok((qc, _)) = tally.tally(view.view_n, stream).await {
// TODO: Remove the `try_into` call when tally is refactored to use with latest consensus engine types
if let Ok((qc, _)) = tally.tally(view.view_n.try_into().unwrap(), stream).await {
qc
} else {
unimplemented!("consensus not reached")