Mockpool (#43)
* Use ids slices on mempool trait * Create a mock mempool Added placeholders for block/blockheader/blockid relations * Use linked hashmap * Fix bounds * Remove unnecessary bound
This commit is contained in:
parent
f5a1dd5513
commit
677d4a245c
@ -12,7 +12,7 @@ pub struct Block;
|
||||
pub struct BlockHeader;
|
||||
|
||||
/// Identifier of a block
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
|
||||
pub struct BlockId;
|
||||
|
||||
impl Block {
|
||||
@ -25,3 +25,9 @@ impl Block {
|
||||
Self
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockHeader {
|
||||
pub fn id(&self) -> BlockId {
|
||||
BlockId
|
||||
}
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ edition = "2021"
|
||||
async-trait = "0.1"
|
||||
bincode = { version = "2.0.0-rc.2", features = ["serde"] }
|
||||
futures = "0.3"
|
||||
linked-hash-map = { verison = "0.5.6", optional = true }
|
||||
nomos-network = { path = "../network", features = ["waku"] }
|
||||
nomos-core = { path = "../../nomos-core" }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||
@ -16,9 +17,10 @@ serde = { version = "1.0", features = ["derive"] }
|
||||
tracing = "0.1"
|
||||
tokio = { version = "1", features = ["sync"] }
|
||||
tokio-stream = "0.1"
|
||||
waku-bindings = { version = "0.1.0-beta1", optional = true}
|
||||
waku-bindings = { version = "0.1.0-beta2", optional = true}
|
||||
|
||||
|
||||
[features]
|
||||
default = []
|
||||
waku = ["nomos-network/waku", "waku-bindings"]
|
||||
mock = ["linked-hash-map"]
|
||||
|
86
nomos-services/mempool/src/backend/mockpool.rs
Normal file
86
nomos-services/mempool/src/backend/mockpool.rs
Normal file
@ -0,0 +1,86 @@
|
||||
// std
|
||||
use linked_hash_map::LinkedHashMap;
|
||||
use std::collections::BTreeMap;
|
||||
use std::hash::Hash;
|
||||
// crates
|
||||
// internal
|
||||
use crate::backend::MemPool;
|
||||
use nomos_core::block::{BlockHeader, BlockId};
|
||||
|
||||
/// A mock mempool implementation that stores all transactions in memory in the order received.
|
||||
pub struct MockPool<Id, Tx> {
|
||||
pending_txs: LinkedHashMap<Id, Tx>,
|
||||
in_block_txs: BTreeMap<BlockId, Vec<Tx>>,
|
||||
in_block_txs_by_id: BTreeMap<Id, BlockId>,
|
||||
}
|
||||
|
||||
impl<Id, Tx> Default for MockPool<Id, Tx>
|
||||
where
|
||||
Id: Eq + Hash,
|
||||
{
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
pending_txs: LinkedHashMap::new(),
|
||||
in_block_txs: BTreeMap::new(),
|
||||
in_block_txs_by_id: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Id, Tx> MockPool<Id, Tx>
|
||||
where
|
||||
Id: Eq + Hash,
|
||||
{
|
||||
pub fn new() -> Self {
|
||||
Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Id, Tx> MemPool for MockPool<Id, Tx>
|
||||
where
|
||||
Id: From<Tx> + PartialOrd + Ord + Eq + Hash + Clone,
|
||||
Tx: Clone + Send + Sync + 'static + Hash,
|
||||
{
|
||||
type Settings = ();
|
||||
type Tx = Tx;
|
||||
type Id = Id;
|
||||
|
||||
fn new(_settings: Self::Settings) -> Self {
|
||||
Self::new()
|
||||
}
|
||||
|
||||
fn add_tx(&mut self, tx: Self::Tx) -> Result<(), overwatch_rs::DynError> {
|
||||
let id = Id::from(tx.clone());
|
||||
if self.pending_txs.contains_key(&id) || self.in_block_txs_by_id.contains_key(&id) {
|
||||
return Ok(());
|
||||
}
|
||||
self.pending_txs.insert(id, tx);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn view(&self, _ancestor_hint: BlockId) -> Box<dyn Iterator<Item = Self::Tx> + Send> {
|
||||
// we need to have an owned version of the iterator to bypass adding a lifetime bound to the return iterator type
|
||||
#[allow(clippy::needless_collect)]
|
||||
let pending_txs: Vec<Tx> = self.pending_txs.values().cloned().collect();
|
||||
Box::new(pending_txs.into_iter())
|
||||
}
|
||||
|
||||
fn mark_in_block(&mut self, txs: &[Self::Id], block: BlockHeader) {
|
||||
let mut txs_in_block = Vec::with_capacity(txs.len());
|
||||
for tx_id in txs.iter() {
|
||||
if let Some(tx) = self.pending_txs.remove(tx_id) {
|
||||
txs_in_block.push(tx);
|
||||
}
|
||||
}
|
||||
let block_entry = self.in_block_txs.entry(block.id()).or_default();
|
||||
self.in_block_txs_by_id
|
||||
.extend(txs.iter().cloned().map(|tx| (tx, block.id())));
|
||||
block_entry.append(&mut txs_in_block);
|
||||
}
|
||||
|
||||
fn prune(&mut self, txs: &[Self::Id]) {
|
||||
for tx_id in txs {
|
||||
self.pending_txs.remove(tx_id);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,3 +1,6 @@
|
||||
#[cfg(feature = "mock")]
|
||||
pub mod mockpool;
|
||||
|
||||
use nomos_core::block::{BlockHeader, BlockId};
|
||||
|
||||
pub trait MemPool {
|
||||
@ -19,9 +22,9 @@ pub trait MemPool {
|
||||
fn view(&self, ancestor_hint: BlockId) -> Box<dyn Iterator<Item = Self::Tx> + Send>;
|
||||
|
||||
/// Record that a set of transactions were included in a block
|
||||
fn mark_in_block(&mut self, txs: Vec<Self::Id>, block: BlockHeader);
|
||||
fn mark_in_block(&mut self, txs: &[Self::Id], block: BlockHeader);
|
||||
|
||||
/// Signal that a set of transactions can't be possibly requested anymore and can be
|
||||
/// discarded.
|
||||
fn prune(&mut self, txs: Vec<Self::Id>);
|
||||
fn prune(&mut self, txs: &[Self::Id]);
|
||||
}
|
||||
|
@ -139,9 +139,9 @@ where
|
||||
});
|
||||
}
|
||||
MempoolMsg::MarkInBlock { ids, block } => {
|
||||
pool.mark_in_block(ids, block);
|
||||
pool.mark_in_block(&ids, block);
|
||||
}
|
||||
MempoolMsg::Prune { ids } => { pool.prune(ids); },
|
||||
MempoolMsg::Prune { ids } => { pool.prune(&ids); },
|
||||
}
|
||||
}
|
||||
Some(tx) = network_txs.next() => {
|
||||
|
Loading…
x
Reference in New Issue
Block a user