Small fixes for libp2p network backend (#280)

* Generate network events for self messages

Waku does that and it's kind of convenient not to handle ourselves
in a different way from the rest.

* Use bigger buffer + fmt

When receiving messages coming from libp2p IWANT requests, it's
common to receive a burst of packest which can cause subscribers
to lag. To account for that, let's increase the buffer in the
broadcast channel.

* Check if topic is being subscribed before self-notification (#292)

* fmt

---------

Co-authored-by: Youngjoon Lee <taxihighway@gmail.com>
This commit is contained in:
Giacomo Pasini 2023-08-07 06:00:41 +02:00 committed by GitHub
parent 30bf101576
commit 4a3d677ea9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 8 deletions

View File

@ -8,7 +8,8 @@ pub use libp2p;
use blake2::digest::{consts::U32, Digest}; use blake2::digest::{consts::U32, Digest};
use blake2::Blake2b; use blake2::Blake2b;
use libp2p::gossipsub::{Message, MessageId}; use libp2p::gossipsub::{Message, MessageId, TopicHash};
pub use libp2p::{ pub use libp2p::{
core::upgrade, core::upgrade,
gossipsub::{self, PublishError, SubscriptionError}, gossipsub::{self, PublishError, SubscriptionError},
@ -152,6 +153,21 @@ impl Swarm {
pub fn swarm(&self) -> &libp2p::Swarm<Behaviour> { pub fn swarm(&self) -> &libp2p::Swarm<Behaviour> {
&self.swarm &self.swarm
} }
pub fn is_subscribed(&mut self, topic: &str) -> bool {
let topic_hash = Self::topic_hash(topic);
//TODO: consider O(1) searching by having our own data structure
self.swarm
.behaviour_mut()
.gossipsub
.topics()
.any(|h| h == &topic_hash)
}
pub fn topic_hash(topic: &str) -> TopicHash {
gossipsub::IdentTopic::new(topic).hash()
}
} }
impl futures::Stream for Swarm { impl futures::Stream for Swarm {

View File

@ -203,7 +203,7 @@ impl Libp2pAdapter {
if let Err((e, message)) = self if let Err((e, message)) = self
.network_relay .network_relay
.send(NetworkMsg::Process(Command::Broadcast { .send(NetworkMsg::Process(Command::Broadcast {
message: message.as_bytes().to_vec(), message: message.as_bytes(),
topic: topic.into(), topic: topic.into(),
})) }))
.await .await

View File

@ -1,3 +1,6 @@
// std
use std::error::Error;
// internal
use super::NetworkBackend; use super::NetworkBackend;
use nomos_libp2p::{ use nomos_libp2p::{
libp2p::{ libp2p::{
@ -6,9 +9,10 @@ use nomos_libp2p::{
}, },
BehaviourEvent, Swarm, SwarmConfig, SwarmEvent, BehaviourEvent, Swarm, SwarmConfig, SwarmEvent,
}; };
// crates
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState}; use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc, oneshot};
macro_rules! log_error { macro_rules! log_error {
($e:expr) => { ($e:expr) => {
@ -35,15 +39,13 @@ pub struct Libp2pInfo {
pub enum EventKind { pub enum EventKind {
Message, Message,
} }
use std::error::Error;
use tokio::sync::oneshot;
const BUFFER_SIZE: usize = 16; const BUFFER_SIZE: usize = 64;
#[derive(Debug)] #[derive(Debug)]
pub enum Command { pub enum Command {
Connect(PeerId, Multiaddr), Connect(PeerId, Multiaddr),
Broadcast { topic: Topic, message: Vec<u8> }, Broadcast { topic: Topic, message: Box<[u8]> },
Subscribe(Topic), Subscribe(Topic),
Unsubscribe(Topic), Unsubscribe(Topic),
Info { reply: oneshot::Sender<Libp2pInfo> }, Info { reply: oneshot::Sender<Libp2pInfo> },
@ -121,7 +123,7 @@ impl NetworkBackend for Libp2p {
log_error!(swarm.connect(peer_id, peer_addr)); log_error!(swarm.connect(peer_id, peer_addr));
} }
Command::Broadcast { topic, message } => { Command::Broadcast { topic, message } => {
match swarm.broadcast(&topic, message) { match swarm.broadcast(&topic, message.to_vec()) {
Ok(id) => { Ok(id) => {
tracing::debug!("broadcasted message with id: {id} tp topic: {topic}"); tracing::debug!("broadcasted message with id: {id} tp topic: {topic}");
} }
@ -129,6 +131,15 @@ impl NetworkBackend for Libp2p {
tracing::error!("failed to broadcast message to topic: {topic} {e:?}"); tracing::error!("failed to broadcast message to topic: {topic} {e:?}");
} }
} }
if swarm.is_subscribed(&topic) {
log_error!(events_tx.send(Event::Message(Message {
source: None,
data: message.into(),
sequence_number: None,
topic: Swarm::topic_hash(&topic),
})));
}
} }
Command::Subscribe(topic) => { Command::Subscribe(topic) => {
tracing::debug!("subscribing to topic: {topic}"); tracing::debug!("subscribing to topic: {topic}");