Add `send` method to mempool network adapter (#439)

* Add `send` method to mempool network adapter

Centralize responsabilities for mempool-network interface in the
adapter trait.

* Update nomos-services/mempool/src/lib.rs

Co-authored-by: Youngjoon Lee <taxihighway@gmail.com>

---------

Co-authored-by: Youngjoon Lee <taxihighway@gmail.com>
This commit is contained in:
Giacomo Pasini 2023-09-27 11:58:42 +02:00 committed by GitHub
parent dda4a1365c
commit 36b3ccc043
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 74 additions and 34 deletions

View File

@ -3,11 +3,9 @@
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot; use tokio::sync::oneshot;
// internal // internal
use nomos_core::wire;
use nomos_http::http::HttpResponse; use nomos_http::http::HttpResponse;
use nomos_network::backends::libp2p::{Command, Libp2p}; use nomos_network::backends::libp2p::{Command, Libp2p};
use nomos_network::NetworkMsg; use nomos_network::NetworkMsg;
use nomos_node::Tx;
use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::relay::OutboundRelay;
pub(super) async fn handle_libp2p_info_req( pub(super) async fn handle_libp2p_info_req(
@ -26,19 +24,3 @@ pub(super) async fn handle_libp2p_info_req(
Ok(()) Ok(())
} }
pub(super) async fn libp2p_send_transaction(
network_relay: OutboundRelay<NetworkMsg<Libp2p>>,
tx: Tx,
) -> Result<(), overwatch_rs::DynError> {
let payload = wire::serialize(&tx).expect("Tx serialization failed");
network_relay
.send(NetworkMsg::Process(Command::Broadcast {
topic: nomos_mempool::network::adapters::libp2p::CARNOT_TX_TOPIC.to_string(),
message: payload.into_boxed_slice(),
}))
.await
.map_err(|(e, _)| e)?;
Ok(())
}

View File

@ -89,9 +89,7 @@ where
res_tx, payload, .. res_tx, payload, ..
}) = http_request_channel.recv().await }) = http_request_channel.recv().await
{ {
if let Err(e) = if let Err(e) = handle_mempool_add_tx_req(&mempool_channel, res_tx, payload).await {
handle_mempool_add_tx_req(&handle, &mempool_channel, res_tx, payload).await
{
error!(e); error!(e);
} }
} }
@ -142,7 +140,6 @@ async fn handle_mempool_metrics_req(
} }
pub(super) async fn handle_mempool_add_tx_req( pub(super) async fn handle_mempool_add_tx_req(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
mempool_channel: &OutboundRelay<MempoolMsg<Tx, <Tx as Transaction>::Hash>>, mempool_channel: &OutboundRelay<MempoolMsg<Tx, <Tx as Transaction>::Hash>>,
res_tx: Sender<HttpResponse>, res_tx: Sender<HttpResponse>,
payload: Option<Bytes>, payload: Option<Bytes>,
@ -163,12 +160,7 @@ pub(super) async fn handle_mempool_add_tx_req(
.map_err(|(e, _)| e)?; .map_err(|(e, _)| e)?;
match receiver.await { match receiver.await {
Ok(Ok(())) => { Ok(Ok(())) => Ok(res_tx.send(Ok(b"".to_vec().into())).await?),
// broadcast transaction to peers
let network_relay = handle.relay::<NetworkService<Libp2p>>().connect().await?;
libp2p_send_transaction(network_relay, tx).await?;
Ok(res_tx.send(Ok(b"".to_vec().into())).await?)
}
Ok(Err(())) => Ok(res_tx Ok(Err(())) => Ok(res_tx
.send(Err(( .send(Err((
StatusCode::CONFLICT, StatusCode::CONFLICT,

View File

@ -20,6 +20,7 @@ tracing = "0.1"
tokio = { version = "1", features = ["sync", "macros"] } tokio = { version = "1", features = ["sync", "macros"] }
tokio-stream = "0.1" tokio-stream = "0.1"
waku-bindings = { version = "0.1.1", optional = true} waku-bindings = { version = "0.1.1", optional = true}
chrono = "0.4"
[dev-dependencies] [dev-dependencies]
nomos-log = { path = "../log" } nomos-log = { path = "../log" }

View File

@ -170,7 +170,7 @@ where
let adapter = N::new( let adapter = N::new(
service_state.settings_reader.get_updated_settings().network, service_state.settings_reader.get_updated_settings().network,
network_relay, network_relay.clone(),
); );
let adapter = adapter.await; let adapter = adapter.await;
@ -181,8 +181,16 @@ where
Some(msg) = service_state.inbound_relay.recv() => { Some(msg) = service_state.inbound_relay.recv() => {
match msg { match msg {
MempoolMsg::Add { item, key, reply_channel } => { MempoolMsg::Add { item, key, reply_channel } => {
match pool.add_item(key, item) { match pool.add_item(key, item.clone()) {
Ok(_id) => { Ok(_id) => {
// Broadcast the item to the network
let net = network_relay.clone();
let settings = service_state.settings_reader.get_updated_settings().network;
// move sending to a new task so local operations can complete in the meantime
tokio::spawn(async move {
let adapter = N::new(settings, net).await;
adapter.send(item).await;
});
if let Err(e) = reply_channel.send(Ok(())) { if let Err(e) = reply_channel.send(Ok(())) {
tracing::debug!("Failed to send reply to AddTx: {:?}", e); tracing::debug!("Failed to send reply to AddTx: {:?}", e);
} }

View File

@ -11,8 +11,6 @@ use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData; use overwatch_rs::services::ServiceData;
pub const CARNOT_TX_TOPIC: &str = "CarnotTx";
pub struct Libp2pAdapter<Item, Key> { pub struct Libp2pAdapter<Item, Key> {
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>, network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
settings: Settings<Key, Item>, settings: Settings<Key, Item>,
@ -35,7 +33,7 @@ where
) -> Self { ) -> Self {
network_relay network_relay
.send(NetworkMsg::Process(Command::Subscribe( .send(NetworkMsg::Process(Command::Subscribe(
CARNOT_TX_TOPIC.to_string(), settings.topic.clone(),
))) )))
.await .await
.expect("Network backend should be ready"); .expect("Network backend should be ready");
@ -73,6 +71,23 @@ where
}, },
))) )))
} }
async fn send(&self, item: Item) {
if let Ok(wire) = wire::serialize(&item) {
if let Err((e, _)) = self
.network_relay
.send(NetworkMsg::Process(Command::Broadcast {
topic: self.settings.topic.clone(),
message: wire.into(),
}))
.await
{
tracing::error!("failed to send item to topic: {e}");
}
} else {
tracing::error!("Failed to serialize item");
}
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View File

@ -93,4 +93,17 @@ impl NetworkAdapter for MockAdapter {
}, },
))) )))
} }
async fn send(&self, msg: Self::Item) {
if let Err((e, _)) = self
.network_relay
.send(NetworkMsg::Process(MockBackendMessage::Broadcast {
topic: MOCK_PUB_SUB_TOPIC.into(),
msg: msg.message().clone(),
}))
.await
{
tracing::error!("failed to send item to topic: {e}");
}
}
} }

View File

@ -12,7 +12,7 @@ use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData; use overwatch_rs::services::ServiceData;
use serde::Serialize; use serde::Serialize;
use waku_bindings::{Encoding, WakuContentTopic, WakuPubSubTopic}; use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic};
pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
WakuPubSubTopic::new("CarnotSim", Encoding::Proto); WakuPubSubTopic::new("CarnotSim", Encoding::Proto);
@ -90,4 +90,31 @@ where
}, },
))) )))
} }
async fn send(&self, item: Self::Item) {
if let Ok(wire) = wire::serialize(&item) {
if let Err((e, _)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
message: WakuMessage::new(
wire,
WAKU_CARNOT_TX_CONTENT_TOPIC.clone(),
1,
chrono::Utc::now()
.timestamp_nanos_opt()
.expect("timestamp should be in valid range")
as usize,
[],
false,
),
}))
.await
{
tracing::error!("failed to send item to topic: {e}");
}
} else {
tracing::error!("Failed to serialize item");
}
}
} }

View File

@ -25,4 +25,6 @@ pub trait NetworkAdapter {
async fn transactions_stream( async fn transactions_stream(
&self, &self,
) -> Box<dyn Stream<Item = (Self::Key, Self::Item)> + Unpin + Send>; ) -> Box<dyn Stream<Item = (Self::Key, Self::Item)> + Unpin + Send>;
async fn send(&self, item: Self::Item);
} }