From f8422fc7a86c9500189230f57d338e0b558a7880 Mon Sep 17 00:00:00 2001
From: Giacomo Pasini
Date: Wed, 2 Aug 2023 14:07:44 +0200
Subject: [PATCH] Add initial implementation of libp2p consensus adapter
 (#279)

* Add initial implementation of libp2p consensus adapter

Co-authored-by: Youngjoon Lee

* fix

* Handle all message types received via gossipsub (#283)

* remove todo

Co-authored-by: Youngjoon Lee

---------

Co-authored-by: Youngjoon Lee
---
 nomos-services/consensus/Cargo.toml           |   3 +-
 .../consensus/src/network/adapters/libp2p.rs  | 376 ++++++++++++++++++
 .../consensus/src/network/adapters/mod.rs     |   4 +-
 3 files changed, 380 insertions(+), 3 deletions(-)
 create mode 100644 nomos-services/consensus/src/network/adapters/libp2p.rs

diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml
index fc7076a3..1797d9fd 100644
--- a/nomos-services/consensus/Cargo.toml
+++ b/nomos-services/consensus/Cargo.toml
@@ -27,13 +27,14 @@ tracing = "0.1"
 waku-bindings = { version = "0.1.1", optional = true }
 bls-signatures = "0.14"
 serde_with = "3.0.0"
+nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
 blake2 = "0.10"
 
 [features]
 default = []
 waku = ["nomos-network/waku", "waku-bindings"]
 mock = ["nomos-network/mock"]
-libp2p = ["nomos-network/libp2p"]
+libp2p = ["nomos-network/libp2p", "nomos-libp2p"]
 
 [dev-dependencies]
 serde_json = "1.0.96"
diff --git a/nomos-services/consensus/src/network/adapters/libp2p.rs b/nomos-services/consensus/src/network/adapters/libp2p.rs
new file mode 100644
index 00000000..853cf4ed
--- /dev/null
+++ b/nomos-services/consensus/src/network/adapters/libp2p.rs
@@ -0,0 +1,376 @@
+// std
+use std::collections::{BTreeMap, HashMap};
+use std::ops::DerefMut;
+use std::sync::{Arc, Mutex};
+// crates
+use serde::{Deserialize, Serialize};
+use tokio::sync::broadcast::error::RecvError;
+use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender};
+use tokio_stream::wrappers::ReceiverStream;
+// internal
+use crate::network::messages::{NewViewMsg, TimeoutMsg, TimeoutQcMsg};
+use crate::network::{
+    messages::{NetworkMessage, ProposalChunkMsg, VoteMsg},
+    BoxedStream, NetworkAdapter,
+};
+use consensus_engine::{BlockId, Committee, CommitteeId, View};
+use nomos_core::wire;
+use nomos_network::{
+    backends::libp2p::{Command, Event, EventKind, Libp2p},
+    NetworkMsg, NetworkService,
+};
+use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
+
+const TOPIC: &str = "/carnot/proto";
+// TODO: this could be tailored per message (e.g. we need to store only a few proposals per view but might need a lot of votes)
+const BUFFER_SIZE: usize = 500;
+
+type Relay = OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>;
+
+/// Due to network effects, latencies, or other factors, it is possible that a node may receive messages
+/// out of order, or simply messages that are relevant to future views.
+/// Since the implementation only starts listening for a message when it is needed, we need to store
+/// messages so that they can be returned when needed.
+///
+/// Synced nodes can't fall more than a view behind the leader, and in a healthy network we expect the difference
+/// between a node's view and the leader's view to be small. Given this, we can limit the size of the cache to a few
+/// views and automatically clear it when the node's view is updated.
+/// Messages that fall out of the cache (either evicted or never inserted because of view limits) will be discarded and
+/// will have to be requested again from the network.
+#[derive(Clone)]
+struct MessageCache {
+    // This will always contain VIEW_SIZE_LIMIT consecutive entries
+    cache: Arc<Mutex<BTreeMap<View, Messages>>>,
+}
+
+// This is essentially a single-producer/single-consumer synchronization primitive, where the
+// producer must be able to buffer messages even if no consumer has shown up yet.
+// Lock-free, thread-safe ring buffers exist, but we haven't found a good Rust implementation yet,
+// so let's just use channels for now.
+struct Spsc<T> {
+    sender: Sender<T>,
+    receiver: Option<Receiver<T>>,
+}
+
+impl<T> Default for Spsc<T> {
+    fn default() -> Self {
+        let (sender, receiver) = tokio::sync::mpsc::channel(BUFFER_SIZE);
+        Self {
+            sender,
+            receiver: Some(receiver),
+        }
+    }
+}
+
+impl<T> Spsc<T> {
+    fn recv_or_restore(&mut self) -> Receiver<T> {
+        match self.receiver.take() {
+            Some(recv) => recv,
+            None => {
+                // somebody already requested the receiver, let's create a new channel
+                let (sender, receiver) = tokio::sync::mpsc::channel(BUFFER_SIZE);
+                self.sender = sender;
+                receiver
+            }
+        }
+    }
+
+    fn try_send(&mut self, message: T) {
+        match self.sender.try_send(message) {
+            Ok(()) => {}
+            Err(TrySendError::Closed(message)) => {
+                let (sender, receiver) = tokio::sync::mpsc::channel(BUFFER_SIZE);
+                self.sender = sender;
+                self.receiver = Some(receiver);
+                self.sender
+                    .try_send(message)
+                    .expect("new channel should be empty");
+            }
+            Err(TrySendError::Full(_)) => tracing::error!("full channel, dropping message"),
+        }
+    }
+}
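+
+// Illustrative sketch of the restore semantics above (assumes nothing beyond this file):
+// dropping the receiver does not wedge the producer, because the next `try_send`
+// transparently recreates the channel.
+//
+//   let mut spsc = Spsc::<u8>::default();
+//   drop(spsc.recv_or_restore());           // consumer goes away
+//   spsc.try_send(42);                      // hits TrySendError::Closed, rebuilds the channel
+//   let mut recv = spsc.recv_or_restore();  // fresh receiver, will see the message
+//
+// The only lossy path is a full buffer, which is logged and dropped.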
+
+#[derive(Default)]
+struct Messages {
+    proposal_chunks: Spsc<ProposalChunkMsg>,
+    votes: HashMap<CommitteeId, HashMap<BlockId, Spsc<VoteMsg>>>,
+    new_views: HashMap<CommitteeId, Spsc<NewViewMsg>>,
+    timeouts: HashMap<CommitteeId, Spsc<TimeoutMsg>>,
+    timeout_qcs: Spsc<TimeoutQcMsg>,
+}
+
+/// Requesting the same stream type multiple times will re-initialize it, and new items will only be
+/// forwarded to the latest one.
+/// The consumer is therefore required to keep the stream around for as long as it is needed.
+#[derive(Clone)]
+pub struct Libp2pAdapter {
+    network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
+    message_cache: MessageCache,
+}
+
+impl MessageCache {
+    /// The number of views a node will cache messages for, from current_view to current_view + VIEW_SIZE_LIMIT.
+    /// Messages for views outside [current_view, current_view + VIEW_SIZE_LIMIT] will be discarded.
+    const VIEW_SIZE_LIMIT: View = View::new(5);
+
+    fn new() -> Self {
+        let cache = (0..Self::VIEW_SIZE_LIMIT.into())
+            .map(|v| (v.into(), Default::default()))
+            .collect::<BTreeMap<_, _>>();
+        Self {
+            cache: Arc::new(Mutex::new(cache)),
+        }
+    }
+
+    // treat view as the current view
+    fn advance(mut cache: impl DerefMut<Target = BTreeMap<View, Messages>>, view: View) {
+        if cache.remove(&(view - 1.into())).is_some() {
+            cache.insert(view + Self::VIEW_SIZE_LIMIT - 1.into(), Messages::default());
+        }
+    }
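+
+    // Worked example (illustrative only): with VIEW_SIZE_LIMIT = 5 and a window of
+    // views {10, 11, 12, 13, 14}, calling `advance` with view = 11 removes the entry
+    // for view 10 and inserts a fresh `Messages` slot for view 15, keeping exactly
+    // VIEW_SIZE_LIMIT consecutive entries. If view 10 was already evicted (e.g. the
+    // cache was advanced twice for the same view), nothing changes, so repeated
+    // calls cannot grow or shift the window.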
+
+    // This will also advance the cache to use view - 1 as the current view
+    fn get_proposals(&self, view: View) -> Option<Receiver<ProposalChunkMsg>> {
+        let mut cache = self.cache.lock().unwrap();
+        let res = cache
+            .get_mut(&view)
+            .map(|m| m.proposal_chunks.recv_or_restore());
+        Self::advance(cache, view - 1.into());
+        res
+    }
+
+    // This will also advance the cache to use view as the current view
+    fn get_timeout_qcs(&self, view: View) -> Option<Receiver<TimeoutQcMsg>> {
+        let mut cache = self.cache.lock().unwrap();
+        let res = cache
+            .get_mut(&view)
+            .map(|m| m.timeout_qcs.recv_or_restore());
+        Self::advance(cache, view);
+        res
+    }
+
+    fn get_votes(
+        &self,
+        view: View,
+        committee_id: CommitteeId,
+        proposal_id: BlockId,
+    ) -> Option<Receiver<VoteMsg>> {
+        self.cache.lock().unwrap().get_mut(&view).map(|m| {
+            m.votes
+                .entry(committee_id)
+                .or_default()
+                .entry(proposal_id)
+                .or_default()
+                .recv_or_restore()
+        })
+    }
+
+    fn get_new_views(&self, view: View, committee_id: CommitteeId) -> Option<Receiver<NewViewMsg>> {
+        self.cache.lock().unwrap().get_mut(&view).map(|m| {
+            m.new_views
+                .entry(committee_id)
+                .or_default()
+                .recv_or_restore()
+        })
+    }
+
+    fn get_timeouts(&self, view: View, committee_id: CommitteeId) -> Option<Receiver<TimeoutMsg>> {
+        self.cache.lock().unwrap().get_mut(&view).map(|m| {
+            m.timeouts
+                .entry(committee_id)
+                .or_default()
+                .recv_or_restore()
+        })
+    }
+}
+
+/// A message published via libp2p gossipsub.
+/// If `to` is [`None`], it means that the `message` is propagated to all committees.
+#[derive(Serialize, Deserialize)]
+struct GossipsubMessage {
+    to: Option<CommitteeId>,
+    message: NetworkMessage,
+}
+
+impl GossipsubMessage {
+    pub fn as_bytes(&self) -> Box<[u8]> {
+        wire::serialize(self).unwrap().into_boxed_slice()
+    }
+}
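+
+// Illustrative example of the envelope (the exact bytes depend on the `wire` codec):
+// a vote addressed to committee `c` travels as
+//   GossipsubMessage { to: Some(c), message: NetworkMessage::Vote(msg) }
+// while untargeted messages are sent with `to: None`. Gossipsub fans every message
+// out to all subscribers of TOPIC, so committee filtering happens on the receiving
+// side when messages are routed into the cache, not in the network itself.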
+
+impl Libp2pAdapter {
+    async fn broadcast(&self, message: GossipsubMessage, topic: &str) {
+        if let Err((e, message)) = self
+            .network_relay
+            .send(NetworkMsg::Process(Command::Broadcast {
+                message: message.as_bytes().to_vec(),
+                topic: topic.into(),
+            }))
+            .await
+        {
+            tracing::error!("error broadcasting {message:?}: {e}");
+        };
+    }
+
+    async fn subscribe(relay: &Relay, topic: &str) {
+        if let Err((e, _)) = relay
+            .send(NetworkMsg::Process(Command::Subscribe(topic.into())))
+            .await
+        {
+            tracing::error!("error subscribing to {topic}: {e}");
+        };
+    }
+}
+
+#[async_trait::async_trait]
+impl NetworkAdapter for Libp2pAdapter {
+    type Backend = Libp2p;
+
+    async fn new(network_relay: Relay) -> Self {
+        let message_cache = MessageCache::new();
+        let cache = message_cache.clone();
+        let relay = network_relay.clone();
+        // TODO: maybe we need the runtime handle here?
+        tokio::spawn(async move {
+            Self::subscribe(&relay, TOPIC).await;
+            let (sender, receiver) = tokio::sync::oneshot::channel();
+            if let Err((e, _)) = relay
+                .send(NetworkMsg::Subscribe {
+                    kind: EventKind::Message,
+                    sender,
+                })
+                .await
+            {
+                tracing::error!("error subscribing to incoming messages: {e}");
+            }
+
+            let mut incoming_messages = receiver.await.unwrap();
+            loop {
+                match incoming_messages.recv().await {
+                    Ok(Event::Message(message)) => {
+                        match nomos_core::wire::deserialize(&message.data) {
+                            Ok(GossipsubMessage { to, message }) => match message {
+                                NetworkMessage::ProposalChunk(msg) => {
+                                    tracing::debug!("received proposal chunk");
+                                    let mut cache = cache.cache.lock().unwrap();
+                                    let view = msg.view;
+                                    if let Some(messages) = cache.get_mut(&view) {
+                                        messages.proposal_chunks.try_send(msg);
+                                    }
+                                }
+                                NetworkMessage::Vote(msg) => {
+                                    tracing::debug!("received vote");
+                                    let mut cache = cache.cache.lock().unwrap();
+                                    let view = msg.vote.view;
+                                    if let Some(messages) = cache.get_mut(&view) {
+                                        messages
+                                            .votes
+                                            .entry(to.unwrap())
+                                            .or_default()
+                                            .entry(msg.vote.block)
+                                            .or_default()
+                                            .try_send(msg);
+                                    }
+                                }
+                                NetworkMessage::Timeout(msg) => {
+                                    tracing::debug!("received timeout");
+                                    let mut cache = cache.cache.lock().unwrap();
+                                    let view = msg.vote.view;
+                                    if let Some(messages) = cache.get_mut(&view) {
+                                        messages
+                                            .timeouts
+                                            .entry(to.unwrap())
+                                            .or_default()
+                                            .try_send(msg);
+                                    }
+                                }
+                                NetworkMessage::TimeoutQc(msg) => {
+                                    tracing::debug!("received timeout_qc");
+                                    let mut cache = cache.cache.lock().unwrap();
+                                    let view = msg.qc.view();
+                                    if let Some(messages) = cache.get_mut(&view) {
+                                        messages.timeout_qcs.try_send(msg);
+                                    }
+                                }
+                                NetworkMessage::NewView(msg) => {
+                                    tracing::debug!("received new_view");
+                                    let mut cache = cache.cache.lock().unwrap();
+                                    let view = msg.vote.view;
+                                    if let Some(messages) = cache.get_mut(&view) {
+                                        messages
+                                            .new_views
+                                            .entry(to.unwrap())
+                                            .or_default()
+                                            .try_send(msg);
+                                    }
+                                }
+                            },
+                            _ => tracing::debug!("unrecognized gossipsub message"),
+                        }
+                    }
+                    Err(RecvError::Lagged(n)) => {
+                        tracing::error!("lagged messages: {n}")
+                    }
+                    Err(RecvError::Closed) => unreachable!(),
+                }
+            }
+        });
+        Self {
+            network_relay,
+            message_cache,
+        }
+    }
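+
+    // Note on the *_stream methods below: each call re-initializes the underlying
+    // channel (see `recv_or_restore`), so only the most recently returned stream
+    // receives new items. A typical consumer (illustrative sketch, assumes
+    // `tokio_stream::StreamExt` is in scope) looks like:
+    //   let mut proposals = adapter.proposal_chunks_stream(view).await;
+    //   while let Some(chunk) = proposals.next().await { /* process */ }
+    // and keeps the stream alive for the whole view.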
+
+    async fn proposal_chunks_stream(&self, view: View) -> BoxedStream<ProposalChunkMsg> {
+        self.message_cache
+            .get_proposals(view)
+            .map::<BoxedStream<ProposalChunkMsg>, _>(|stream| Box::new(ReceiverStream::new(stream)))
+            .unwrap_or_else(|| Box::new(tokio_stream::empty()))
+    }
+
+    async fn broadcast(&self, message: NetworkMessage) {
+        let message = GossipsubMessage { to: None, message };
+        self.broadcast(message, TOPIC).await;
+    }
+
+    async fn timeout_stream(&self, committee: &Committee, view: View) -> BoxedStream<TimeoutMsg> {
+        self.message_cache
+            .get_timeouts(view, committee.id::<blake2::Blake2s256>())
+            .map::<BoxedStream<TimeoutMsg>, _>(|stream| Box::new(ReceiverStream::new(stream)))
+            .unwrap_or_else(|| Box::new(tokio_stream::empty()))
+    }
+
+    async fn timeout_qc_stream(&self, view: View) -> BoxedStream<TimeoutQcMsg> {
+        self.message_cache
+            .get_timeout_qcs(view)
+            .map::<BoxedStream<TimeoutQcMsg>, _>(|stream| Box::new(ReceiverStream::new(stream)))
+            .unwrap_or_else(|| Box::new(tokio_stream::empty()))
+    }
+
+    async fn votes_stream(
+        &self,
+        committee: &Committee,
+        view: View,
+        proposal_id: BlockId,
+    ) -> BoxedStream<VoteMsg> {
+        self.message_cache
+            .get_votes(view, committee.id::<blake2::Blake2s256>(), proposal_id)
+            .map::<BoxedStream<VoteMsg>, _>(|stream| Box::new(ReceiverStream::new(stream)))
+            .unwrap_or_else(|| Box::new(tokio_stream::empty()))
+    }
+
+    async fn new_view_stream(&self, committee: &Committee, view: View) -> BoxedStream<NewViewMsg> {
+        self.message_cache
+            .get_new_views(view, committee.id::<blake2::Blake2s256>())
+            .map::<BoxedStream<NewViewMsg>, _>(|stream| Box::new(ReceiverStream::new(stream)))
+            .unwrap_or_else(|| Box::new(tokio_stream::empty()))
+    }
+
+    async fn send(&self, message: NetworkMessage, committee: &Committee) {
+        let message = GossipsubMessage {
+            to: Some(committee.id::<blake2::Blake2s256>()),
+            message,
+        };
+        self.broadcast(message, TOPIC).await;
+    }
+}
diff --git a/nomos-services/consensus/src/network/adapters/mod.rs b/nomos-services/consensus/src/network/adapters/mod.rs
index 47d918cd..98f3f37a 100644
--- a/nomos-services/consensus/src/network/adapters/mod.rs
+++ b/nomos-services/consensus/src/network/adapters/mod.rs
@@ -1,5 +1,5 @@
-// #[cfg(feature = "libp2p")]
-// pub mod libp2p;
+#[cfg(feature = "libp2p")]
+pub mod libp2p;
 #[cfg(feature = "mock")]
 pub mod mock;
 #[cfg(feature = "waku")]