From 320755d19d8e9d42ac78b18c0cad9ef73da15e97 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Wed, 8 Feb 2023 11:07:09 +0100 Subject: [PATCH] Detach tx broadcast from mempool (#69) * Remove send tx method from mempool network adapter * Add error reporting to add_tx operation in mempool Delegate broadcasting to external caller --- nodes/mockpool-node/Cargo.toml | 2 + nodes/mockpool-node/src/bridges.rs | 53 +++++++++++++++---- nomos-services/consensus/src/leadership.rs | 5 +- .../consensus/src/network/adapters/waku.rs | 2 +- nomos-services/mempool/Cargo.toml | 1 + .../mempool/src/backend/mockpool.rs | 6 +-- nomos-services/mempool/src/backend/mod.rs | 10 +++- nomos-services/mempool/src/lib.rs | 16 +++--- .../mempool/src/network/adapters/mock.rs | 20 +------ .../mempool/src/network/adapters/waku.rs | 25 ++------- nomos-services/mempool/src/network/mod.rs | 1 - nomos-services/mempool/tests/mock.rs | 2 +- 12 files changed, 78 insertions(+), 65 deletions(-) diff --git a/nodes/mockpool-node/Cargo.toml b/nodes/mockpool-node/Cargo.toml index e145c5af..143b5972 100644 --- a/nodes/mockpool-node/Cargo.toml +++ b/nodes/mockpool-node/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" blake2 = "0.10" bincode = "2.0.0-rc.2" clap = { version = "4", features = ["derive"] } +chrono = "0.4" futures = "0.3" overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } @@ -25,3 +26,4 @@ serde_json = "1.0" serde_yaml = "0.9" color-eyre = "0.6.0" serde = "1" +waku-bindings = "0.1.0-beta3" diff --git a/nodes/mockpool-node/src/bridges.rs b/nodes/mockpool-node/src/bridges.rs index 215775d8..3c42eabb 100644 --- a/nodes/mockpool-node/src/bridges.rs +++ b/nodes/mockpool-node/src/bridges.rs @@ -1,21 +1,24 @@ // std - // crates +use tokio::sync::oneshot; +use tracing::debug; +// internal +use crate::tx::{Tx, TxId}; use futures::future::join_all; use multiaddr::Multiaddr; +use nomos_core::wire; use nomos_http::backends::axum::AxumBackend; use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; use nomos_http::http::{HttpMethod, HttpRequest}; use nomos_mempool::backend::mockpool::MockPool; -use nomos_mempool::network::adapters::waku::WakuAdapter; +use nomos_mempool::network::adapters::waku::{ + WakuAdapter, WAKU_CARNOT_PUB_SUB_TOPIC, WAKU_CARNOT_TX_CONTENT_TOPIC, +}; use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; use nomos_network::backends::waku::{Waku, WakuBackendMessage, WakuInfo}; use nomos_network::{NetworkMsg, NetworkService}; -use tokio::sync::oneshot; -use tracing::debug; - -// internal -use crate::tx::{Tx, TxId}; +use overwatch_rs::services::relay::OutboundRelay; +use waku_bindings::WakuMessage; pub fn mempool_metrics_bridge( handle: overwatch_rs::overwatch::handle::OverwatchHandle, @@ -73,11 +76,25 @@ pub fn mempool_add_tx_bridge( .as_ref() .and_then(|b| String::from_utf8(b.to_vec()).ok()) { + let tx = Tx(data); + let (sender, receiver) = oneshot::channel(); mempool_channel - .send(MempoolMsg::AddTx { tx: Tx(data) }) + .send(MempoolMsg::AddTx { + tx: tx.clone(), + reply_channel: sender, + }) .await .unwrap(); - res_tx.send(b"".to_vec().into()).await.unwrap(); + if let Ok(()) = receiver.await.unwrap() { + // broadcast transaction to peers + let network_relay = handle + .relay::>() + .connect() + .await + .unwrap(); + send_transaction(network_relay, tx).await; + res_tx.send(b"".to_vec().into()).await.unwrap(); + } } else { debug!( "Invalid payload, {:?}. Empty or couldn't transform into a utf8 String", @@ -161,3 +178,21 @@ pub fn waku_add_conn_bridge( Ok(()) })) } + +async fn send_transaction(network_relay: OutboundRelay>, tx: Tx) { + let payload = wire::serialize(&tx).expect("Tx serialization failed"); + if let Err((_, _e)) = network_relay + .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { + message: WakuMessage::new( + payload, + WAKU_CARNOT_TX_CONTENT_TOPIC.clone(), + 1, + chrono::Utc::now().timestamp() as usize, + ), + topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), + })) + .await + { + todo!("log error"); + }; +} diff --git a/nomos-services/consensus/src/leadership.rs b/nomos-services/consensus/src/leadership.rs index eb64a947..70620563 100644 --- a/nomos-services/consensus/src/leadership.rs +++ b/nomos-services/consensus/src/leadership.rs @@ -45,7 +45,10 @@ impl Leadership { let ancestor_hint = todo!("get the ancestor from the tip"); if view.is_leader(self.key.key) { let (tx, rx) = tokio::sync::oneshot::channel(); - self.mempool.send(MempoolMsg::View { ancestor_hint, tx }); + self.mempool.send(MempoolMsg::View { + ancestor_hint, + reply_channel: tx, + }); let _iter = rx.await; LeadershipResult::Leader { diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs index fa15f997..e0cfb2e0 100644 --- a/nomos-services/consensus/src/network/adapters/waku.rs +++ b/nomos-services/consensus/src/network/adapters/waku.rs @@ -18,7 +18,7 @@ use nomos_network::{ use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic}; -const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = +pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = WakuPubSubTopic::new("CarnotSim", Encoding::Proto); const APPLICATION_NAME: &str = "CarnotSim"; diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index af47d79b..4b2bb6cb 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -16,6 +16,7 @@ nomos-core = { path = "../../nomos-core" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } rand = { version = "0.8", optional = true } serde = { version = "1.0", features = ["derive"] } +thiserror = "1.0" tracing = "0.1" tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" diff --git a/nomos-services/mempool/src/backend/mockpool.rs b/nomos-services/mempool/src/backend/mockpool.rs index 9a074d1d..34822e4e 100644 --- a/nomos-services/mempool/src/backend/mockpool.rs +++ b/nomos-services/mempool/src/backend/mockpool.rs @@ -4,7 +4,7 @@ use std::collections::BTreeMap; use std::hash::Hash; // crates // internal -use crate::backend::MemPool; +use crate::backend::{MemPool, MempoolError}; use nomos_core::block::{BlockHeader, BlockId}; /// A mock mempool implementation that stores all transactions in memory in the order received. @@ -49,10 +49,10 @@ where Self::new() } - fn add_tx(&mut self, tx: Self::Tx) -> Result<(), overwatch_rs::DynError> { + fn add_tx(&mut self, tx: Self::Tx) -> Result<(), MempoolError> { let id = Id::from(&tx); if self.pending_txs.contains_key(&id) || self.in_block_txs_by_id.contains_key(&id) { - return Ok(()); + return Err(MempoolError::ExistingTx); } self.pending_txs.insert(id, tx); Ok(()) diff --git a/nomos-services/mempool/src/backend/mod.rs b/nomos-services/mempool/src/backend/mod.rs index 706a99de..8a376835 100644 --- a/nomos-services/mempool/src/backend/mod.rs +++ b/nomos-services/mempool/src/backend/mod.rs @@ -3,6 +3,14 @@ pub mod mockpool; use nomos_core::block::{BlockHeader, BlockId}; +#[derive(thiserror::Error, Debug)] +pub enum MempoolError { + #[error("Tx already in mempool")] + ExistingTx, + #[error(transparent)] + DynamicPoolError(#[from] overwatch_rs::DynError), +} + pub trait MemPool { type Settings: Clone; type Tx; @@ -12,7 +20,7 @@ pub trait MemPool { fn new(settings: Self::Settings) -> Self; /// Add a new transaction to the mempool, for example because we received it from the network - fn add_tx(&mut self, tx: Self::Tx) -> Result<(), overwatch_rs::DynError>; + fn add_tx(&mut self, tx: Self::Tx) -> Result<(), MempoolError>; /// Return a view over the transactions contained in the mempool. /// Implementations should provide *at least* all the transactions which have not been marked as diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index 58e47f75..ffb86025 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -7,7 +7,6 @@ use std::fmt::{Debug, Error, Formatter}; // crates use futures::StreamExt; use tokio::sync::oneshot::Sender; - // internal use crate::network::NetworkAdapter; use backend::MemPool; @@ -40,10 +39,11 @@ pub struct MempoolMetrics { pub enum MempoolMsg { AddTx { tx: Tx, + reply_channel: Sender>, }, View { ancestor_hint: BlockId, - tx: Sender + Send>>, + reply_channel: Sender + Send>>, }, Prune { ids: Vec, @@ -63,7 +63,7 @@ impl Debug for MempoolMsg { Self::View { ancestor_hint, .. } => { write!(f, "MempoolMsg::View {{ ancestor_hint: {ancestor_hint:?}}}") } - Self::AddTx { tx } => write!(f, "MempoolMsg::AddTx{{tx: {tx:?}}}"), + Self::AddTx { tx, .. } => write!(f, "MempoolMsg::AddTx{{tx: {tx:?}}}"), Self::Prune { ids } => write!(f, "MempoolMsg::Prune{{ids: {ids:?}}}"), Self::MarkInBlock { ids, block } => { write!( @@ -131,18 +131,20 @@ where tokio::select! { Some(msg) = service_state.inbound_relay.recv() => { match msg { - MempoolMsg::AddTx { tx } => { + MempoolMsg::AddTx { tx, reply_channel } => { match pool.add_tx(tx.clone()) { Ok(_id) => { - adapter.send_transaction(tx).await; + if let Err(e) = reply_channel.send(Ok(())) { + tracing::debug!("Failed to send reply to AddTx: {:?}", e); + } } Err(e) => { tracing::debug!("could not add tx to the pool due to: {}", e); } } } - MempoolMsg::View { ancestor_hint, tx } => { - tx.send(pool.view(ancestor_hint)).unwrap_or_else(|_| { + MempoolMsg::View { ancestor_hint, reply_channel } => { + reply_channel.send(pool.view(ancestor_hint)).unwrap_or_else(|_| { tracing::debug!("could not send back pool view") }); } diff --git a/nomos-services/mempool/src/network/adapters/mock.rs b/nomos-services/mempool/src/network/adapters/mock.rs index 6e67f3d7..3f29ef08 100644 --- a/nomos-services/mempool/src/network/adapters/mock.rs +++ b/nomos-services/mempool/src/network/adapters/mock.rs @@ -4,7 +4,7 @@ use std::marker::PhantomData; // crates use futures::{Stream, StreamExt}; use nomos_network::backends::mock::{ - EventKind, Mock, MockBackendMessage, MockContentTopic, MockMessage, NetworkEvent, + EventKind, Mock, MockBackendMessage, MockContentTopic, NetworkEvent, }; use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; @@ -95,22 +95,4 @@ where }, ))) } - - async fn send_transaction(&self, tx: Self::Tx) { - if let Err((e, _e)) = self - .network_relay - .send(NetworkMsg::Process(MockBackendMessage::Broadcast { - msg: MockMessage::new( - tx.into(), - MOCK_TX_CONTENT_TOPIC, - 1, - chrono::Utc::now().timestamp() as usize, - ), - topic: MOCK_PUB_SUB_TOPIC, - })) - .await - { - tracing::error!(err = ?e); - }; - } } diff --git a/nomos-services/mempool/src/network/adapters/waku.rs b/nomos-services/mempool/src/network/adapters/waku.rs index 7fb6cfb0..e072f5be 100644 --- a/nomos-services/mempool/src/network/adapters/waku.rs +++ b/nomos-services/mempool/src/network/adapters/waku.rs @@ -13,12 +13,12 @@ use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; use serde::Serialize; -use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic}; +use waku_bindings::{Encoding, WakuContentTopic, WakuPubSubTopic}; -const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = +pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = WakuPubSubTopic::new("CarnotSim", Encoding::Proto); -const WAKU_CARNOT_TX_CONTENT_TOPIC: WakuContentTopic = +pub const WAKU_CARNOT_TX_CONTENT_TOPIC: WakuContentTopic = WakuContentTopic::new("CarnotSim", 1, "CarnotTx", Encoding::Proto); pub struct WakuAdapter { @@ -83,23 +83,4 @@ where }, ))) } - - async fn send_transaction(&self, tx: Self::Tx) { - let payload = wire::serialize(&tx).expect("Tx serialization failed"); - if let Err((_, _e)) = self - .network_relay - .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { - message: WakuMessage::new( - payload, - WAKU_CARNOT_TX_CONTENT_TOPIC.clone(), - 1, - chrono::Utc::now().timestamp() as usize, - ), - topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), - })) - .await - { - todo!("log error"); - }; - } } diff --git a/nomos-services/mempool/src/network/mod.rs b/nomos-services/mempool/src/network/mod.rs index 5d8d34d0..8c56d973 100644 --- a/nomos-services/mempool/src/network/mod.rs +++ b/nomos-services/mempool/src/network/mod.rs @@ -19,5 +19,4 @@ pub trait NetworkAdapter { network_relay: OutboundRelay< as ServiceData>::Message>, ) -> Self; async fn transactions_stream(&self) -> Box + Unpin + Send>; - async fn send_transaction(&self, tx: Self::Tx); } diff --git a/nomos-services/mempool/tests/mock.rs b/nomos-services/mempool/tests/mock.rs index 533a7df5..2939e0ae 100644 --- a/nomos-services/mempool/tests/mock.rs +++ b/nomos-services/mempool/tests/mock.rs @@ -96,7 +96,7 @@ fn test_mockmempool() { mempool_outbound .send(MempoolMsg::View { ancestor_hint: BlockId, - tx: mtx, + reply_channel: mtx, }) .await .unwrap();