Plug mempool to libp2p backend (#344)

* Plug mempool to libp2p backend

* Fix except, log error and skip instead.
This commit is contained in:
Daniel Sanchez 2023-08-30 13:39:34 +02:00 committed by GitHub
parent e5ec2b7739
commit 28596377b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 11 deletions

View File

@ -3,15 +3,23 @@ use std::marker::PhantomData;
// crates // crates
use futures::Stream; use futures::Stream;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use tracing::log::error;
// internal // internal
use crate::network::messages::TransactionMsg;
use crate::network::NetworkAdapter; use crate::network::NetworkAdapter;
use nomos_network::backends::libp2p::Libp2p; use nomos_core::wire;
use nomos_network::NetworkService; use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData; use overwatch_rs::services::ServiceData;
const CARNOT_TX_TOPIC: &str = "CarnotTx";
pub struct Libp2pAdapter<Tx> { pub struct Libp2pAdapter<Tx> {
_network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>, network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
_tx: PhantomData<Tx>, _tx: PhantomData<Tx>,
} }
@ -24,15 +32,43 @@ where
type Tx = Tx; type Tx = Tx;
async fn new( async fn new(
_network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>, network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self { ) -> Self {
network_relay
.send(NetworkMsg::Process(Command::Subscribe(
CARNOT_TX_TOPIC.to_string(),
)))
.await
.expect("Network backend should be ready");
Self { Self {
_network_relay, network_relay,
_tx: PhantomData, _tx: PhantomData,
} }
} }
async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send> { async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send> {
// TODO let topic_hash = TopicHash::from_raw(CARNOT_TX_TOPIC);
Box::new(futures::stream::empty()) let (sender, receiver) = tokio::sync::oneshot::channel();
self.network_relay
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender,
})
.await
.expect("Network backend should be ready");
let receiver = receiver.await.unwrap();
Box::new(Box::pin(BroadcastStream::new(receiver).filter_map(
move |message| match message {
Ok(Event::Message(Message { data, topic, .. })) if topic == topic_hash => {
match wire::deserialize::<TransactionMsg<Tx>>(&data) {
Ok(msg) => Some(msg.tx),
Err(e) => {
error!("Unrecognized Tx message: {e}");
None
}
}
}
_ => None,
},
)))
} }
} }

View File

@ -2,11 +2,9 @@
use std::error::Error; use std::error::Error;
// internal // internal
use super::NetworkBackend; use super::NetworkBackend;
pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash};
use nomos_libp2p::{ use nomos_libp2p::{
libp2p::{ libp2p::{gossipsub, Multiaddr, PeerId},
gossipsub::{self, Message},
Multiaddr, PeerId,
},
BehaviourEvent, Swarm, SwarmConfig, SwarmEvent, BehaviourEvent, Swarm, SwarmConfig, SwarmEvent,
}; };
// crates // crates