diff --git a/nomos-services/mempool/src/network/adapters/libp2p.rs b/nomos-services/mempool/src/network/adapters/libp2p.rs index ecbce390..bab916c7 100644 --- a/nomos-services/mempool/src/network/adapters/libp2p.rs +++ b/nomos-services/mempool/src/network/adapters/libp2p.rs @@ -3,15 +3,23 @@ use std::marker::PhantomData; // crates use futures::Stream; use serde::{de::DeserializeOwned, Serialize}; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::StreamExt; +use tracing::log::error; + // internal +use crate::network::messages::TransactionMsg; use crate::network::NetworkAdapter; -use nomos_network::backends::libp2p::Libp2p; -use nomos_network::NetworkService; +use nomos_core::wire; +use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash}; +use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; +const CARNOT_TX_TOPIC: &str = "CarnotTx"; + pub struct Libp2pAdapter { - _network_relay: OutboundRelay< as ServiceData>::Message>, + network_relay: OutboundRelay< as ServiceData>::Message>, _tx: PhantomData, } @@ -24,15 +32,43 @@ where type Tx = Tx; async fn new( - _network_relay: OutboundRelay< as ServiceData>::Message>, + network_relay: OutboundRelay< as ServiceData>::Message>, ) -> Self { + network_relay + .send(NetworkMsg::Process(Command::Subscribe( + CARNOT_TX_TOPIC.to_string(), + ))) + .await + .expect("Network backend should be ready"); Self { - _network_relay, + network_relay, _tx: PhantomData, } } async fn transactions_stream(&self) -> Box + Unpin + Send> { - // TODO - Box::new(futures::stream::empty()) + let topic_hash = TopicHash::from_raw(CARNOT_TX_TOPIC); + let (sender, receiver) = tokio::sync::oneshot::channel(); + self.network_relay + .send(NetworkMsg::Subscribe { + kind: EventKind::Message, + sender, + }) + .await + .expect("Network backend should be ready"); + let receiver = receiver.await.unwrap(); + Box::new(Box::pin(BroadcastStream::new(receiver).filter_map( + move |message| match message { + Ok(Event::Message(Message { data, topic, .. })) if topic == topic_hash => { + match wire::deserialize::>(&data) { + Ok(msg) => Some(msg.tx), + Err(e) => { + error!("Unrecognized Tx message: {e}"); + None + } + } + } + _ => None, + }, + ))) } } diff --git a/nomos-services/network/src/backends/libp2p.rs b/nomos-services/network/src/backends/libp2p.rs index f0d224b1..cfac1def 100644 --- a/nomos-services/network/src/backends/libp2p.rs +++ b/nomos-services/network/src/backends/libp2p.rs @@ -2,11 +2,9 @@ use std::error::Error; // internal use super::NetworkBackend; +pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash}; use nomos_libp2p::{ - libp2p::{ - gossipsub::{self, Message}, - Multiaddr, PeerId, - }, + libp2p::{gossipsub, Multiaddr, PeerId}, BehaviourEvent, Swarm, SwarmConfig, SwarmEvent, }; // crates