From 4a3d677ea9f780a62b921ba51a3d0a870bbee110 Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Mon, 7 Aug 2023 06:00:41 +0200 Subject: [PATCH] Small fixes for libp2p network backend (#280) * Generate network events for self messages Waku does that and it's kind of convenient not to handle ourselves in a different way from the rest. * Use bigger buffer + fmt When receiving messages coming from libp2p IWANT requests, it's common to receive a burst of packest which can cause subscribers to lag. To account for that, let's increase the buffer in the broadcast channel. * Check if topic is being subscribed before self-notification (#292) * fmt --------- Co-authored-by: Youngjoon Lee --- nomos-libp2p/src/lib.rs | 18 ++++++++++++++- .../consensus/src/network/adapters/libp2p.rs | 2 +- nomos-services/network/src/backends/libp2p.rs | 23 ++++++++++++++----- 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/nomos-libp2p/src/lib.rs b/nomos-libp2p/src/lib.rs index 30956635..711c6a7b 100644 --- a/nomos-libp2p/src/lib.rs +++ b/nomos-libp2p/src/lib.rs @@ -8,7 +8,8 @@ pub use libp2p; use blake2::digest::{consts::U32, Digest}; use blake2::Blake2b; -use libp2p::gossipsub::{Message, MessageId}; +use libp2p::gossipsub::{Message, MessageId, TopicHash}; + pub use libp2p::{ core::upgrade, gossipsub::{self, PublishError, SubscriptionError}, @@ -152,6 +153,21 @@ impl Swarm { pub fn swarm(&self) -> &libp2p::Swarm { &self.swarm } + + pub fn is_subscribed(&mut self, topic: &str) -> bool { + let topic_hash = Self::topic_hash(topic); + + //TODO: consider O(1) searching by having our own data structure + self.swarm + .behaviour_mut() + .gossipsub + .topics() + .any(|h| h == &topic_hash) + } + + pub fn topic_hash(topic: &str) -> TopicHash { + gossipsub::IdentTopic::new(topic).hash() + } } impl futures::Stream for Swarm { diff --git a/nomos-services/consensus/src/network/adapters/libp2p.rs b/nomos-services/consensus/src/network/adapters/libp2p.rs index 853cf4ed..03af2670 100644 --- a/nomos-services/consensus/src/network/adapters/libp2p.rs +++ b/nomos-services/consensus/src/network/adapters/libp2p.rs @@ -203,7 +203,7 @@ impl Libp2pAdapter { if let Err((e, message)) = self .network_relay .send(NetworkMsg::Process(Command::Broadcast { - message: message.as_bytes().to_vec(), + message: message.as_bytes(), topic: topic.into(), })) .await diff --git a/nomos-services/network/src/backends/libp2p.rs b/nomos-services/network/src/backends/libp2p.rs index 790fddb7..f0d224b1 100644 --- a/nomos-services/network/src/backends/libp2p.rs +++ b/nomos-services/network/src/backends/libp2p.rs @@ -1,3 +1,6 @@ +// std +use std::error::Error; +// internal use super::NetworkBackend; use nomos_libp2p::{ libp2p::{ @@ -6,9 +9,10 @@ use nomos_libp2p::{ }, BehaviourEvent, Swarm, SwarmConfig, SwarmEvent, }; +// crates use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState}; use serde::{Deserialize, Serialize}; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{broadcast, mpsc, oneshot}; macro_rules! log_error { ($e:expr) => { @@ -35,15 +39,13 @@ pub struct Libp2pInfo { pub enum EventKind { Message, } -use std::error::Error; -use tokio::sync::oneshot; -const BUFFER_SIZE: usize = 16; +const BUFFER_SIZE: usize = 64; #[derive(Debug)] pub enum Command { Connect(PeerId, Multiaddr), - Broadcast { topic: Topic, message: Vec }, + Broadcast { topic: Topic, message: Box<[u8]> }, Subscribe(Topic), Unsubscribe(Topic), Info { reply: oneshot::Sender }, @@ -121,7 +123,7 @@ impl NetworkBackend for Libp2p { log_error!(swarm.connect(peer_id, peer_addr)); } Command::Broadcast { topic, message } => { - match swarm.broadcast(&topic, message) { + match swarm.broadcast(&topic, message.to_vec()) { Ok(id) => { tracing::debug!("broadcasted message with id: {id} tp topic: {topic}"); } @@ -129,6 +131,15 @@ impl NetworkBackend for Libp2p { tracing::error!("failed to broadcast message to topic: {topic} {e:?}"); } } + + if swarm.is_subscribed(&topic) { + log_error!(events_tx.send(Event::Message(Message { + source: None, + data: message.into(), + sequence_number: None, + topic: Swarm::topic_hash(&topic), + }))); + } } Command::Subscribe(topic) => { tracing::debug!("subscribing to topic: {topic}");