diff --git a/nodes/nomos-node/src/bridges/libp2p.rs b/nodes/nomos-node/src/bridges/libp2p.rs index f52dd107..9217c8e3 100644 --- a/nodes/nomos-node/src/bridges/libp2p.rs +++ b/nodes/nomos-node/src/bridges/libp2p.rs @@ -3,9 +3,11 @@ use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; // internal +use nomos_core::wire; use nomos_http::http::HttpResponse; use nomos_network::backends::libp2p::{Command, Libp2p}; use nomos_network::NetworkMsg; +use nomos_node::Tx; use overwatch_rs::services::relay::OutboundRelay; #[cfg(feature = "libp2p")] @@ -25,3 +27,20 @@ pub(super) async fn handle_libp2p_info_req( Ok(()) } + +#[cfg(feature = "libp2p")] +pub(super) async fn libp2p_send_transaction( + network_relay: OutboundRelay>, + tx: Tx, +) -> Result<(), overwatch_rs::DynError> { + let payload = wire::serialize(&tx).expect("Tx serialization failed"); + network_relay + .send(NetworkMsg::Process(Command::Broadcast { + topic: nomos_mempool::network::adapters::libp2p::CARNOT_TX_TOPIC.to_string(), + message: payload.into_boxed_slice(), + })) + .await + .map_err(|(e, _)| e)?; + + Ok(()) +} diff --git a/nodes/nomos-node/src/bridges/mod.rs b/nodes/nomos-node/src/bridges/mod.rs index 971616fd..074f6742 100644 --- a/nodes/nomos-node/src/bridges/mod.rs +++ b/nodes/nomos-node/src/bridges/mod.rs @@ -1,3 +1,5 @@ +use bytes::Bytes; +use http::StatusCode; // std // crates use nomos_consensus::{CarnotInfo, ConsensusMsg}; @@ -33,6 +35,8 @@ use waku::*; mod libp2p; #[cfg(feature = "libp2p")] use libp2p::*; +use nomos_mempool::network::NetworkAdapter; +use nomos_network::backends::NetworkBackend; macro_rules! get_handler { ($handle:expr, $service:ty, $path:expr => $handler:tt) => {{ @@ -83,13 +87,15 @@ pub fn network_info_bridge( })) } -#[cfg(feature = "waku")] -pub fn mempool_add_tx_bridge( +pub fn mempool_add_tx_bridge< + N: NetworkBackend, + A: NetworkAdapter + Send + Sync + 'static, +>( handle: overwatch_rs::overwatch::handle::OverwatchHandle, ) -> HttpBridgeRunner { Box::new(Box::pin(async move { let (mempool_channel, mut http_request_channel) = - build_http_bridge::, MockPool>, AxumBackend, _>( + build_http_bridge::>, AxumBackend, _>( handle.clone(), HttpMethod::POST, "addtx", @@ -177,3 +183,56 @@ async fn handle_mempool_metrics_req( Ok(()) } + +pub(super) async fn handle_mempool_add_tx_req( + handle: &overwatch_rs::overwatch::handle::OverwatchHandle, + mempool_channel: &OutboundRelay>, + res_tx: Sender, + payload: Option, +) -> Result<(), overwatch_rs::DynError> { + if let Some(data) = payload + .as_ref() + .and_then(|b| String::from_utf8(b.to_vec()).ok()) + { + let tx = Tx(data); + let (sender, receiver) = oneshot::channel(); + mempool_channel + .send(MempoolMsg::AddTx { + tx: tx.clone(), + reply_channel: sender, + }) + .await + .map_err(|(e, _)| e)?; + + match receiver.await { + Ok(Ok(())) => { + // broadcast transaction to peers + #[cfg(feature = "waku")] + { + let network_relay = handle.relay::>().connect().await?; + waku_send_transaction(network_relay, tx).await?; + } + #[cfg(feature = "libp2p")] + { + let network_relay = handle.relay::>().connect().await?; + libp2p_send_transaction(network_relay, tx).await?; + } + Ok(res_tx.send(Ok(b"".to_vec().into())).await?) + } + Ok(Err(())) => Ok(res_tx + .send(Err(( + StatusCode::CONFLICT, + "error: unable to add tx".into(), + ))) + .await?), + Err(err) => Ok(res_tx + .send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))) + .await?), + } + } else { + Err( + format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String") + .into(), + ) + } +} diff --git a/nodes/nomos-node/src/bridges/waku.rs b/nodes/nomos-node/src/bridges/waku.rs index 3c7cf557..af740d79 100644 --- a/nodes/nomos-node/src/bridges/waku.rs +++ b/nodes/nomos-node/src/bridges/waku.rs @@ -1,5 +1,4 @@ use bytes::Bytes; -use http::StatusCode; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; // internal @@ -9,58 +8,12 @@ use nomos_http::http::HttpResponse; use nomos_mempool::network::adapters::waku::{ WAKU_CARNOT_PUB_SUB_TOPIC, WAKU_CARNOT_TX_CONTENT_TOPIC, }; -use nomos_mempool::MempoolMsg; use nomos_network::backends::waku::{Waku, WakuBackendMessage}; -use nomos_network::{NetworkMsg, NetworkService}; +use nomos_network::NetworkMsg; use nomos_node::Tx; use overwatch_rs::services::relay::OutboundRelay; use waku_bindings::{Multiaddr, WakuMessage}; -pub(super) async fn handle_mempool_add_tx_req( - handle: &overwatch_rs::overwatch::handle::OverwatchHandle, - mempool_channel: &OutboundRelay>, - res_tx: Sender, - payload: Option, -) -> Result<(), overwatch_rs::DynError> { - if let Some(data) = payload - .as_ref() - .and_then(|b| String::from_utf8(b.to_vec()).ok()) - { - let tx = Tx(data); - let (sender, receiver) = oneshot::channel(); - mempool_channel - .send(MempoolMsg::AddTx { - tx: tx.clone(), - reply_channel: sender, - }) - .await - .map_err(|(e, _)| e)?; - - match receiver.await { - Ok(Ok(())) => { - // broadcast transaction to peers - let network_relay = handle.relay::>().connect().await?; - send_transaction(network_relay, tx).await?; - Ok(res_tx.send(Ok(b"".to_vec().into())).await?) - } - Ok(Err(())) => Ok(res_tx - .send(Err(( - StatusCode::CONFLICT, - "error: unable to add tx".into(), - ))) - .await?), - Err(err) => Ok(res_tx - .send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))) - .await?), - } - } else { - Err( - format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String") - .into(), - ) - } -} - pub(super) async fn handle_waku_info_req( channel: &OutboundRelay>, res_tx: Sender, @@ -106,7 +59,7 @@ pub(super) async fn handle_add_conn_req( } } -pub(super) async fn send_transaction( +pub(super) async fn waku_send_transaction( network_relay: OutboundRelay>, tx: Tx, ) -> Result<(), overwatch_rs::DynError> { diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index 52377929..99642263 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -1,4 +1,4 @@ -use nomos_node::{Config, Nomos, NomosServiceSettings}; +use nomos_node::{Config, Nomos, NomosServiceSettings, Tx}; mod bridges; @@ -6,6 +6,14 @@ use clap::Parser; use color_eyre::eyre::{eyre, Result}; use nomos_http::bridge::{HttpBridge, HttpBridgeSettings}; +#[cfg(feature = "libp2p")] +use nomos_mempool::network::adapters::libp2p::Libp2pAdapter; +#[cfg(feature = "waku")] +use nomos_mempool::network::adapters::waku::WakuAdapter; +#[cfg(feature = "libp2p")] +use nomos_network::backends::libp2p::Libp2p; +#[cfg(feature = "waku")] +use nomos_network::backends::waku::Waku; use overwatch_rs::overwatch::*; use std::sync::Arc; @@ -24,7 +32,13 @@ fn main() -> Result<()> { Arc::new(Box::new(bridges::mempool_metrics_bridge)), Arc::new(Box::new(bridges::network_info_bridge)), #[cfg(feature = "waku")] - Arc::new(Box::new(bridges::mempool_add_tx_bridge)), + Arc::new(Box::new( + bridges::mempool_add_tx_bridge::>, + )), + #[cfg(feature = "libp2p")] + Arc::new(Box::new( + bridges::mempool_add_tx_bridge::>, + )), #[cfg(feature = "waku")] Arc::new(Box::new(bridges::waku_add_conn_bridge)), ]; diff --git a/nomos-services/mempool/src/network/adapters/libp2p.rs b/nomos-services/mempool/src/network/adapters/libp2p.rs index bab916c7..5d2a3ef8 100644 --- a/nomos-services/mempool/src/network/adapters/libp2p.rs +++ b/nomos-services/mempool/src/network/adapters/libp2p.rs @@ -16,7 +16,7 @@ use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; -const CARNOT_TX_TOPIC: &str = "CarnotTx"; +pub const CARNOT_TX_TOPIC: &str = "CarnotTx"; pub struct Libp2pAdapter { network_relay: OutboundRelay< as ServiceData>::Message>,