From 36b3ccc0430204487c691f4ab3c757b4b65b7153 Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Wed, 27 Sep 2023 11:58:42 +0200 Subject: [PATCH] Add `send` method to mempool network adapter (#439) * Add `send` method to mempool network adapter Centralize responsabilities for mempool-network interface in the adapter trait. * Update nomos-services/mempool/src/lib.rs Co-authored-by: Youngjoon Lee --------- Co-authored-by: Youngjoon Lee --- nodes/nomos-node/src/bridges/libp2p.rs | 18 ------------ nodes/nomos-node/src/bridges/mod.rs | 12 ++------ nomos-services/mempool/Cargo.toml | 1 + nomos-services/mempool/src/lib.rs | 12 ++++++-- .../mempool/src/network/adapters/libp2p.rs | 21 ++++++++++++-- .../mempool/src/network/adapters/mock.rs | 13 +++++++++ .../mempool/src/network/adapters/waku.rs | 29 ++++++++++++++++++- nomos-services/mempool/src/network/mod.rs | 2 ++ 8 files changed, 74 insertions(+), 34 deletions(-) diff --git a/nodes/nomos-node/src/bridges/libp2p.rs b/nodes/nomos-node/src/bridges/libp2p.rs index 6aa1741c..ee7cb7e0 100644 --- a/nodes/nomos-node/src/bridges/libp2p.rs +++ b/nodes/nomos-node/src/bridges/libp2p.rs @@ -3,11 +3,9 @@ use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; // internal -use nomos_core::wire; use nomos_http::http::HttpResponse; use nomos_network::backends::libp2p::{Command, Libp2p}; use nomos_network::NetworkMsg; -use nomos_node::Tx; use overwatch_rs::services::relay::OutboundRelay; pub(super) async fn handle_libp2p_info_req( @@ -26,19 +24,3 @@ pub(super) async fn handle_libp2p_info_req( Ok(()) } - -pub(super) async fn libp2p_send_transaction( - network_relay: OutboundRelay>, - tx: Tx, -) -> Result<(), overwatch_rs::DynError> { - let payload = wire::serialize(&tx).expect("Tx serialization failed"); - network_relay - .send(NetworkMsg::Process(Command::Broadcast { - topic: nomos_mempool::network::adapters::libp2p::CARNOT_TX_TOPIC.to_string(), - message: payload.into_boxed_slice(), - })) - .await - .map_err(|(e, _)| e)?; - - Ok(()) -} diff --git a/nodes/nomos-node/src/bridges/mod.rs b/nodes/nomos-node/src/bridges/mod.rs index b3299ba2..2dbc46b7 100644 --- a/nodes/nomos-node/src/bridges/mod.rs +++ b/nodes/nomos-node/src/bridges/mod.rs @@ -89,9 +89,7 @@ where res_tx, payload, .. }) = http_request_channel.recv().await { - if let Err(e) = - handle_mempool_add_tx_req(&handle, &mempool_channel, res_tx, payload).await - { + if let Err(e) = handle_mempool_add_tx_req(&mempool_channel, res_tx, payload).await { error!(e); } } @@ -142,7 +140,6 @@ async fn handle_mempool_metrics_req( } pub(super) async fn handle_mempool_add_tx_req( - handle: &overwatch_rs::overwatch::handle::OverwatchHandle, mempool_channel: &OutboundRelay::Hash>>, res_tx: Sender, payload: Option, @@ -163,12 +160,7 @@ pub(super) async fn handle_mempool_add_tx_req( .map_err(|(e, _)| e)?; match receiver.await { - Ok(Ok(())) => { - // broadcast transaction to peers - let network_relay = handle.relay::>().connect().await?; - libp2p_send_transaction(network_relay, tx).await?; - Ok(res_tx.send(Ok(b"".to_vec().into())).await?) - } + Ok(Ok(())) => Ok(res_tx.send(Ok(b"".to_vec().into())).await?), Ok(Err(())) => Ok(res_tx .send(Err(( StatusCode::CONFLICT, diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index 458b111d..7a734dca 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -20,6 +20,7 @@ tracing = "0.1" tokio = { version = "1", features = ["sync", "macros"] } tokio-stream = "0.1" waku-bindings = { version = "0.1.1", optional = true} +chrono = "0.4" [dev-dependencies] nomos-log = { path = "../log" } diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index 7fff021e..54b63c33 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -170,7 +170,7 @@ where let adapter = N::new( service_state.settings_reader.get_updated_settings().network, - network_relay, + network_relay.clone(), ); let adapter = adapter.await; @@ -181,8 +181,16 @@ where Some(msg) = service_state.inbound_relay.recv() => { match msg { MempoolMsg::Add { item, key, reply_channel } => { - match pool.add_item(key, item) { + match pool.add_item(key, item.clone()) { Ok(_id) => { + // Broadcast the item to the network + let net = network_relay.clone(); + let settings = service_state.settings_reader.get_updated_settings().network; + // move sending to a new task so local operations can complete in the meantime + tokio::spawn(async move { + let adapter = N::new(settings, net).await; + adapter.send(item).await; + }); if let Err(e) = reply_channel.send(Ok(())) { tracing::debug!("Failed to send reply to AddTx: {:?}", e); } diff --git a/nomos-services/mempool/src/network/adapters/libp2p.rs b/nomos-services/mempool/src/network/adapters/libp2p.rs index 756879e8..e6dd6bca 100644 --- a/nomos-services/mempool/src/network/adapters/libp2p.rs +++ b/nomos-services/mempool/src/network/adapters/libp2p.rs @@ -11,8 +11,6 @@ use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; -pub const CARNOT_TX_TOPIC: &str = "CarnotTx"; - pub struct Libp2pAdapter { network_relay: OutboundRelay< as ServiceData>::Message>, settings: Settings, @@ -35,7 +33,7 @@ where ) -> Self { network_relay .send(NetworkMsg::Process(Command::Subscribe( - CARNOT_TX_TOPIC.to_string(), + settings.topic.clone(), ))) .await .expect("Network backend should be ready"); @@ -73,6 +71,23 @@ where }, ))) } + + async fn send(&self, item: Item) { + if let Ok(wire) = wire::serialize(&item) { + if let Err((e, _)) = self + .network_relay + .send(NetworkMsg::Process(Command::Broadcast { + topic: self.settings.topic.clone(), + message: wire.into(), + })) + .await + { + tracing::error!("failed to send item to topic: {e}"); + } + } else { + tracing::error!("Failed to serialize item"); + } + } } #[derive(Clone, Debug)] diff --git a/nomos-services/mempool/src/network/adapters/mock.rs b/nomos-services/mempool/src/network/adapters/mock.rs index 5bbc42ae..fd677256 100644 --- a/nomos-services/mempool/src/network/adapters/mock.rs +++ b/nomos-services/mempool/src/network/adapters/mock.rs @@ -93,4 +93,17 @@ impl NetworkAdapter for MockAdapter { }, ))) } + + async fn send(&self, msg: Self::Item) { + if let Err((e, _)) = self + .network_relay + .send(NetworkMsg::Process(MockBackendMessage::Broadcast { + topic: MOCK_PUB_SUB_TOPIC.into(), + msg: msg.message().clone(), + })) + .await + { + tracing::error!("failed to send item to topic: {e}"); + } + } } diff --git a/nomos-services/mempool/src/network/adapters/waku.rs b/nomos-services/mempool/src/network/adapters/waku.rs index 49f87a3a..304bcfe5 100644 --- a/nomos-services/mempool/src/network/adapters/waku.rs +++ b/nomos-services/mempool/src/network/adapters/waku.rs @@ -12,7 +12,7 @@ use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; use serde::Serialize; -use waku_bindings::{Encoding, WakuContentTopic, WakuPubSubTopic}; +use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic}; pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = WakuPubSubTopic::new("CarnotSim", Encoding::Proto); @@ -90,4 +90,31 @@ where }, ))) } + + async fn send(&self, item: Self::Item) { + if let Ok(wire) = wire::serialize(&item) { + if let Err((e, _)) = self + .network_relay + .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { + topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), + message: WakuMessage::new( + wire, + WAKU_CARNOT_TX_CONTENT_TOPIC.clone(), + 1, + chrono::Utc::now() + .timestamp_nanos_opt() + .expect("timestamp should be in valid range") + as usize, + [], + false, + ), + })) + .await + { + tracing::error!("failed to send item to topic: {e}"); + } + } else { + tracing::error!("Failed to serialize item"); + } + } } diff --git a/nomos-services/mempool/src/network/mod.rs b/nomos-services/mempool/src/network/mod.rs index da13d136..01a593cf 100644 --- a/nomos-services/mempool/src/network/mod.rs +++ b/nomos-services/mempool/src/network/mod.rs @@ -25,4 +25,6 @@ pub trait NetworkAdapter { async fn transactions_stream( &self, ) -> Box + Unpin + Send>; + + async fn send(&self, item: Self::Item); }