From 677d4a245cb30ce0bcac05eea097ea7f33950f51 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Wed, 18 Jan 2023 03:11:03 -0800 Subject: [PATCH] Mockpool (#43) * Use ids slices on mempool trait * Create a mock mempool Added placeholders for block/blockheader/blockid relations * Use linked hashmap * Fix bounds * Remove unnecessary bound --- nomos-core/src/block.rs | 8 +- nomos-services/mempool/Cargo.toml | 6 +- .../mempool/src/backend/mockpool.rs | 86 +++++++++++++++++++ nomos-services/mempool/src/backend/mod.rs | 7 +- nomos-services/mempool/src/lib.rs | 4 +- 5 files changed, 104 insertions(+), 7 deletions(-) create mode 100644 nomos-services/mempool/src/backend/mockpool.rs diff --git a/nomos-core/src/block.rs b/nomos-core/src/block.rs index dc1095d7..77d2f143 100644 --- a/nomos-core/src/block.rs +++ b/nomos-core/src/block.rs @@ -12,7 +12,7 @@ pub struct Block; pub struct BlockHeader; /// Identifier of a block -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)] pub struct BlockId; impl Block { @@ -25,3 +25,9 @@ impl Block { Self } } + +impl BlockHeader { + pub fn id(&self) -> BlockId { + BlockId + } +} diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index e2e490cc..1ed49459 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" async-trait = "0.1" bincode = { version = "2.0.0-rc.2", features = ["serde"] } futures = "0.3" +linked-hash-map = { verison = "0.5.6", optional = true } nomos-network = { path = "../network", features = ["waku"] } nomos-core = { path = "../../nomos-core" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } @@ -16,9 +17,10 @@ serde = { version = "1.0", features = ["derive"] } tracing = "0.1" tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" -waku-bindings = { version = "0.1.0-beta1", optional = true} +waku-bindings = { version = "0.1.0-beta2", optional = true} [features] default = [] -waku = ["nomos-network/waku", "waku-bindings"] \ No newline at end of file +waku = ["nomos-network/waku", "waku-bindings"] +mock = ["linked-hash-map"] diff --git a/nomos-services/mempool/src/backend/mockpool.rs b/nomos-services/mempool/src/backend/mockpool.rs new file mode 100644 index 00000000..11583207 --- /dev/null +++ b/nomos-services/mempool/src/backend/mockpool.rs @@ -0,0 +1,86 @@ +// std +use linked_hash_map::LinkedHashMap; +use std::collections::BTreeMap; +use std::hash::Hash; +// crates +// internal +use crate::backend::MemPool; +use nomos_core::block::{BlockHeader, BlockId}; + +/// A mock mempool implementation that stores all transactions in memory in the order received. +pub struct MockPool { + pending_txs: LinkedHashMap, + in_block_txs: BTreeMap>, + in_block_txs_by_id: BTreeMap, +} + +impl Default for MockPool +where + Id: Eq + Hash, +{ + fn default() -> Self { + Self { + pending_txs: LinkedHashMap::new(), + in_block_txs: BTreeMap::new(), + in_block_txs_by_id: BTreeMap::new(), + } + } +} + +impl MockPool +where + Id: Eq + Hash, +{ + pub fn new() -> Self { + Default::default() + } +} + +impl MemPool for MockPool +where + Id: From + PartialOrd + Ord + Eq + Hash + Clone, + Tx: Clone + Send + Sync + 'static + Hash, +{ + type Settings = (); + type Tx = Tx; + type Id = Id; + + fn new(_settings: Self::Settings) -> Self { + Self::new() + } + + fn add_tx(&mut self, tx: Self::Tx) -> Result<(), overwatch_rs::DynError> { + let id = Id::from(tx.clone()); + if self.pending_txs.contains_key(&id) || self.in_block_txs_by_id.contains_key(&id) { + return Ok(()); + } + self.pending_txs.insert(id, tx); + Ok(()) + } + + fn view(&self, _ancestor_hint: BlockId) -> Box + Send> { + // we need to have an owned version of the iterator to bypass adding a lifetime bound to the return iterator type + #[allow(clippy::needless_collect)] + let pending_txs: Vec = self.pending_txs.values().cloned().collect(); + Box::new(pending_txs.into_iter()) + } + + fn mark_in_block(&mut self, txs: &[Self::Id], block: BlockHeader) { + let mut txs_in_block = Vec::with_capacity(txs.len()); + for tx_id in txs.iter() { + if let Some(tx) = self.pending_txs.remove(tx_id) { + txs_in_block.push(tx); + } + } + let block_entry = self.in_block_txs.entry(block.id()).or_default(); + self.in_block_txs_by_id + .extend(txs.iter().cloned().map(|tx| (tx, block.id()))); + block_entry.append(&mut txs_in_block); + } + + fn prune(&mut self, txs: &[Self::Id]) { + for tx_id in txs { + self.pending_txs.remove(tx_id); + } + } +} diff --git a/nomos-services/mempool/src/backend/mod.rs b/nomos-services/mempool/src/backend/mod.rs index e957b43e..4a98be8d 100644 --- a/nomos-services/mempool/src/backend/mod.rs +++ b/nomos-services/mempool/src/backend/mod.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "mock")] +pub mod mockpool; + use nomos_core::block::{BlockHeader, BlockId}; pub trait MemPool { @@ -19,9 +22,9 @@ pub trait MemPool { fn view(&self, ancestor_hint: BlockId) -> Box + Send>; /// Record that a set of transactions were included in a block - fn mark_in_block(&mut self, txs: Vec, block: BlockHeader); + fn mark_in_block(&mut self, txs: &[Self::Id], block: BlockHeader); /// Signal that a set of transactions can't be possibly requested anymore and can be /// discarded. - fn prune(&mut self, txs: Vec); + fn prune(&mut self, txs: &[Self::Id]); } diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index bc63dfc1..60c097d9 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -139,9 +139,9 @@ where }); } MempoolMsg::MarkInBlock { ids, block } => { - pool.mark_in_block(ids, block); + pool.mark_in_block(&ids, block); } - MempoolMsg::Prune { ids } => { pool.prune(ids); }, + MempoolMsg::Prune { ids } => { pool.prune(&ids); }, } } Some(tx) = network_txs.next() => {