From 03973cd4221d996837e608403048d241abf268db Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Tue, 26 Sep 2023 11:14:44 +0200 Subject: [PATCH] Make mempool generic (#428) * Make mempool item generic Make the mempool generic with respect to the item and remove mentions of specific transaction formats/traits. This will allow us to reuse the same code for both coordination layer transactions and certificates, or in general, whatever items need to be included in a block. * Add mempool network adapter settings Allow for greater customization of the mempool network adapter by adding a settings field. * update node after mempool changes * fix waku mempool adapter * fmt * fix tests * fmt --- nodes/nomos-node/src/bridges/mod.rs | 53 +++++----- nodes/nomos-node/src/lib.rs | 13 ++- nodes/nomos-node/src/main.rs | 14 ++- nomos-services/consensus/src/lib.rs | 98 +++++++++++-------- .../mempool/src/backend/mockpool.rs | 98 +++++++++---------- nomos-services/mempool/src/backend/mod.rs | 35 +++---- nomos-services/mempool/src/lib.rs | 97 ++++++++++-------- .../mempool/src/network/adapters/libp2p.rs | 40 +++++--- .../mempool/src/network/adapters/mock.rs | 14 ++- .../mempool/src/network/adapters/waku.rs | 27 +++-- nomos-services/mempool/src/network/mod.rs | 10 +- nomos-services/mempool/tests/mock.rs | 18 +++- 12 files changed, 297 insertions(+), 220 deletions(-) diff --git a/nodes/nomos-node/src/bridges/mod.rs b/nodes/nomos-node/src/bridges/mod.rs index f16ac967..fe9aefdc 100644 --- a/nodes/nomos-node/src/bridges/mod.rs +++ b/nodes/nomos-node/src/bridges/mod.rs @@ -1,4 +1,5 @@ mod libp2p; +use libp2p::*; // std // crates @@ -9,24 +10,20 @@ use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; use tracing::error; // internal +use nomos_core::tx::Transaction; use nomos_http::backends::axum::AxumBackend; use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse}; use nomos_mempool::backend::mockpool::MockPool; - -use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; - use nomos_mempool::network::adapters::libp2p::Libp2pAdapter; - +use nomos_mempool::network::NetworkAdapter; +use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; use nomos_network::backends::libp2p::Libp2p; +use nomos_network::backends::NetworkBackend; use nomos_network::NetworkService; use nomos_node::{Carnot, Tx}; use overwatch_rs::services::relay::OutboundRelay; -use libp2p::*; -use nomos_mempool::network::NetworkAdapter; -use nomos_network::backends::NetworkBackend; - macro_rules! get_handler { ($handle:expr, $service:ty, $path:expr => $handler:tt) => {{ let (channel, mut http_request_channel) = @@ -54,7 +51,7 @@ pub fn mempool_metrics_bridge( handle: overwatch_rs::overwatch::handle::OverwatchHandle, ) -> HttpBridgeRunner { Box::new(Box::pin(async move { - get_handler!(handle, MempoolService, MockPool>, "metrics" => handle_mempool_metrics_req) + get_handler!(handle, MempoolService::Hash>, MockPool::Hash>>, "metrics" => handle_mempool_metrics_req) })) } @@ -66,19 +63,24 @@ pub fn network_info_bridge( })) } -pub fn mempool_add_tx_bridge< - N: NetworkBackend, - A: NetworkAdapter + Send + Sync + 'static, ->( +pub fn mempool_add_tx_bridge( handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { +) -> HttpBridgeRunner +where + N: NetworkBackend, + A: NetworkAdapter::Hash> + + Send + + Sync + + 'static, + A::Settings: Send + Sync, +{ Box::new(Box::pin(async move { let (mempool_channel, mut http_request_channel) = - build_http_bridge::>, AxumBackend, _>( - handle.clone(), - HttpMethod::POST, - "addtx", - ) + build_http_bridge::< + MempoolService::Hash>>, + AxumBackend, + _, + >(handle.clone(), HttpMethod::POST, "addtx") .await .unwrap(); @@ -114,7 +116,7 @@ async fn handle_carnot_info_req( } async fn handle_mempool_metrics_req( - mempool_channel: &OutboundRelay>, + mempool_channel: &OutboundRelay::Hash>>, res_tx: Sender, ) -> Result<(), overwatch_rs::DynError> { let (sender, receiver) = oneshot::channel(); @@ -129,8 +131,8 @@ async fn handle_mempool_metrics_req( res_tx // TODO: use serde to serialize metrics .send(Ok(format!( - "{{\"pending_tx\": {}, \"last_tx\": {}}}", - metrics.pending_txs, metrics.last_tx_timestamp + "{{\"pending_items\": {}, \"last_item\": {}}}", + metrics.pending_items, metrics.last_item_timestamp ) .into())) .await?; @@ -140,7 +142,7 @@ async fn handle_mempool_metrics_req( pub(super) async fn handle_mempool_add_tx_req( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, - mempool_channel: &OutboundRelay>, + mempool_channel: &OutboundRelay::Hash>>, res_tx: Sender, payload: Option, ) -> Result<(), overwatch_rs::DynError> { @@ -151,8 +153,9 @@ pub(super) async fn handle_mempool_add_tx_req( let tx = Tx(data); let (sender, receiver) = oneshot::channel(); mempool_channel - .send(MempoolMsg::AddTx { - tx: tx.clone(), + .send(MempoolMsg::Add { + item: tx.clone(), + key: tx.hash(), reply_channel: sender, }) .await diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 7783b856..3dff12c8 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -10,7 +10,7 @@ use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsServic use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter; use nomos_consensus::CarnotConsensus; - +use nomos_core::tx::Transaction; use nomos_da::{ backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter, DataAvailabilityService, @@ -39,8 +39,8 @@ const MB16: usize = 1024 * 1024 * 16; pub type Carnot = CarnotConsensus< ConsensusLibp2pAdapter, - MockPool, - MempoolLibp2pAdapter, + MockPool::Hash>, + MempoolLibp2pAdapter::Hash>, FlatOverlay, Certificate, FillSizeWithTx, @@ -53,11 +53,16 @@ type DataAvailability = DataAvailabilityService< DaLibp2pAdapter, >; +type Mempool = MempoolService< + MempoolLibp2pAdapter::Hash>, + MockPool::Hash>, +>; + #[derive(Services)] pub struct Nomos { logging: ServiceHandle, network: ServiceHandle>, - mockpool: ServiceHandle, MockPool>>, + mockpool: ServiceHandle, consensus: ServiceHandle, http: ServiceHandle>, bridges: ServiceHandle, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index a0ed7c08..a3307db6 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -8,11 +8,13 @@ mod bridges; use clap::Parser; use color_eyre::eyre::{eyre, Result}; use nomos_http::bridge::{HttpBridge, HttpBridgeSettings}; -use nomos_mempool::network::adapters::libp2p::Libp2pAdapter; +use nomos_mempool::network::adapters::libp2p::{Libp2pAdapter, Settings as AdapterSettings}; use nomos_network::backends::libp2p::Libp2p; use overwatch_rs::overwatch::*; use std::sync::Arc; +use nomos_core::tx::Transaction; + #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { @@ -56,7 +58,7 @@ fn main() -> Result<()> { Arc::new(Box::new(bridges::mempool_metrics_bridge)), Arc::new(Box::new(bridges::network_info_bridge)), Arc::new(Box::new( - bridges::mempool_add_tx_bridge::>, + bridges::mempool_add_tx_bridge::::Hash>>, )), ]; let app = OverwatchRunner::::run( @@ -64,7 +66,13 @@ fn main() -> Result<()> { network: config.network, logging: config.log, http: config.http, - mockpool: (), + mockpool: nomos_mempool::Settings { + backend: (), + network: AdapterSettings { + topic: String::from("tx"), + id: ::hash, + }, + }, consensus: config.consensus, bridges: HttpBridgeSettings { bridges }, #[cfg(feature = "metrics")] diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 784bb062..838b8547 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -106,13 +106,13 @@ impl CarnotSettings { pub struct CarnotConsensus where A: NetworkAdapter, - M: MempoolAdapter, + M: MempoolAdapter, P: MemPool, O: Overlay + Debug, - P::Tx: Transaction + Debug + 'static, - ::Hash: Debug, + P::Item: Debug + 'static, + P::Key: Debug + 'static, A::Backend: 'static, - TxS: TxSelect, + TxS: TxSelect, BS: BlobCertificateSelect, { service_state: ServiceStateHandle, @@ -129,11 +129,11 @@ impl ServiceData for CarnotConsensus::Hash: Debug, - M: MempoolAdapter, + P::Item: Debug, + P::Key: Debug, + M: MempoolAdapter, O: Overlay + Debug, - TxS: TxSelect, + TxS: TxSelect, BS: BlobCertificateSelect, { const SERVICE_ID: ServiceId = "Carnot"; @@ -149,9 +149,16 @@ where A: NetworkAdapter + Clone + Send + Sync + 'static, P: MemPool + Send + Sync + 'static, P::Settings: Send + Sync + 'static, - P::Tx: - Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, - ::Hash: Debug + Send + Sync, + P::Item: Transaction + + Debug + + Clone + + Eq + + Hash + + Serialize + + serde::de::DeserializeOwned + + Send + + Sync + + 'static, C: Certificate + Debug + Clone @@ -162,11 +169,12 @@ where + Send + Sync + 'static, - M: MempoolAdapter + Send + Sync + 'static, + P::Key: Debug + Send + Sync, + M: MempoolAdapter + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static, O::LeaderSelection: UpdateableLeaderSelection, O::CommitteeMembership: UpdateableCommitteeMembership, - TxS: TxSelect + Clone + Send + Sync + 'static, + TxS: TxSelect + Clone + Send + Sync + 'static, TxS::Settings: Send + Sync + 'static, BS: BlobCertificateSelect + Clone + Send + Sync + 'static, BS::Settings: Send + Sync + 'static, @@ -310,9 +318,16 @@ where A: NetworkAdapter + Clone + Send + Sync + 'static, P: MemPool + Send + Sync + 'static, P::Settings: Send + Sync + 'static, - P::Tx: - Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, - ::Hash: Debug + Send + Sync, + P::Item: Transaction + + Debug + + Clone + + Eq + + Hash + + Serialize + + serde::de::DeserializeOwned + + Send + + Sync + + 'static, C: Certificate + Debug + Clone @@ -323,12 +338,17 @@ where + Send + Sync + 'static, - M: MempoolAdapter + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static, O::LeaderSelection: UpdateableLeaderSelection, O::CommitteeMembership: UpdateableCommitteeMembership, - TxS: TxSelect + Clone + Send + Sync + 'static, + TxS: TxSelect + Clone + Send + Sync + 'static, BS: BlobCertificateSelect + Clone + Send + Sync + 'static, + P::Key: Debug + Send + Sync, + M: MempoolAdapter + Send + Sync + 'static, + O: Overlay + Debug + Send + Sync + 'static, + O::LeaderSelection: UpdateableLeaderSelection, + O::CommitteeMembership: UpdateableCommitteeMembership, + TxS: TxSelect + Clone + Send + Sync + 'static, { fn process_message(carnot: &Carnot, msg: ConsensusMsg) { match msg { @@ -352,11 +372,11 @@ where #[allow(clippy::too_many_arguments)] async fn process_carnot_event( mut carnot: Carnot, - event: Event, - task_manager: &mut TaskManager>, + event: Event, + task_manager: &mut TaskManager>, adapter: A, private_key: PrivateKey, - mempool_relay: OutboundRelay>, + mempool_relay: OutboundRelay>, tx_selector: TxS, blobl_selector: BS, timeout: Duration, @@ -372,7 +392,7 @@ where tracing::debug!("approving proposal {:?}", block); let (new_carnot, out) = carnot.approve_block(block); carnot = new_carnot; - output = Some(Output::Send::(out)); + output = Some(Output::Send::(out)); } Event::LocalTimeout { view } => { tracing::debug!("local timeout"); @@ -442,11 +462,11 @@ where #[instrument(level = "debug", skip(adapter, task_manager, stream))] async fn process_block( mut carnot: Carnot, - block: Block, - mut stream: Pin> + Send>>, - task_manager: &mut TaskManager>, + block: Block, + mut stream: Pin> + Send>>, + task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { tracing::debug!("received proposal {:?}", block); if carnot.highest_voted_view() >= block.header().view { tracing::debug!("already voted for view {}", block.header().view); @@ -522,9 +542,9 @@ where carnot: Carnot, timeout_qc: TimeoutQc, new_views: HashSet, - task_manager: &mut TaskManager>, + task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { let leader_committee = [carnot.id()].into_iter().collect(); let leader_tally_settings = CarnotTallySettings { threshold: carnot.leader_super_majority_threshold(), @@ -559,9 +579,9 @@ where async fn receive_timeout_qc( carnot: Carnot, timeout_qc: TimeoutQc, - task_manager: &mut TaskManager>, + task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone()); let self_committee = carnot.self_committee(); let tally_settings = CarnotTallySettings { @@ -586,7 +606,7 @@ where async fn process_root_timeout( carnot: Carnot, timeouts: HashSet, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { // we might have received a timeout_qc sent by some other node and advanced the view // already, in which case we should ignore the timeout if carnot.current_view() @@ -630,8 +650,8 @@ where qc: Qc, tx_selector: TxS, blob_selector: BS, - mempool_relay: OutboundRelay>, - ) -> Option> { + mempool_relay: OutboundRelay>, + ) -> Option> { let (reply_channel, rx) = tokio::sync::oneshot::channel(); let mut output = None; mempool_relay @@ -666,7 +686,7 @@ where async fn process_view_change( carnot: Carnot, prev_view: View, - task_manager: &mut TaskManager>, + task_manager: &mut TaskManager>, adapter: A, timeout: Duration, ) { @@ -703,7 +723,7 @@ where } } - async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event { + async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event { if let Some(timeout_qc) = adapter .timeout_qc_stream(view) .await @@ -723,7 +743,7 @@ where committee: Committee, block: consensus_engine::Block, tally: CarnotTallySettings, - ) -> Event { + ) -> Event { let tally = CarnotTally::new(tally); let votes_stream = adapter.votes_stream(&committee, block.view, block.id).await; match tally.tally(block.clone(), votes_stream).await { @@ -740,7 +760,7 @@ where committee: Committee, timeout_qc: TimeoutQc, tally: CarnotTallySettings, - ) -> Event { + ) -> Event { let tally = NewViewTally::new(tally); let stream = adapter .new_view_stream(&committee, timeout_qc.view().next()) @@ -762,7 +782,7 @@ where committee: Committee, view: consensus_engine::View, tally: CarnotTallySettings, - ) -> Event { + ) -> Event { let tally = TimeoutTally::new(tally); let stream = adapter.timeout_stream(&committee, view).await; match tally.tally(view, stream).await { @@ -774,7 +794,7 @@ where } #[instrument(level = "debug", skip(adapter))] - async fn gather_block(adapter: A, view: consensus_engine::View) -> Event { + async fn gather_block(adapter: A, view: consensus_engine::View) -> Event { let stream = adapter .proposal_chunks_stream(view) .await diff --git a/nomos-services/mempool/src/backend/mockpool.rs b/nomos-services/mempool/src/backend/mockpool.rs index 274a1c97..fb64ab80 100644 --- a/nomos-services/mempool/src/backend/mockpool.rs +++ b/nomos-services/mempool/src/backend/mockpool.rs @@ -7,58 +7,57 @@ use std::{collections::BTreeMap, time::UNIX_EPOCH}; // internal use crate::backend::{MemPool, MempoolError}; use nomos_core::block::BlockId; -use nomos_core::tx::Transaction; /// A mock mempool implementation that stores all transactions in memory in the order received. -pub struct MockPool -where - Tx::Hash: Hash, -{ - pending_txs: LinkedHashMap, - in_block_txs: BTreeMap>, - in_block_txs_by_id: BTreeMap, - last_tx_timestamp: u64, +pub struct MockPool { + pending_items: LinkedHashMap, + in_block_items: BTreeMap>, + in_block_items_by_id: BTreeMap, + last_item_timestamp: u64, } -impl Default for MockPool { +impl Default for MockPool +where + Key: Hash + Eq, +{ fn default() -> Self { Self { - pending_txs: LinkedHashMap::new(), - in_block_txs: BTreeMap::new(), - in_block_txs_by_id: BTreeMap::new(), - last_tx_timestamp: 0, + pending_items: LinkedHashMap::new(), + in_block_items: BTreeMap::new(), + in_block_items_by_id: BTreeMap::new(), + last_item_timestamp: 0, } } } -impl MockPool +impl MockPool where - Tx::Hash: Ord, + Key: Hash + Eq + Clone, { pub fn new() -> Self { Default::default() } } -impl MemPool for MockPool +impl MemPool for MockPool where - Tx: Transaction + Clone + Send + Sync + 'static + Hash, - Tx::Hash: Ord, + Item: Clone + Send + Sync + 'static + Hash, + Key: Clone + Ord + Hash, { type Settings = (); - type Tx = Tx; + type Item = Item; + type Key = Key; fn new(_settings: Self::Settings) -> Self { Self::new() } - fn add_tx(&mut self, tx: Self::Tx) -> Result<(), MempoolError> { - let id = ::hash(&tx); - if self.pending_txs.contains_key(&id) || self.in_block_txs_by_id.contains_key(&id) { - return Err(MempoolError::ExistingTx); + fn add_item(&mut self, key: Self::Key, item: Self::Item) -> Result<(), MempoolError> { + if self.pending_items.contains_key(&key) || self.in_block_items_by_id.contains_key(&key) { + return Err(MempoolError::ExistingItem); } - self.pending_txs.insert(id, tx); - self.last_tx_timestamp = SystemTime::now() + self.pending_items.insert(key, item); + self.last_item_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64; @@ -66,47 +65,44 @@ where Ok(()) } - fn view(&self, _ancestor_hint: BlockId) -> Box + Send> { + fn view(&self, _ancestor_hint: BlockId) -> Box + Send> { // we need to have an owned version of the iterator to bypass adding a lifetime bound to the return iterator type #[allow(clippy::needless_collect)] - let pending_txs: Vec = self.pending_txs.values().cloned().collect(); - Box::new(pending_txs.into_iter()) + let pending_items: Vec = self.pending_items.values().cloned().collect(); + Box::new(pending_items.into_iter()) } - fn mark_in_block(&mut self, txs: &[::Hash], block: BlockId) { - let mut txs_in_block = Vec::with_capacity(txs.len()); - for tx_id in txs.iter() { - if let Some(tx) = self.pending_txs.remove(tx_id) { - txs_in_block.push(tx); + fn mark_in_block(&mut self, keys: &[Self::Key], block: BlockId) { + let mut items_in_block = Vec::with_capacity(keys.len()); + for key in keys { + if let Some(item) = self.pending_items.remove(key) { + items_in_block.push(item); } } - let block_entry = self.in_block_txs.entry(block).or_default(); - self.in_block_txs_by_id - .extend(txs.iter().cloned().map(|tx| (tx, block))); - block_entry.append(&mut txs_in_block); + let block_entry = self.in_block_items.entry(block).or_default(); + self.in_block_items_by_id + .extend(keys.iter().cloned().map(|key| (key, block))); + block_entry.append(&mut items_in_block); } #[cfg(test)] - fn block_transactions( - &self, - block: BlockId, - ) -> Option + Send>> { - self.in_block_txs.get(&block).map(|txs| { - Box::new(txs.clone().into_iter()) as Box + Send> + fn block_items(&self, block: BlockId) -> Option + Send>> { + self.in_block_items.get(&block).map(|items| { + Box::new(items.clone().into_iter()) as Box + Send> }) } - fn prune(&mut self, txs: &[::Hash]) { - for tx_id in txs { - self.pending_txs.remove(tx_id); + fn prune(&mut self, keys: &[Self::Key]) { + for key in keys { + self.pending_items.remove(key); } } - fn pending_tx_count(&self) -> usize { - self.pending_txs.len() + fn pending_item_count(&self) -> usize { + self.pending_items.len() } - fn last_tx_timestamp(&self) -> u64 { - self.last_tx_timestamp + fn last_item_timestamp(&self) -> u64 { + self.last_item_timestamp } } diff --git a/nomos-services/mempool/src/backend/mod.rs b/nomos-services/mempool/src/backend/mod.rs index 39fa4a0d..f26088fc 100644 --- a/nomos-services/mempool/src/backend/mod.rs +++ b/nomos-services/mempool/src/backend/mod.rs @@ -2,47 +2,44 @@ pub mod mockpool; use nomos_core::block::BlockId; -use nomos_core::tx::Transaction; #[derive(thiserror::Error, Debug)] pub enum MempoolError { - #[error("Tx already in mempool")] - ExistingTx, + #[error("Item already in mempool")] + ExistingItem, #[error(transparent)] DynamicPoolError(#[from] overwatch_rs::DynError), } pub trait MemPool { type Settings: Clone; - type Tx: Transaction; + type Item; + type Key; /// Construct a new empty pool fn new(settings: Self::Settings) -> Self; - /// Add a new transaction to the mempool, for example because we received it from the network - fn add_tx(&mut self, tx: Self::Tx) -> Result<(), MempoolError>; + /// Add a new item to the mempool, for example because we received it from the network + fn add_item(&mut self, key: Self::Key, item: Self::Item) -> Result<(), MempoolError>; - /// Return a view over the transactions contained in the mempool. - /// Implementations should provide *at least* all the transactions which have not been marked as + /// Return a view over items contained in the mempool. + /// Implementations should provide *at least* all the items which have not been marked as /// in a block. /// The hint on the ancestor *can* be used by the implementation to display additional - /// transactions that were not included up to that point if available. - fn view(&self, ancestor_hint: BlockId) -> Box + Send>; + /// items that were not included up to that point if available. + fn view(&self, ancestor_hint: BlockId) -> Box + Send>; - /// Record that a set of transactions were included in a block - fn mark_in_block(&mut self, txs: &[::Hash], block: BlockId); + /// Record that a set of items were included in a block + fn mark_in_block(&mut self, items: &[Self::Key], block: BlockId); /// Returns all of the transactions for the block #[cfg(test)] - fn block_transactions( - &self, - block: BlockId, - ) -> Option + Send>>; + fn block_items(&self, block: BlockId) -> Option + Send>>; /// Signal that a set of transactions can't be possibly requested anymore and can be /// discarded. - fn prune(&mut self, txs: &[::Hash]); + fn prune(&mut self, items: &[Self::Key]); - fn pending_tx_count(&self) -> usize; - fn last_tx_timestamp(&self) -> u64; + fn pending_item_count(&self) -> usize; + fn last_item_timestamp(&self) -> u64; } diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index c1dc65c8..728b4681 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -11,7 +11,6 @@ use tokio::sync::oneshot::Sender; use crate::network::NetworkAdapter; use backend::MemPool; use nomos_core::block::BlockId; -use nomos_core::tx::Transaction; use nomos_network::NetworkService; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -22,11 +21,11 @@ use overwatch_rs::services::{ pub struct MempoolService where - N: NetworkAdapter, + N: NetworkAdapter, P: MemPool, P::Settings: Clone, - P::Tx: Debug + 'static, - ::Hash: Debug, + P::Item: Debug + 'static, + P::Key: Debug + 'static, { service_state: ServiceStateHandle, network_relay: Relay>, @@ -34,29 +33,30 @@ where } pub struct MempoolMetrics { - pub pending_txs: usize, - pub last_tx_timestamp: u64, + pub pending_items: usize, + pub last_item_timestamp: u64, } -pub enum MempoolMsg { - AddTx { - tx: Tx, +pub enum MempoolMsg { + Add { + item: Item, + key: Key, reply_channel: Sender>, }, View { ancestor_hint: BlockId, - reply_channel: Sender + Send>>, + reply_channel: Sender + Send>>, }, Prune { - ids: Vec, + ids: Vec, }, #[cfg(test)] - BlockTransaction { + BlockItems { block: BlockId, - reply_channel: Sender + Send>>>, + reply_channel: Sender + Send>>>, }, MarkInBlock { - ids: Vec, + ids: Vec, block: BlockId, }, Metrics { @@ -64,16 +64,17 @@ pub enum MempoolMsg { }, } -impl Debug for MempoolMsg +impl Debug for MempoolMsg where - Tx::Hash: Debug, + Item: Debug, + Key: Debug, { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { match self { Self::View { ancestor_hint, .. } => { write!(f, "MempoolMsg::View {{ ancestor_hint: {ancestor_hint:?}}}") } - Self::AddTx { tx, .. } => write!(f, "MempoolMsg::AddTx{{tx: {tx:?}}}"), + Self::Add { item, .. } => write!(f, "MempoolMsg::Add{{item: {item:?}}}"), Self::Prune { ids } => write!(f, "MempoolMsg::Prune{{ids: {ids:?}}}"), Self::MarkInBlock { ids, block } => { write!( @@ -82,29 +83,29 @@ where ) } #[cfg(test)] - Self::BlockTransaction { block, .. } => { - write!(f, "MempoolMsg::BlockTransaction{{block: {block:?}}}") + Self::BlockItems { block, .. } => { + write!(f, "MempoolMsg::BlockItem{{block: {block:?}}}") } Self::Metrics { .. } => write!(f, "MempoolMsg::Metrics"), } } } -impl RelayMessage for MempoolMsg {} +impl RelayMessage for MempoolMsg {} impl ServiceData for MempoolService where - N: NetworkAdapter, + N: NetworkAdapter, P: MemPool, P::Settings: Clone, - P::Tx: Debug + 'static, - ::Hash: Debug, + P::Item: Debug + 'static, + P::Key: Debug + 'static, { const SERVICE_ID: ServiceId = "Mempool"; - type Settings = P::Settings; + type Settings = Settings; type State = NoState; type StateOperator = NoOperator; - type Message = MempoolMsg<

::Tx>; + type Message = MempoolMsg<

::Item,

::Key>; } #[async_trait::async_trait] @@ -112,17 +113,18 @@ impl ServiceCore for MempoolService where P: MemPool + Send + 'static, P::Settings: Clone + Send + Sync + 'static, - P::Tx: Transaction + Clone + Debug + Send + Sync + 'static, - ::Hash: Debug + Send + Sync + 'static, - N: NetworkAdapter + Send + Sync + 'static, + N::Settings: Clone + Send + Sync + 'static, + P::Item: Clone + Debug + Send + Sync + 'static, + P::Key: Debug + Send + Sync + 'static, + N: NetworkAdapter + Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); - let pool_settings = service_state.settings_reader.get_updated_settings(); + let settings = service_state.settings_reader.get_updated_settings(); Ok(Self { service_state, network_relay, - pool: P::new(pool_settings), + pool: P::new(settings.backend), }) } @@ -138,15 +140,20 @@ where .await .expect("Relay connection with NetworkService should succeed"); - let adapter = N::new(network_relay).await; - let mut network_txs = adapter.transactions_stream().await; + let adapter = N::new( + service_state.settings_reader.get_updated_settings().network, + network_relay, + ); + let adapter = adapter.await; + + let mut network_items = adapter.transactions_stream().await; loop { tokio::select! { Some(msg) = service_state.inbound_relay.recv() => { match msg { - MempoolMsg::AddTx { tx, reply_channel } => { - match pool.add_tx(tx.clone()) { + MempoolMsg::Add { item, key, reply_channel } => { + match pool.add_item(key, item) { Ok(_id) => { if let Err(e) = reply_channel.send(Ok(())) { tracing::debug!("Failed to send reply to AddTx: {:?}", e); @@ -166,16 +173,16 @@ where pool.mark_in_block(&ids, block); } #[cfg(test)] - MempoolMsg::BlockTransaction { block, reply_channel } => { - reply_channel.send(pool.block_transactions(block)).unwrap_or_else(|_| { - tracing::debug!("could not send back block transactions") + MempoolMsg::BlockItems { block, reply_channel } => { + reply_channel.send(pool.block_items(block)).unwrap_or_else(|_| { + tracing::debug!("could not send back block items") }); } MempoolMsg::Prune { ids } => { pool.prune(&ids); }, MempoolMsg::Metrics { reply_channel } => { let metrics = MempoolMetrics { - pending_txs: pool.pending_tx_count(), - last_tx_timestamp: pool.last_tx_timestamp(), + pending_items: pool.pending_item_count(), + last_item_timestamp: pool.last_item_timestamp(), }; reply_channel.send(metrics).unwrap_or_else(|_| { tracing::debug!("could not send back mempool metrics") @@ -183,12 +190,18 @@ where } } } - Some(tx) = network_txs.next() => { - pool.add_tx(tx).unwrap_or_else(|e| { - tracing::debug!("could not add tx to the pool due to: {}", e) + Some((key, item )) = network_items.next() => { + pool.add_item(key, item).unwrap_or_else(|e| { + tracing::debug!("could not add item to the pool due to: {}", e) }); } } } } } + +#[derive(Clone, Debug)] +pub struct Settings { + pub backend: B, + pub network: N, +} diff --git a/nomos-services/mempool/src/network/adapters/libp2p.rs b/nomos-services/mempool/src/network/adapters/libp2p.rs index 5d2a3ef8..756879e8 100644 --- a/nomos-services/mempool/src/network/adapters/libp2p.rs +++ b/nomos-services/mempool/src/network/adapters/libp2p.rs @@ -1,14 +1,9 @@ -// std -use std::marker::PhantomData; // crates use futures::Stream; use serde::{de::DeserializeOwned, Serialize}; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; -use tracing::log::error; - // internal -use crate::network::messages::TransactionMsg; use crate::network::NetworkAdapter; use nomos_core::wire; use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash}; @@ -18,20 +13,24 @@ use overwatch_rs::services::ServiceData; pub const CARNOT_TX_TOPIC: &str = "CarnotTx"; -pub struct Libp2pAdapter { +pub struct Libp2pAdapter { network_relay: OutboundRelay< as ServiceData>::Message>, - _tx: PhantomData, + settings: Settings, } #[async_trait::async_trait] -impl NetworkAdapter for Libp2pAdapter +impl NetworkAdapter for Libp2pAdapter where - Tx: DeserializeOwned + Serialize + Send + Sync + 'static, + Item: DeserializeOwned + Serialize + Send + Sync + 'static + Clone, + Key: Clone + Send + Sync + 'static, { type Backend = Libp2p; - type Tx = Tx; + type Settings = Settings; + type Item = Item; + type Key = Key; async fn new( + settings: Self::Settings, network_relay: OutboundRelay< as ServiceData>::Message>, ) -> Self { network_relay @@ -42,11 +41,14 @@ where .expect("Network backend should be ready"); Self { network_relay, - _tx: PhantomData, + settings, } } - async fn transactions_stream(&self) -> Box + Unpin + Send> { - let topic_hash = TopicHash::from_raw(CARNOT_TX_TOPIC); + async fn transactions_stream( + &self, + ) -> Box + Unpin + Send> { + let topic_hash = TopicHash::from_raw(self.settings.topic.clone()); + let id = self.settings.id; let (sender, receiver) = tokio::sync::oneshot::channel(); self.network_relay .send(NetworkMsg::Subscribe { @@ -59,10 +61,10 @@ where Box::new(Box::pin(BroadcastStream::new(receiver).filter_map( move |message| match message { Ok(Event::Message(Message { data, topic, .. })) if topic == topic_hash => { - match wire::deserialize::>(&data) { - Ok(msg) => Some(msg.tx), + match wire::deserialize::(&data) { + Ok(item) => Some((id(&item), item)), Err(e) => { - error!("Unrecognized Tx message: {e}"); + tracing::debug!("Unrecognized message: {e}"); None } } @@ -72,3 +74,9 @@ where ))) } } + +#[derive(Clone, Debug)] +pub struct Settings { + pub topic: String, + pub id: fn(&V) -> K, +} diff --git a/nomos-services/mempool/src/network/adapters/mock.rs b/nomos-services/mempool/src/network/adapters/mock.rs index 11252e02..5bbc42ae 100644 --- a/nomos-services/mempool/src/network/adapters/mock.rs +++ b/nomos-services/mempool/src/network/adapters/mock.rs @@ -2,7 +2,7 @@ // crates use futures::{Stream, StreamExt}; -use nomos_core::tx::mock::MockTransaction; +use nomos_core::tx::mock::{MockTransaction, MockTxId}; use nomos_network::backends::mock::{ EventKind, Mock, MockBackendMessage, MockContentTopic, MockMessage, NetworkEvent, }; @@ -25,9 +25,12 @@ pub struct MockAdapter { #[async_trait::async_trait] impl NetworkAdapter for MockAdapter { type Backend = Mock; - type Tx = MockTransaction; + type Settings = (); + type Item = MockTransaction; + type Key = MockTxId; async fn new( + _settings: Self::Settings, network_relay: OutboundRelay< as ServiceData>::Message>, ) -> Self { // send message to boot the network producer @@ -57,7 +60,9 @@ impl NetworkAdapter for MockAdapter { Self { network_relay } } - async fn transactions_stream(&self) -> Box + Unpin + Send> { + async fn transactions_stream( + &self, + ) -> Box + Unpin + Send> { let (sender, receiver) = tokio::sync::oneshot::channel(); if let Err((_, e)) = self .network_relay @@ -77,7 +82,8 @@ impl NetworkAdapter for MockAdapter { Ok(NetworkEvent::RawMessage(message)) => { tracing::info!("Received message: {:?}", message.payload()); if message.content_topic().eq(&MOCK_TX_CONTENT_TOPIC) { - Some(MockTransaction::new(message)) + let tx = MockTransaction::new(message); + Some((tx.id(), tx)) } else { None } diff --git a/nomos-services/mempool/src/network/adapters/waku.rs b/nomos-services/mempool/src/network/adapters/waku.rs index e072f5be..49f87a3a 100644 --- a/nomos-services/mempool/src/network/adapters/waku.rs +++ b/nomos-services/mempool/src/network/adapters/waku.rs @@ -5,7 +5,6 @@ use futures::{Stream, StreamExt}; use serde::de::DeserializeOwned; use tokio_stream::wrappers::BroadcastStream; // internal -use crate::network::messages::TransactionMsg; use crate::network::NetworkAdapter; use nomos_core::wire; use nomos_network::backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage}; @@ -21,20 +20,24 @@ pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = pub const WAKU_CARNOT_TX_CONTENT_TOPIC: WakuContentTopic = WakuContentTopic::new("CarnotSim", 1, "CarnotTx", Encoding::Proto); -pub struct WakuAdapter { +pub struct WakuAdapter { network_relay: OutboundRelay< as ServiceData>::Message>, - _tx: PhantomData, + _item: PhantomData, } #[async_trait::async_trait] -impl NetworkAdapter for WakuAdapter +impl NetworkAdapter for WakuAdapter where - Tx: DeserializeOwned + Serialize + Send + Sync + 'static, + Item: DeserializeOwned + Serialize + Send + Sync + 'static, { type Backend = Waku; - type Tx = Tx; + type Settings = (); + type Item = Item; + // TODO: implement real key + type Key = (); async fn new( + _settings: Self::Settings, network_relay: OutboundRelay< as ServiceData>::Message>, ) -> Self { // Subscribe to the carnot pubsub topic @@ -50,10 +53,13 @@ where }; Self { network_relay, - _tx: Default::default(), + _item: Default::default(), } } - async fn transactions_stream(&self) -> Box + Unpin + Send> { + + async fn transactions_stream( + &self, + ) -> Box + Unpin + Send> { let (sender, receiver) = tokio::sync::oneshot::channel(); if let Err((_, _e)) = self .network_relay @@ -71,9 +77,10 @@ where match event { Ok(NetworkEvent::RawMessage(message)) => { if message.content_topic() == &WAKU_CARNOT_TX_CONTENT_TOPIC { - let tx: TransactionMsg = + let item: Self::Item = wire::deserializer(message.payload()).deserialize().unwrap(); - Some(tx.tx) + // TODO: implement real key + Some(((), item)) } else { None } diff --git a/nomos-services/mempool/src/network/mod.rs b/nomos-services/mempool/src/network/mod.rs index 8c56d973..da13d136 100644 --- a/nomos-services/mempool/src/network/mod.rs +++ b/nomos-services/mempool/src/network/mod.rs @@ -14,9 +14,15 @@ use overwatch_rs::services::ServiceData; #[async_trait::async_trait] pub trait NetworkAdapter { type Backend: NetworkBackend + 'static; - type Tx: Send + Sync + 'static; + type Settings: Clone; + + type Item: Send + Sync + 'static; + type Key: Send + Sync + 'static; async fn new( + settings: Self::Settings, network_relay: OutboundRelay< as ServiceData>::Message>, ) -> Self; - async fn transactions_stream(&self) -> Box + Unpin + Send>; + async fn transactions_stream( + &self, + ) -> Box + Unpin + Send>; } diff --git a/nomos-services/mempool/tests/mock.rs b/nomos-services/mempool/tests/mock.rs index c317fb71..329d0a3e 100644 --- a/nomos-services/mempool/tests/mock.rs +++ b/nomos-services/mempool/tests/mock.rs @@ -1,4 +1,7 @@ -use nomos_core::{block::BlockId, tx::mock::MockTransaction}; +use nomos_core::{ + block::BlockId, + tx::mock::{MockTransaction, MockTxId}, +}; use nomos_log::{Logger, LoggerSettings}; use nomos_network::{ backends::mock::{Mock, MockBackendMessage, MockConfig, MockMessage}, @@ -10,14 +13,16 @@ use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle}; use nomos_mempool::{ backend::mockpool::MockPool, network::adapters::mock::{MockAdapter, MOCK_TX_CONTENT_TOPIC}, - MempoolMsg, MempoolService, + MempoolMsg, MempoolService, Settings, }; #[derive(Services)] struct MockPoolNode { logging: ServiceHandle, network: ServiceHandle>, - mockpool: ServiceHandle>>>, + mockpool: ServiceHandle< + MempoolService, MockTxId>>, + >, } #[test] @@ -56,7 +61,10 @@ fn test_mockmempool() { weights: None, }, }, - mockpool: (), + mockpool: Settings { + backend: (), + network: (), + }, logging: LoggerSettings::default(), }, None, @@ -67,7 +75,7 @@ fn test_mockmempool() { let network = app.handle().relay::>(); let mempool = app .handle() - .relay::>>>(); + .relay::, MockTxId>>>(); app.spawn(async move { let network_outbound = network.connect().await.unwrap();