Refactor NetworkAdapter (#258)

* Rework NetworkAdapter API

The NetworkAdapter API failed to isolate the internals by
providing a way to send a message to a user-provided channel while
the stream listeners expected specific formats.
Unify network messages under the same enum and simplify sending/
broadcasting messages.

* remove redundant inlines

* use committee.id()

* fmt
This commit is contained in:
Giacomo Pasini 2023-07-12 16:12:25 +02:00 committed by GitHub
parent 9467351c10
commit c29a641a9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 203 additions and 232 deletions

View File

@ -49,14 +49,12 @@ pub struct Committee {
}
impl Committee {
#[inline]
pub const fn new() -> Self {
Self {
members: BTreeSet::new(),
}
}
#[inline]
pub fn hash<D: digest::Digest>(
&self,
) -> digest::generic_array::GenericArray<u8, <D as digest::OutputSizeUser>::OutputSize> {
@ -67,42 +65,34 @@ impl Committee {
hasher.finalize()
}
#[inline]
pub fn contains(&self, node_id: &NodeId) -> bool {
self.members.contains(node_id)
}
#[inline]
pub fn insert(&mut self, node_id: NodeId) {
self.members.insert(node_id);
}
#[inline]
pub fn remove(&mut self, node_id: &NodeId) {
self.members.remove(node_id);
}
#[inline]
pub fn is_empty(&self) -> bool {
self.members.is_empty()
}
#[inline]
pub fn len(&self) -> usize {
self.members.len()
}
#[inline]
pub fn extend<'a>(&mut self, other: impl IntoIterator<Item = &'a NodeId>) {
self.members.extend(other);
}
#[inline]
pub fn id<D: digest::Digest<OutputSize = digest::typenum::U32>>(&self) -> CommitteeId {
CommitteeId::new(self.hash::<D>().into())
}
#[inline]
pub fn iter(&self) -> impl Iterator<Item = &NodeId> {
self.members.iter()
}

View File

@ -27,11 +27,13 @@ tracing = "0.1"
waku-bindings = { version = "0.1.1", optional = true }
bls-signatures = "0.14"
serde_with = "3.0.0"
blake2 = "0.10"
[features]
default = []
waku = ["nomos-network/waku", "waku-bindings"]
mock = ["nomos-network/mock"]
libp2p = ["nomos-network/libp2p"]
[dev-dependencies]
serde_json = "1.0.96"

View File

@ -20,7 +20,9 @@ use serde_with::serde_as;
use tokio::sync::oneshot::Sender;
use tracing::instrument;
// internal
use crate::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg};
use crate::network::messages::{
NetworkMessage, NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg,
};
use crate::network::NetworkAdapter;
use crate::tally::{
happy::CarnotTally, timeout::TimeoutTally, unhappy::NewViewTally, CarnotTallySettings,
@ -742,43 +744,34 @@ where
Payload::Vote(vote) => {
adapter
.send(
&to,
vote.view,
VoteMsg {
NetworkMessage::Vote(VoteMsg {
voter: node_id,
vote,
qc: None, // TODO: handle root commmittee members
}
.as_bytes(),
"votes",
}),
&to,
)
.await;
}
Payload::Timeout(timeout) => {
adapter
.send(
&to,
timeout.view,
TimeoutMsg {
NetworkMessage::Timeout(TimeoutMsg {
voter: node_id,
vote: timeout,
}
.as_bytes(),
"timeout",
}),
&to,
)
.await;
}
Payload::NewView(new_view) => {
adapter
.send(
&to,
new_view.view,
NewViewMsg {
NetworkMessage::NewView(NewViewMsg {
voter: node_id,
vote: new_view,
}
.as_bytes(),
"new-view",
}),
&to,
)
.await;
}
@ -787,20 +780,20 @@ where
fountain
.encode(&proposal.as_bytes())
.for_each(|chunk| {
adapter.broadcast_block_chunk(ProposalChunkMsg {
adapter.broadcast(NetworkMessage::ProposalChunk(ProposalChunkMsg {
proposal: proposal.header().id,
chunk: chunk.to_vec().into_boxed_slice(),
view: proposal.header().view,
})
}))
})
.await;
}
Output::BroadcastTimeoutQc { timeout_qc } => {
adapter
.broadcast_timeout_qc(TimeoutQcMsg {
.broadcast(NetworkMessage::TimeoutQc(TimeoutQcMsg {
source: node_id,
qc: timeout_qc,
})
}))
.await;
}
}

View File

@ -8,7 +8,7 @@ use nomos_network::{
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use tokio_stream::wrappers::BroadcastStream;
use crate::network::messages::{NewViewMsg, TimeoutMsg, TimeoutQcMsg};
use crate::network::messages::{NetworkMessage, NewViewMsg, TimeoutMsg, TimeoutQcMsg};
use crate::network::{
messages::{ProposalChunkMsg, VoteMsg},
BoxedStream, NetworkAdapter,
@ -84,27 +84,8 @@ impl NetworkAdapter for MockAdapter {
}))
}
async fn broadcast_block_chunk(&self, chunk_message: ProposalChunkMsg) {
let message = MockMessage::new(
String::from_utf8_lossy(&chunk_message.as_bytes()).to_string(),
MOCK_BLOCK_CONTENT_TOPIC,
1,
chrono::Utc::now().timestamp_nanos() as usize,
);
if let Err((e, _)) = self
.network_relay
.send(NetworkMsg::Process(MockBackendMessage::Broadcast {
msg: message,
topic: MOCK_PUB_SUB_TOPIC.to_string(),
}))
.await
{
tracing::error!("Failed to broadcast block chunk: {:?}", e);
};
}
async fn broadcast_timeout_qc(&self, _timeout_qc_msg: TimeoutQcMsg) {
todo!()
async fn broadcast(&self, message: NetworkMessage) {
self.send(message, &Committee::default()).await
}
async fn timeout_stream(&self, _committee: &Committee, _view: View) -> BoxedStream<TimeoutMsg> {
@ -145,9 +126,9 @@ impl NetworkAdapter for MockAdapter {
todo!()
}
async fn send(&self, _committee: &Committee, _view: View, payload: Box<[u8]>, _channel: &str) {
async fn send(&self, message: NetworkMessage, _committee: &Committee) {
let message = MockMessage::new(
String::from_utf8_lossy(&payload).to_string(),
String::from_utf8_lossy(&message.as_bytes()).to_string(),
MOCK_APPROVAL_CONTENT_TOPIC,
1,
chrono::Utc::now().timestamp_nanos() as usize,

View File

@ -1,3 +1,5 @@
// #[cfg(feature = "libp2p")]
// pub mod libp2p;
#[cfg(feature = "mock")]
pub mod mock;
#[cfg(feature = "waku")]

View File

@ -1,13 +1,10 @@
// std
use std::borrow::Cow;
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeSet;
use std::hash::{Hash, Hasher};
// crates
use futures::{Stream, StreamExt};
use tokio_stream::wrappers::BroadcastStream;
// internal
use crate::network::messages::{NewViewMsg, TimeoutMsg, TimeoutQcMsg};
use crate::network::messages::{NetworkMessage, NewViewMsg, TimeoutMsg, TimeoutQcMsg};
use crate::network::{
messages::{ProposalChunkMsg, VoteMsg},
BoxedStream, NetworkAdapter,
@ -132,158 +129,7 @@ impl WakuAdapter {
tokio_stream::StreamExt::merge(live_stream, archive_stream)
}
async fn broadcast(&self, bytes: Box<[u8]>, topic: WakuContentTopic) {
let message = WakuMessage::new(
bytes,
topic,
1,
chrono::Utc::now().timestamp_nanos() as usize,
[],
false,
);
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message,
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
}))
.await
{
todo!("log error");
};
}
}
#[async_trait::async_trait]
impl NetworkAdapter for WakuAdapter {
type Backend = Waku;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
Self { network_relay }
}
async fn proposal_chunks_stream(&self, view: View) -> BoxedStream<ProposalChunkMsg> {
Box::new(Box::pin(
self.cached_stream_with_content_topic(PROPOSAL_CONTENT_TOPIC)
.await
.filter_map(move |message| {
let payload = message.payload();
let proposal = ProposalChunkMsg::from_bytes(payload);
async move {
if view == proposal.view {
Some(proposal)
} else {
None
}
}
}),
))
}
async fn broadcast_block_chunk(&self, chunk_message: ProposalChunkMsg) {
let message = WakuMessage::new(
chunk_message.as_bytes(),
PROPOSAL_CONTENT_TOPIC,
1,
chrono::Utc::now().timestamp_nanos() as usize,
[],
false,
);
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message,
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC),
}))
.await
{
todo!("log error");
};
}
async fn broadcast_timeout_qc(&self, timeout_qc_msg: TimeoutQcMsg) {
self.broadcast(timeout_qc_msg.as_bytes(), TIMEOUT_QC_CONTENT_TOPIC)
.await
}
async fn timeout_stream(&self, committee: &Committee, view: View) -> BoxedStream<TimeoutMsg> {
let content_topic = create_topic("timeout", committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.filter_map(move |message| {
let payload = message.payload();
let timeout = TimeoutMsg::from_bytes(payload);
async move {
if timeout.vote.view == view {
Some(timeout)
} else {
None
}
}
}),
))
}
async fn timeout_qc_stream(&self, view: View) -> BoxedStream<TimeoutQcMsg> {
Box::new(Box::pin(
self.cached_stream_with_content_topic(TIMEOUT_QC_CONTENT_TOPIC)
.await
.filter_map(move |message| {
let payload = message.payload();
let qc = TimeoutQcMsg::from_bytes(payload);
async move {
if qc.qc.view() == view {
Some(qc)
} else {
None
}
}
}),
))
}
async fn votes_stream(
&self,
committee: &Committee,
view: View,
proposal_id: BlockId,
) -> BoxedStream<VoteMsg> {
let content_topic = create_topic("votes", committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.filter_map(move |message| {
let payload = message.payload();
let vote = VoteMsg::from_bytes(payload);
async move {
if vote.vote.block == proposal_id {
Some(vote)
} else {
None
}
}
}),
))
}
async fn new_view_stream(&self, committee: &Committee, view: View) -> BoxedStream<NewViewMsg> {
let content_topic = create_topic("new-view", committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.map(|message| {
let payload = message.payload();
NewViewMsg::from_bytes(payload)
}),
))
}
async fn send(&self, committee: &Committee, view: View, payload: Box<[u8]>, channel: &str) {
let content_topic = create_topic(channel, committee, view);
async fn inner_broadcast(&self, payload: Box<[u8]>, content_topic: WakuContentTopic) {
let message = WakuMessage::new(
payload,
content_topic,
@ -305,27 +151,165 @@ impl NetworkAdapter for WakuAdapter {
}
}
fn create_topic(tag: &str, committee: &Committee, view: View) -> WakuContentTopic {
#[async_trait::async_trait]
impl NetworkAdapter for WakuAdapter {
type Backend = Waku;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
Self { network_relay }
}
async fn proposal_chunks_stream(&self, view: View) -> BoxedStream<ProposalChunkMsg> {
Box::new(Box::pin(
self.cached_stream_with_content_topic(create_topic(PROPOSAL_TAG, None))
.await
.filter_map(move |message| {
let payload = message.payload();
let proposal = ProposalChunkMsg::from_bytes(payload);
async move {
if view == proposal.view {
Some(proposal)
} else {
None
}
}
}),
))
}
async fn broadcast(&self, message: NetworkMessage) {
let topic = create_topic(message_tag(&message), None);
self.inner_broadcast(unwrap_message_to_bytes(&message), topic)
.await
}
async fn timeout_stream(&self, committee: &Committee, view: View) -> BoxedStream<TimeoutMsg> {
let content_topic = create_topic(TIMEOUT_TAG, Some(committee));
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.filter_map(move |message| {
let payload = message.payload();
let timeout = TimeoutMsg::from_bytes(payload);
async move {
if timeout.vote.view == view {
Some(timeout)
} else {
None
}
}
}),
))
}
async fn timeout_qc_stream(&self, view: View) -> BoxedStream<TimeoutQcMsg> {
Box::new(Box::pin(
self.cached_stream_with_content_topic(create_topic(TIMEOUT_QC_TAG, None))
.await
.filter_map(move |message| {
let payload = message.payload();
let qc = TimeoutQcMsg::from_bytes(payload);
async move {
if qc.qc.view() == view {
Some(qc)
} else {
None
}
}
}),
))
}
async fn votes_stream(
&self,
committee: &Committee,
view: View,
proposal_id: BlockId,
) -> BoxedStream<VoteMsg> {
let content_topic = create_topic(VOTE_TAG, Some(committee));
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.filter_map(move |message| {
let payload = message.payload();
let vote = VoteMsg::from_bytes(payload);
async move {
if vote.vote.block == proposal_id && vote.vote.view == view {
Some(vote)
} else {
None
}
}
}),
))
}
async fn new_view_stream(&self, committee: &Committee, view: View) -> BoxedStream<NewViewMsg> {
let content_topic = create_topic(NEW_VIEW_TAG, Some(committee));
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.filter_map(move |message| {
let payload = message.payload();
let new_view = NewViewMsg::from_bytes(payload);
async move {
if new_view.vote.view == view {
Some(new_view)
} else {
None
}
}
}),
))
}
async fn send(&self, message: NetworkMessage, committee: &Committee) {
let topic = create_topic(message_tag(&message), Some(committee));
self.inner_broadcast(unwrap_message_to_bytes(&message), topic)
.await
}
}
fn create_topic(tag: &str, committee: Option<&Committee>) -> WakuContentTopic {
WakuContentTopic {
application_name: Cow::Borrowed(APPLICATION_NAME),
version: VERSION,
content_topic_name: Cow::Owned(format!("{}-{}-{}", tag, hash_set(committee), view)),
content_topic_name: Cow::Owned(format!(
"{}{}",
tag,
committee
.map(|c| format!("-{}", c.id::<blake2::Blake2s256>()))
.unwrap_or_default()
)),
encoding: Encoding::Proto,
}
}
const PROPOSAL_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new(APPLICATION_NAME, VERSION, "proposal", Encoding::Proto);
const TIMEOUT_QC_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new(APPLICATION_NAME, VERSION, "timeout-qc", Encoding::Proto);
// since we use content topic to filter messages, we can remove the tag from the message
fn unwrap_message_to_bytes(message: &NetworkMessage) -> Box<[u8]> {
match message {
NetworkMessage::NewView(msg) => msg.as_bytes(),
NetworkMessage::ProposalChunk(msg) => msg.as_bytes(),
NetworkMessage::Vote(msg) => msg.as_bytes(),
NetworkMessage::Timeout(msg) => msg.as_bytes(),
NetworkMessage::TimeoutQc(msg) => msg.as_bytes(),
}
}
// TODO: Maybe use a secure hasher instead
fn hash_set(c: &Committee) -> u64 {
let mut s = DefaultHasher::new();
// ensure consistent iteration across nodes
let c = c.iter().collect::<BTreeSet<_>>();
for e in c.iter() {
e.hash(&mut s);
fn message_tag(message: &NetworkMessage) -> &str {
match message {
NetworkMessage::NewView(_) => NEW_VIEW_TAG,
NetworkMessage::ProposalChunk(_) => PROPOSAL_TAG,
NetworkMessage::Vote(_) => VOTE_TAG,
NetworkMessage::Timeout(_) => TIMEOUT_TAG,
NetworkMessage::TimeoutQc(_) => TIMEOUT_QC_TAG,
}
s.finish()
}
const NEW_VIEW_TAG: &str = "new-view";
const PROPOSAL_TAG: &str = "proposal";
const VOTE_TAG: &str = "vote";
const TIMEOUT_TAG: &str = "timeout";
const TIMEOUT_QC_TAG: &str = "timeout-qc";

View File

@ -83,3 +83,21 @@ impl TimeoutQcMsg {
wire::deserialize(data).unwrap()
}
}
#[derive(Serialize, Deserialize)]
pub enum NetworkMessage {
Timeout(TimeoutMsg),
TimeoutQc(TimeoutQcMsg),
Vote(VoteMsg),
NewView(NewViewMsg),
ProposalChunk(ProposalChunkMsg),
}
impl NetworkMessage {
pub fn as_bytes(&self) -> Box<[u8]> {
wire::serialize(self).unwrap().into_boxed_slice()
}
pub fn from_bytes(data: &[u8]) -> Self {
wire::deserialize(data).unwrap()
}
}

View File

@ -5,7 +5,9 @@ pub mod messages;
// crates
use futures::Stream;
// internal
use crate::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg};
use crate::network::messages::{
NetworkMessage, NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg,
};
use consensus_engine::{BlockId, Committee, View};
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
@ -24,8 +26,7 @@ pub trait NetworkAdapter {
&self,
view: View,
) -> Box<dyn Stream<Item = ProposalChunkMsg> + Send + Sync + Unpin>;
async fn broadcast_block_chunk(&self, chunk_msg: ProposalChunkMsg);
async fn broadcast_timeout_qc(&self, timeout_qc_msg: TimeoutQcMsg);
async fn broadcast(&self, message: NetworkMessage);
async fn timeout_stream(&self, committee: &Committee, view: View) -> BoxedStream<TimeoutMsg>;
async fn timeout_qc_stream(&self, view: View) -> BoxedStream<TimeoutQcMsg>;
async fn votes_stream(
@ -35,5 +36,5 @@ pub trait NetworkAdapter {
proposal_id: BlockId,
) -> BoxedStream<VoteMsg>;
async fn new_view_stream(&self, committee: &Committee, view: View) -> BoxedStream<NewViewMsg>;
async fn send(&self, committee: &Committee, view: View, payload: Box<[u8]>, channel: &str);
async fn send(&self, message: NetworkMessage, committee: &Committee);
}