Add mempool stub (#29)

* add mempool stub

* address review comments

* move base data types to nomos-core

* allow clippy warning
This commit is contained in:
Giacomo Pasini 2022-12-14 15:30:45 +01:00 committed by GitHub
parent fb3fd6f3b1
commit 539c986f69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 203 additions and 2 deletions

View File

@ -5,5 +5,6 @@ members = [
"nomos-services/log",
"nomos-services/network",
"nomos-services/storage",
"nomos-services/consensus"
"nomos-services/consensus",
"nomos-services/mempool"
]

View File

@ -1,7 +1,15 @@
/// A block
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Block;
/// A block header
#[derive(Clone, Debug)]
pub struct BlockHeader;
/// Identifier of a block
#[derive(Clone, Debug)]
pub struct BlockId;
/// A block chunk, N pieces are necessary to reconstruct the full block
#[derive(Clone, Copy, Debug)]
pub struct BlockChunk {

View File

@ -0,0 +1,15 @@
[package]
name = "mempool"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
nomos-network = { path = "../network", features = ["waku"] }
nomos-core = { path = "../../nomos-core" }
tracing = "0.1"
tokio = { version = "1", features = ["sync"] }
futures = "0.3"

View File

@ -0,0 +1,26 @@
use nomos_core::block::{BlockHeader, BlockId};
pub trait Pool {
type Tx;
type Id;
/// Construct a new empty pool
fn new() -> Self;
/// Add a new transaction to the mempool, for example because we received it from the network
fn add_tx(&mut self, tx: Self::Tx, id: Self::Id) -> Result<(), overwatch_rs::DynError>;
/// Return a view over the transactions contained in the mempool.
/// Implementations should provide *at least* all the transactions which have not been marked as
/// in a block.
/// The hint on the ancestor *can* be used by the implementation to display additional
/// transactions that were not included up to that point if available.
fn view(&self, ancestor_hint: BlockId) -> Box<dyn Iterator<Item = Self::Tx> + Send>;
/// Record that a set of transactions were included in a block
fn mark_in_block(&mut self, txs: Vec<Self::Id>, block: BlockHeader);
/// Signal that a set of transactions can't be possibly requested anymore and can be
/// discarded.
fn prune(&mut self, txs: Vec<Self::Id>);
}

View File

@ -0,0 +1,151 @@
/// std
use std::fmt::{Debug, Error, Formatter};
/// crates
use tokio::sync::broadcast::Receiver;
use tokio::sync::oneshot::Sender;
/// internal
pub mod backend;
use backend::Pool;
use nomos_core::block::{BlockHeader, BlockId};
use nomos_network::{
backends::{waku::NetworkEvent, NetworkBackend},
NetworkService,
};
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::{OutboundRelay, Relay, RelayMessage},
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
};
pub struct Mempool<
N: NetworkBackend + Send + Sync + 'static,
Tx: Debug + Send + Sync + 'static,
Id: Debug + Send + Sync + 'static,
P: Pool<Tx = Tx, Id = Id> + Send + Sync + 'static,
> {
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<N>>,
pool: P,
}
pub enum MempoolMsg<Tx, Id> {
AddTx {
id: Id,
tx: Tx,
},
View {
ancestor_hint: BlockId,
rx: Sender<Box<dyn Iterator<Item = Tx> + Send>>,
},
Prune {
ids: Vec<Id>,
},
MarkInBlock {
ids: Vec<Id>,
block: BlockHeader,
},
}
impl<Tx: Debug, Id: Debug> Debug for MempoolMsg<Tx, Id> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
match self {
Self::View { ancestor_hint, .. } => {
write!(
f,
"MempoolMsg::View {{ ancestor_hint: {:?}}}",
ancestor_hint
)
}
Self::AddTx { id, tx } => write!(f, "MempoolMsg::AddTx{{id: {:?}, tx: {:?}}}", id, tx),
Self::Prune { ids } => write!(f, "MempoolMsg::Prune{{ids: {:?}}}", ids),
Self::MarkInBlock { ids, block } => {
write!(
f,
"MempoolMsg::MarkInBlock{{ids: {:?}, block: {:?}}}",
ids, block
)
}
}
}
}
impl<Tx: 'static, Id: 'static> RelayMessage for MempoolMsg<Tx, Id> {}
impl<
N: NetworkBackend + Send + Sync + 'static,
Tx: Debug + Send + Sync + 'static,
Id: Debug + Send + Sync + 'static,
P: Pool<Tx = Tx, Id = Id> + Send + Sync + 'static,
> ServiceData for Mempool<N, Tx, Id, P>
{
const SERVICE_ID: ServiceId = "Mempool";
type Settings = ();
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MempoolMsg<Tx, Id>;
}
#[async_trait::async_trait]
impl<N, Tx, Id, P> ServiceCore for Mempool<N, Tx, Id, P>
where
Tx: Debug + Send + Sync + 'static,
Id: Debug + Send + Sync + 'static,
P: Pool<Tx = Tx, Id = Id> + Send + Sync + 'static,
N: NetworkBackend + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
Ok(Self {
service_state,
network_relay,
pool: P::new(),
})
}
#[allow(unreachable_code, unused, clippy::diverging_sub_expression)]
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let Self {
service_state,
network_relay,
pool,
} = self;
let network_relay: OutboundRelay<_> = network_relay
.connect()
.await
.expect("Relay connection with NetworkService should succeed");
// Separate function so that we can specialize it for different network backends
let network_txs: Receiver<NetworkEvent> = todo!();
loop {
tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => {
match msg {
MempoolMsg::AddTx { id, tx } => {
pool.add_tx(tx, id).unwrap_or_else(|e| {
tracing::debug!("could not add tx to the pool due to: {}", e)
});
}
MempoolMsg::View { ancestor_hint, rx } => {
rx.send(pool.view(ancestor_hint)).unwrap_or_else(|_| {
tracing::debug!("could not send back pool view")
})
}
MempoolMsg::MarkInBlock { ids, block } => {
pool.mark_in_block(ids, block);
}
MempoolMsg::Prune { ids } => pool.prune(ids),
}
}
Ok(msg) = network_txs.recv() => {
// filter incoming transactions and add them to the pool
todo!()
}
}
}
}
}