From 539c986f6927796568d6e2f47d49df676c680029 Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Wed, 14 Dec 2022 15:30:45 +0100 Subject: [PATCH] Add mempool stub (#29) * add mempool stub * address review comments * move base data types to nomos-core * allow clippy warning --- Cargo.toml | 3 +- nomos-core/src/block.rs | 10 +- nomos-services/mempool/Cargo.toml | 15 +++ nomos-services/mempool/src/backend/mod.rs | 26 ++++ nomos-services/mempool/src/lib.rs | 151 ++++++++++++++++++++++ 5 files changed, 203 insertions(+), 2 deletions(-) create mode 100644 nomos-services/mempool/Cargo.toml create mode 100644 nomos-services/mempool/src/backend/mod.rs create mode 100644 nomos-services/mempool/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 1b81d9d9..ea5bf51c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,5 +5,6 @@ members = [ "nomos-services/log", "nomos-services/network", "nomos-services/storage", - "nomos-services/consensus" + "nomos-services/consensus", + "nomos-services/mempool" ] \ No newline at end of file diff --git a/nomos-core/src/block.rs b/nomos-core/src/block.rs index 52b3af66..f2fb0bf0 100644 --- a/nomos-core/src/block.rs +++ b/nomos-core/src/block.rs @@ -1,7 +1,15 @@ /// A block -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Block; +/// A block header +#[derive(Clone, Debug)] +pub struct BlockHeader; + +/// Identifier of a block +#[derive(Clone, Debug)] +pub struct BlockId; + /// A block chunk, N pieces are necessary to reconstruct the full block #[derive(Clone, Copy, Debug)] pub struct BlockChunk { diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml new file mode 100644 index 00000000..c9232138 --- /dev/null +++ b/nomos-services/mempool/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "mempool" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +nomos-network = { path = "../network", features = ["waku"] } +nomos-core = { path = "../../nomos-core" } +tracing = "0.1" +tokio = { version = "1", features = ["sync"] } +futures = "0.3" \ No newline at end of file diff --git a/nomos-services/mempool/src/backend/mod.rs b/nomos-services/mempool/src/backend/mod.rs new file mode 100644 index 00000000..4d84fe62 --- /dev/null +++ b/nomos-services/mempool/src/backend/mod.rs @@ -0,0 +1,26 @@ +use nomos_core::block::{BlockHeader, BlockId}; + +pub trait Pool { + type Tx; + type Id; + + /// Construct a new empty pool + fn new() -> Self; + + /// Add a new transaction to the mempool, for example because we received it from the network + fn add_tx(&mut self, tx: Self::Tx, id: Self::Id) -> Result<(), overwatch_rs::DynError>; + + /// Return a view over the transactions contained in the mempool. + /// Implementations should provide *at least* all the transactions which have not been marked as + /// in a block. + /// The hint on the ancestor *can* be used by the implementation to display additional + /// transactions that were not included up to that point if available. + fn view(&self, ancestor_hint: BlockId) -> Box + Send>; + + /// Record that a set of transactions were included in a block + fn mark_in_block(&mut self, txs: Vec, block: BlockHeader); + + /// Signal that a set of transactions can't be possibly requested anymore and can be + /// discarded. + fn prune(&mut self, txs: Vec); +} diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs new file mode 100644 index 00000000..50e5a6f4 --- /dev/null +++ b/nomos-services/mempool/src/lib.rs @@ -0,0 +1,151 @@ +/// std +use std::fmt::{Debug, Error, Formatter}; + +/// crates +use tokio::sync::broadcast::Receiver; +use tokio::sync::oneshot::Sender; + +/// internal +pub mod backend; +use backend::Pool; +use nomos_core::block::{BlockHeader, BlockId}; +use nomos_network::{ + backends::{waku::NetworkEvent, NetworkBackend}, + NetworkService, +}; +use overwatch_rs::services::{ + handle::ServiceStateHandle, + relay::{OutboundRelay, Relay, RelayMessage}, + state::{NoOperator, NoState}, + ServiceCore, ServiceData, ServiceId, +}; + +pub struct Mempool< + N: NetworkBackend + Send + Sync + 'static, + Tx: Debug + Send + Sync + 'static, + Id: Debug + Send + Sync + 'static, + P: Pool + Send + Sync + 'static, +> { + service_state: ServiceStateHandle, + network_relay: Relay>, + pool: P, +} + +pub enum MempoolMsg { + AddTx { + id: Id, + tx: Tx, + }, + View { + ancestor_hint: BlockId, + rx: Sender + Send>>, + }, + Prune { + ids: Vec, + }, + MarkInBlock { + ids: Vec, + block: BlockHeader, + }, +} + +impl Debug for MempoolMsg { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { + match self { + Self::View { ancestor_hint, .. } => { + write!( + f, + "MempoolMsg::View {{ ancestor_hint: {:?}}}", + ancestor_hint + ) + } + Self::AddTx { id, tx } => write!(f, "MempoolMsg::AddTx{{id: {:?}, tx: {:?}}}", id, tx), + Self::Prune { ids } => write!(f, "MempoolMsg::Prune{{ids: {:?}}}", ids), + Self::MarkInBlock { ids, block } => { + write!( + f, + "MempoolMsg::MarkInBlock{{ids: {:?}, block: {:?}}}", + ids, block + ) + } + } + } +} + +impl RelayMessage for MempoolMsg {} + +impl< + N: NetworkBackend + Send + Sync + 'static, + Tx: Debug + Send + Sync + 'static, + Id: Debug + Send + Sync + 'static, + P: Pool + Send + Sync + 'static, + > ServiceData for Mempool +{ + const SERVICE_ID: ServiceId = "Mempool"; + type Settings = (); + type State = NoState; + type StateOperator = NoOperator; + type Message = MempoolMsg; +} + +#[async_trait::async_trait] +impl ServiceCore for Mempool +where + Tx: Debug + Send + Sync + 'static, + Id: Debug + Send + Sync + 'static, + P: Pool + Send + Sync + 'static, + N: NetworkBackend + Send + Sync + 'static, +{ + fn init(service_state: ServiceStateHandle) -> Result { + let network_relay = service_state.overwatch_handle.relay(); + Ok(Self { + service_state, + network_relay, + pool: P::new(), + }) + } + + #[allow(unreachable_code, unused, clippy::diverging_sub_expression)] + async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + let Self { + service_state, + network_relay, + pool, + } = self; + + let network_relay: OutboundRelay<_> = network_relay + .connect() + .await + .expect("Relay connection with NetworkService should succeed"); + + // Separate function so that we can specialize it for different network backends + let network_txs: Receiver = todo!(); + + loop { + tokio::select! { + Some(msg) = service_state.inbound_relay.recv() => { + match msg { + MempoolMsg::AddTx { id, tx } => { + pool.add_tx(tx, id).unwrap_or_else(|e| { + tracing::debug!("could not add tx to the pool due to: {}", e) + }); + } + MempoolMsg::View { ancestor_hint, rx } => { + rx.send(pool.view(ancestor_hint)).unwrap_or_else(|_| { + tracing::debug!("could not send back pool view") + }) + } + MempoolMsg::MarkInBlock { ids, block } => { + pool.mark_in_block(ids, block); + } + MempoolMsg::Prune { ids } => pool.prune(ids), + } + } + Ok(msg) = network_txs.recv() => { + // filter incoming transactions and add them to the pool + todo!() + } + } + } + } +}