Pipe addtx endpoint for libp2p (#345)

* Pipe add tx for both backends

* Cleanup imports
This commit is contained in:
Daniel Sanchez 2023-08-31 09:13:35 +02:00 committed by GitHub
parent 28596377b7
commit 26a3910566
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 100 additions and 55 deletions

View File

@ -3,9 +3,11 @@
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
// internal
use nomos_core::wire;
use nomos_http::http::HttpResponse;
use nomos_network::backends::libp2p::{Command, Libp2p};
use nomos_network::NetworkMsg;
use nomos_node::Tx;
use overwatch_rs::services::relay::OutboundRelay;
#[cfg(feature = "libp2p")]
@ -25,3 +27,20 @@ pub(super) async fn handle_libp2p_info_req(
Ok(())
}
#[cfg(feature = "libp2p")]
pub(super) async fn libp2p_send_transaction(
network_relay: OutboundRelay<NetworkMsg<Libp2p>>,
tx: Tx,
) -> Result<(), overwatch_rs::DynError> {
let payload = wire::serialize(&tx).expect("Tx serialization failed");
network_relay
.send(NetworkMsg::Process(Command::Broadcast {
topic: nomos_mempool::network::adapters::libp2p::CARNOT_TX_TOPIC.to_string(),
message: payload.into_boxed_slice(),
}))
.await
.map_err(|(e, _)| e)?;
Ok(())
}

View File

@ -1,3 +1,5 @@
use bytes::Bytes;
use http::StatusCode;
// std
// crates
use nomos_consensus::{CarnotInfo, ConsensusMsg};
@ -33,6 +35,8 @@ use waku::*;
mod libp2p;
#[cfg(feature = "libp2p")]
use libp2p::*;
use nomos_mempool::network::NetworkAdapter;
use nomos_network::backends::NetworkBackend;
macro_rules! get_handler {
($handle:expr, $service:ty, $path:expr => $handler:tt) => {{
@ -83,13 +87,15 @@ pub fn network_info_bridge(
}))
}
#[cfg(feature = "waku")]
pub fn mempool_add_tx_bridge(
pub fn mempool_add_tx_bridge<
N: NetworkBackend,
A: NetworkAdapter<Backend = N, Tx = Tx> + Send + Sync + 'static,
>(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
let (mempool_channel, mut http_request_channel) =
build_http_bridge::<MempoolService<WakuAdapter<Tx>, MockPool<Tx>>, AxumBackend, _>(
build_http_bridge::<MempoolService<A, MockPool<Tx>>, AxumBackend, _>(
handle.clone(),
HttpMethod::POST,
"addtx",
@ -177,3 +183,56 @@ async fn handle_mempool_metrics_req(
Ok(())
}
pub(super) async fn handle_mempool_add_tx_req(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
mempool_channel: &OutboundRelay<MempoolMsg<Tx>>,
res_tx: Sender<HttpResponse>,
payload: Option<Bytes>,
) -> Result<(), overwatch_rs::DynError> {
if let Some(data) = payload
.as_ref()
.and_then(|b| String::from_utf8(b.to_vec()).ok())
{
let tx = Tx(data);
let (sender, receiver) = oneshot::channel();
mempool_channel
.send(MempoolMsg::AddTx {
tx: tx.clone(),
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
match receiver.await {
Ok(Ok(())) => {
// broadcast transaction to peers
#[cfg(feature = "waku")]
{
let network_relay = handle.relay::<NetworkService<Waku>>().connect().await?;
waku_send_transaction(network_relay, tx).await?;
}
#[cfg(feature = "libp2p")]
{
let network_relay = handle.relay::<NetworkService<Libp2p>>().connect().await?;
libp2p_send_transaction(network_relay, tx).await?;
}
Ok(res_tx.send(Ok(b"".to_vec().into())).await?)
}
Ok(Err(())) => Ok(res_tx
.send(Err((
StatusCode::CONFLICT,
"error: unable to add tx".into(),
)))
.await?),
Err(err) => Ok(res_tx
.send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())))
.await?),
}
} else {
Err(
format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String")
.into(),
)
}
}

View File

@ -1,5 +1,4 @@
use bytes::Bytes;
use http::StatusCode;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
// internal
@ -9,58 +8,12 @@ use nomos_http::http::HttpResponse;
use nomos_mempool::network::adapters::waku::{
WAKU_CARNOT_PUB_SUB_TOPIC, WAKU_CARNOT_TX_CONTENT_TOPIC,
};
use nomos_mempool::MempoolMsg;
use nomos_network::backends::waku::{Waku, WakuBackendMessage};
use nomos_network::{NetworkMsg, NetworkService};
use nomos_network::NetworkMsg;
use nomos_node::Tx;
use overwatch_rs::services::relay::OutboundRelay;
use waku_bindings::{Multiaddr, WakuMessage};
pub(super) async fn handle_mempool_add_tx_req(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
mempool_channel: &OutboundRelay<MempoolMsg<Tx>>,
res_tx: Sender<HttpResponse>,
payload: Option<Bytes>,
) -> Result<(), overwatch_rs::DynError> {
if let Some(data) = payload
.as_ref()
.and_then(|b| String::from_utf8(b.to_vec()).ok())
{
let tx = Tx(data);
let (sender, receiver) = oneshot::channel();
mempool_channel
.send(MempoolMsg::AddTx {
tx: tx.clone(),
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
match receiver.await {
Ok(Ok(())) => {
// broadcast transaction to peers
let network_relay = handle.relay::<NetworkService<Waku>>().connect().await?;
send_transaction(network_relay, tx).await?;
Ok(res_tx.send(Ok(b"".to_vec().into())).await?)
}
Ok(Err(())) => Ok(res_tx
.send(Err((
StatusCode::CONFLICT,
"error: unable to add tx".into(),
)))
.await?),
Err(err) => Ok(res_tx
.send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())))
.await?),
}
} else {
Err(
format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String")
.into(),
)
}
}
pub(super) async fn handle_waku_info_req(
channel: &OutboundRelay<NetworkMsg<Waku>>,
res_tx: Sender<HttpResponse>,
@ -106,7 +59,7 @@ pub(super) async fn handle_add_conn_req(
}
}
pub(super) async fn send_transaction(
pub(super) async fn waku_send_transaction(
network_relay: OutboundRelay<NetworkMsg<Waku>>,
tx: Tx,
) -> Result<(), overwatch_rs::DynError> {

View File

@ -1,4 +1,4 @@
use nomos_node::{Config, Nomos, NomosServiceSettings};
use nomos_node::{Config, Nomos, NomosServiceSettings, Tx};
mod bridges;
@ -6,6 +6,14 @@ use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use nomos_http::bridge::{HttpBridge, HttpBridgeSettings};
#[cfg(feature = "libp2p")]
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter;
#[cfg(feature = "waku")]
use nomos_mempool::network::adapters::waku::WakuAdapter;
#[cfg(feature = "libp2p")]
use nomos_network::backends::libp2p::Libp2p;
#[cfg(feature = "waku")]
use nomos_network::backends::waku::Waku;
use overwatch_rs::overwatch::*;
use std::sync::Arc;
@ -24,7 +32,13 @@ fn main() -> Result<()> {
Arc::new(Box::new(bridges::mempool_metrics_bridge)),
Arc::new(Box::new(bridges::network_info_bridge)),
#[cfg(feature = "waku")]
Arc::new(Box::new(bridges::mempool_add_tx_bridge)),
Arc::new(Box::new(
bridges::mempool_add_tx_bridge::<Waku, WakuAdapter<Tx>>,
)),
#[cfg(feature = "libp2p")]
Arc::new(Box::new(
bridges::mempool_add_tx_bridge::<Libp2p, Libp2pAdapter<Tx>>,
)),
#[cfg(feature = "waku")]
Arc::new(Box::new(bridges::waku_add_conn_bridge)),
];

View File

@ -16,7 +16,7 @@ use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
const CARNOT_TX_TOPIC: &str = "CarnotTx";
pub const CARNOT_TX_TOPIC: &str = "CarnotTx";
pub struct Libp2pAdapter<Tx> {
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,