Make mempool generic (#428)

* Make mempool item generic

Make the mempool generic with respect to the item and remove
mentions of specific transaction formats/traits. This will allow
us to reuse the same code for both coordination layer transactions
and certificates, or in general, whatever items need to be included
in a block.

* Add mempool network adapter settings

Allow for greater customization of the mempool network adapter by
adding a settings field.

* update node after mempool changes

* fix waku mempool adapter

* fmt

* fix tests

* fmt
This commit is contained in:
Giacomo Pasini 2023-09-26 11:14:44 +02:00 committed by GitHub
parent 95618c0a72
commit 03973cd422
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 297 additions and 220 deletions

View File

@ -1,4 +1,5 @@
mod libp2p;
use libp2p::*;
// std
// crates
@ -9,24 +10,20 @@ use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tracing::error;
// internal
use nomos_core::tx::Transaction;
use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse};
use nomos_mempool::backend::mockpool::MockPool;
use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter;
use nomos_mempool::network::NetworkAdapter;
use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
use nomos_network::backends::libp2p::Libp2p;
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use nomos_node::{Carnot, Tx};
use overwatch_rs::services::relay::OutboundRelay;
use libp2p::*;
use nomos_mempool::network::NetworkAdapter;
use nomos_network::backends::NetworkBackend;
macro_rules! get_handler {
($handle:expr, $service:ty, $path:expr => $handler:tt) => {{
let (channel, mut http_request_channel) =
@ -54,7 +51,7 @@ pub fn mempool_metrics_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
get_handler!(handle, MempoolService<Libp2pAdapter<Tx>, MockPool<Tx>>, "metrics" => handle_mempool_metrics_req)
get_handler!(handle, MempoolService<Libp2pAdapter<Tx, <Tx as Transaction>::Hash>, MockPool<Tx, <Tx as Transaction>::Hash>>, "metrics" => handle_mempool_metrics_req)
}))
}
@ -66,19 +63,24 @@ pub fn network_info_bridge(
}))
}
pub fn mempool_add_tx_bridge<
N: NetworkBackend,
A: NetworkAdapter<Backend = N, Tx = Tx> + Send + Sync + 'static,
>(
pub fn mempool_add_tx_bridge<N, A>(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
) -> HttpBridgeRunner
where
N: NetworkBackend,
A: NetworkAdapter<Backend = N, Item = Tx, Key = <Tx as Transaction>::Hash>
+ Send
+ Sync
+ 'static,
A::Settings: Send + Sync,
{
Box::new(Box::pin(async move {
let (mempool_channel, mut http_request_channel) =
build_http_bridge::<MempoolService<A, MockPool<Tx>>, AxumBackend, _>(
handle.clone(),
HttpMethod::POST,
"addtx",
)
build_http_bridge::<
MempoolService<A, MockPool<Tx, <Tx as Transaction>::Hash>>,
AxumBackend,
_,
>(handle.clone(), HttpMethod::POST, "addtx")
.await
.unwrap();
@ -114,7 +116,7 @@ async fn handle_carnot_info_req(
}
async fn handle_mempool_metrics_req(
mempool_channel: &OutboundRelay<MempoolMsg<Tx>>,
mempool_channel: &OutboundRelay<MempoolMsg<Tx, <Tx as Transaction>::Hash>>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> {
let (sender, receiver) = oneshot::channel();
@ -129,8 +131,8 @@ async fn handle_mempool_metrics_req(
res_tx
// TODO: use serde to serialize metrics
.send(Ok(format!(
"{{\"pending_tx\": {}, \"last_tx\": {}}}",
metrics.pending_txs, metrics.last_tx_timestamp
"{{\"pending_items\": {}, \"last_item\": {}}}",
metrics.pending_items, metrics.last_item_timestamp
)
.into()))
.await?;
@ -140,7 +142,7 @@ async fn handle_mempool_metrics_req(
pub(super) async fn handle_mempool_add_tx_req(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
mempool_channel: &OutboundRelay<MempoolMsg<Tx>>,
mempool_channel: &OutboundRelay<MempoolMsg<Tx, <Tx as Transaction>::Hash>>,
res_tx: Sender<HttpResponse>,
payload: Option<Bytes>,
) -> Result<(), overwatch_rs::DynError> {
@ -151,8 +153,9 @@ pub(super) async fn handle_mempool_add_tx_req(
let tx = Tx(data);
let (sender, receiver) = oneshot::channel();
mempool_channel
.send(MempoolMsg::AddTx {
tx: tx.clone(),
.send(MempoolMsg::Add {
item: tx.clone(),
key: tx.hash(),
reply_channel: sender,
})
.await

View File

@ -10,7 +10,7 @@ use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsServic
use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter;
use nomos_consensus::CarnotConsensus;
use nomos_core::tx::Transaction;
use nomos_da::{
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter,
DataAvailabilityService,
@ -39,8 +39,8 @@ const MB16: usize = 1024 * 1024 * 16;
pub type Carnot = CarnotConsensus<
ConsensusLibp2pAdapter,
MockPool<Tx>,
MempoolLibp2pAdapter<Tx>,
MockPool<Tx, <Tx as Transaction>::Hash>,
MempoolLibp2pAdapter<Tx, <Tx as Transaction>::Hash>,
FlatOverlay<RoundRobin, RandomBeaconState>,
Certificate,
FillSizeWithTx<MB16, Tx>,
@ -53,11 +53,16 @@ type DataAvailability = DataAvailabilityService<
DaLibp2pAdapter<Blob, Attestation>,
>;
type Mempool = MempoolService<
MempoolLibp2pAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<Tx, <Tx as Transaction>::Hash>,
>;
#[derive(Services)]
pub struct Nomos {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Libp2p>>,
mockpool: ServiceHandle<MempoolService<MempoolLibp2pAdapter<Tx>, MockPool<Tx>>>,
mockpool: ServiceHandle<Mempool>,
consensus: ServiceHandle<Carnot>,
http: ServiceHandle<HttpService<AxumBackend>>,
bridges: ServiceHandle<HttpBridgeService>,

View File

@ -8,11 +8,13 @@ mod bridges;
use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use nomos_http::bridge::{HttpBridge, HttpBridgeSettings};
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter;
use nomos_mempool::network::adapters::libp2p::{Libp2pAdapter, Settings as AdapterSettings};
use nomos_network::backends::libp2p::Libp2p;
use overwatch_rs::overwatch::*;
use std::sync::Arc;
use nomos_core::tx::Transaction;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
@ -56,7 +58,7 @@ fn main() -> Result<()> {
Arc::new(Box::new(bridges::mempool_metrics_bridge)),
Arc::new(Box::new(bridges::network_info_bridge)),
Arc::new(Box::new(
bridges::mempool_add_tx_bridge::<Libp2p, Libp2pAdapter<Tx>>,
bridges::mempool_add_tx_bridge::<Libp2p, Libp2pAdapter<Tx, <Tx as Transaction>::Hash>>,
)),
];
let app = OverwatchRunner::<Nomos>::run(
@ -64,7 +66,13 @@ fn main() -> Result<()> {
network: config.network,
logging: config.log,
http: config.http,
mockpool: (),
mockpool: nomos_mempool::Settings {
backend: (),
network: AdapterSettings {
topic: String::from("tx"),
id: <Tx as Transaction>::hash,
},
},
consensus: config.consensus,
bridges: HttpBridgeSettings { bridges },
#[cfg(feature = "metrics")]

View File

@ -106,13 +106,13 @@ impl<O: Overlay, Ts, Bs> CarnotSettings<O, Ts, Bs> {
pub struct CarnotConsensus<A, P, M, O, C, TxS, BS>
where
A: NetworkAdapter,
M: MempoolAdapter<Tx = P::Tx>,
M: MempoolAdapter<Item = P::Item, Key = P::Key>,
P: MemPool,
O: Overlay + Debug,
P::Tx: Transaction + Debug + 'static,
<P::Tx as Transaction>::Hash: Debug,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
A::Backend: 'static,
TxS: TxSelect<Tx = P::Tx>,
TxS: TxSelect<Tx = P::Item>,
BS: BlobCertificateSelect<Certificate = C>,
{
service_state: ServiceStateHandle<Self>,
@ -129,11 +129,11 @@ impl<A, P, M, O, C, TxS, BS> ServiceData for CarnotConsensus<A, P, M, O, C, TxS,
where
A: NetworkAdapter,
P: MemPool,
P::Tx: Transaction + Debug,
<P::Tx as Transaction>::Hash: Debug,
M: MempoolAdapter<Tx = P::Tx>,
P::Item: Debug,
P::Key: Debug,
M: MempoolAdapter<Item = P::Item, Key = P::Key>,
O: Overlay + Debug,
TxS: TxSelect<Tx = P::Tx>,
TxS: TxSelect<Tx = P::Item>,
BS: BlobCertificateSelect<Certificate = C>,
{
const SERVICE_ID: ServiceId = "Carnot";
@ -149,9 +149,16 @@ where
A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
P::Settings: Send + Sync + 'static,
P::Tx:
Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync,
P::Item: Transaction
+ Debug
+ Clone
+ Eq
+ Hash
+ Serialize
+ serde::de::DeserializeOwned
+ Send
+ Sync
+ 'static,
C: Certificate
+ Debug
+ Clone
@ -162,11 +169,12 @@ where
+ Send
+ Sync
+ 'static,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
P::Key: Debug + Send + Sync,
M: MempoolAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection,
O::CommitteeMembership: UpdateableCommitteeMembership,
TxS: TxSelect<Tx = P::Tx> + Clone + Send + Sync + 'static,
TxS: TxSelect<Tx = P::Item> + Clone + Send + Sync + 'static,
TxS::Settings: Send + Sync + 'static,
BS: BlobCertificateSelect<Certificate = C> + Clone + Send + Sync + 'static,
BS::Settings: Send + Sync + 'static,
@ -310,9 +318,16 @@ where
A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
P::Settings: Send + Sync + 'static,
P::Tx:
Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync,
P::Item: Transaction
+ Debug
+ Clone
+ Eq
+ Hash
+ Serialize
+ serde::de::DeserializeOwned
+ Send
+ Sync
+ 'static,
C: Certificate
+ Debug
+ Clone
@ -323,12 +338,17 @@ where
+ Send
+ Sync
+ 'static,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection,
O::CommitteeMembership: UpdateableCommitteeMembership,
TxS: TxSelect<Tx = P::Tx> + Clone + Send + Sync + 'static,
TxS: TxSelect<Tx = P::Item> + Clone + Send + Sync + 'static,
BS: BlobCertificateSelect<Certificate = C> + Clone + Send + Sync + 'static,
P::Key: Debug + Send + Sync,
M: MempoolAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection,
O::CommitteeMembership: UpdateableCommitteeMembership,
TxS: TxSelect<Tx = P::Item> + Clone + Send + Sync + 'static,
{
fn process_message(carnot: &Carnot<O>, msg: ConsensusMsg) {
match msg {
@ -352,11 +372,11 @@ where
#[allow(clippy::too_many_arguments)]
async fn process_carnot_event(
mut carnot: Carnot<O>,
event: Event<P::Tx, C>,
task_manager: &mut TaskManager<View, Event<P::Tx, C>>,
event: Event<P::Item, C>,
task_manager: &mut TaskManager<View, Event<P::Item, C>>,
adapter: A,
private_key: PrivateKey,
mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>,
mempool_relay: OutboundRelay<MempoolMsg<P::Item, P::Key>>,
tx_selector: TxS,
blobl_selector: BS,
timeout: Duration,
@ -372,7 +392,7 @@ where
tracing::debug!("approving proposal {:?}", block);
let (new_carnot, out) = carnot.approve_block(block);
carnot = new_carnot;
output = Some(Output::Send::<P::Tx, C>(out));
output = Some(Output::Send::<P::Item, C>(out));
}
Event::LocalTimeout { view } => {
tracing::debug!("local timeout");
@ -442,11 +462,11 @@ where
#[instrument(level = "debug", skip(adapter, task_manager, stream))]
async fn process_block(
mut carnot: Carnot<O>,
block: Block<P::Tx, C>,
mut stream: Pin<Box<dyn Stream<Item = Block<P::Tx, C>> + Send>>,
task_manager: &mut TaskManager<View, Event<P::Tx, C>>,
block: Block<P::Item, C>,
mut stream: Pin<Box<dyn Stream<Item = Block<P::Item, C>> + Send>>,
task_manager: &mut TaskManager<View, Event<P::Item, C>>,
adapter: A,
) -> (Carnot<O>, Option<Output<P::Tx, C>>) {
) -> (Carnot<O>, Option<Output<P::Item, C>>) {
tracing::debug!("received proposal {:?}", block);
if carnot.highest_voted_view() >= block.header().view {
tracing::debug!("already voted for view {}", block.header().view);
@ -522,9 +542,9 @@ where
carnot: Carnot<O>,
timeout_qc: TimeoutQc,
new_views: HashSet<NewView>,
task_manager: &mut TaskManager<View, Event<P::Tx, C>>,
task_manager: &mut TaskManager<View, Event<P::Item, C>>,
adapter: A,
) -> (Carnot<O>, Option<Output<P::Tx, C>>) {
) -> (Carnot<O>, Option<Output<P::Item, C>>) {
let leader_committee = [carnot.id()].into_iter().collect();
let leader_tally_settings = CarnotTallySettings {
threshold: carnot.leader_super_majority_threshold(),
@ -559,9 +579,9 @@ where
async fn receive_timeout_qc(
carnot: Carnot<O>,
timeout_qc: TimeoutQc,
task_manager: &mut TaskManager<View, Event<P::Tx, C>>,
task_manager: &mut TaskManager<View, Event<P::Item, C>>,
adapter: A,
) -> (Carnot<O>, Option<Output<P::Tx, C>>) {
) -> (Carnot<O>, Option<Output<P::Item, C>>) {
let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone());
let self_committee = carnot.self_committee();
let tally_settings = CarnotTallySettings {
@ -586,7 +606,7 @@ where
async fn process_root_timeout(
carnot: Carnot<O>,
timeouts: HashSet<Timeout>,
) -> (Carnot<O>, Option<Output<P::Tx, C>>) {
) -> (Carnot<O>, Option<Output<P::Item, C>>) {
// we might have received a timeout_qc sent by some other node and advanced the view
// already, in which case we should ignore the timeout
if carnot.current_view()
@ -630,8 +650,8 @@ where
qc: Qc,
tx_selector: TxS,
blob_selector: BS,
mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>,
) -> Option<Output<P::Tx, C>> {
mempool_relay: OutboundRelay<MempoolMsg<P::Item, P::Key>>,
) -> Option<Output<P::Item, C>> {
let (reply_channel, rx) = tokio::sync::oneshot::channel();
let mut output = None;
mempool_relay
@ -666,7 +686,7 @@ where
async fn process_view_change(
carnot: Carnot<O>,
prev_view: View,
task_manager: &mut TaskManager<View, Event<P::Tx, C>>,
task_manager: &mut TaskManager<View, Event<P::Item, C>>,
adapter: A,
timeout: Duration,
) {
@ -703,7 +723,7 @@ where
}
}
async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event<P::Tx, C> {
async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event<P::Item, C> {
if let Some(timeout_qc) = adapter
.timeout_qc_stream(view)
.await
@ -723,7 +743,7 @@ where
committee: Committee,
block: consensus_engine::Block,
tally: CarnotTallySettings,
) -> Event<P::Tx, C> {
) -> Event<P::Item, C> {
let tally = CarnotTally::new(tally);
let votes_stream = adapter.votes_stream(&committee, block.view, block.id).await;
match tally.tally(block.clone(), votes_stream).await {
@ -740,7 +760,7 @@ where
committee: Committee,
timeout_qc: TimeoutQc,
tally: CarnotTallySettings,
) -> Event<P::Tx, C> {
) -> Event<P::Item, C> {
let tally = NewViewTally::new(tally);
let stream = adapter
.new_view_stream(&committee, timeout_qc.view().next())
@ -762,7 +782,7 @@ where
committee: Committee,
view: consensus_engine::View,
tally: CarnotTallySettings,
) -> Event<P::Tx, C> {
) -> Event<P::Item, C> {
let tally = TimeoutTally::new(tally);
let stream = adapter.timeout_stream(&committee, view).await;
match tally.tally(view, stream).await {
@ -774,7 +794,7 @@ where
}
#[instrument(level = "debug", skip(adapter))]
async fn gather_block(adapter: A, view: consensus_engine::View) -> Event<P::Tx, C> {
async fn gather_block(adapter: A, view: consensus_engine::View) -> Event<P::Item, C> {
let stream = adapter
.proposal_chunks_stream(view)
.await

View File

@ -7,58 +7,57 @@ use std::{collections::BTreeMap, time::UNIX_EPOCH};
// internal
use crate::backend::{MemPool, MempoolError};
use nomos_core::block::BlockId;
use nomos_core::tx::Transaction;
/// A mock mempool implementation that stores all transactions in memory in the order received.
pub struct MockPool<Tx: Transaction>
where
Tx::Hash: Hash,
{
pending_txs: LinkedHashMap<Tx::Hash, Tx>,
in_block_txs: BTreeMap<BlockId, Vec<Tx>>,
in_block_txs_by_id: BTreeMap<Tx::Hash, BlockId>,
last_tx_timestamp: u64,
pub struct MockPool<Item, Key> {
pending_items: LinkedHashMap<Key, Item>,
in_block_items: BTreeMap<BlockId, Vec<Item>>,
in_block_items_by_id: BTreeMap<Key, BlockId>,
last_item_timestamp: u64,
}
impl<Tx: Transaction> Default for MockPool<Tx> {
impl<Item, Key> Default for MockPool<Item, Key>
where
Key: Hash + Eq,
{
fn default() -> Self {
Self {
pending_txs: LinkedHashMap::new(),
in_block_txs: BTreeMap::new(),
in_block_txs_by_id: BTreeMap::new(),
last_tx_timestamp: 0,
pending_items: LinkedHashMap::new(),
in_block_items: BTreeMap::new(),
in_block_items_by_id: BTreeMap::new(),
last_item_timestamp: 0,
}
}
}
impl<Tx: Transaction> MockPool<Tx>
impl<Item, Key> MockPool<Item, Key>
where
Tx::Hash: Ord,
Key: Hash + Eq + Clone,
{
pub fn new() -> Self {
Default::default()
}
}
impl<Tx> MemPool for MockPool<Tx>
impl<Item, Key> MemPool for MockPool<Item, Key>
where
Tx: Transaction + Clone + Send + Sync + 'static + Hash,
Tx::Hash: Ord,
Item: Clone + Send + Sync + 'static + Hash,
Key: Clone + Ord + Hash,
{
type Settings = ();
type Tx = Tx;
type Item = Item;
type Key = Key;
fn new(_settings: Self::Settings) -> Self {
Self::new()
}
fn add_tx(&mut self, tx: Self::Tx) -> Result<(), MempoolError> {
let id = <Self::Tx as Transaction>::hash(&tx);
if self.pending_txs.contains_key(&id) || self.in_block_txs_by_id.contains_key(&id) {
return Err(MempoolError::ExistingTx);
fn add_item(&mut self, key: Self::Key, item: Self::Item) -> Result<(), MempoolError> {
if self.pending_items.contains_key(&key) || self.in_block_items_by_id.contains_key(&key) {
return Err(MempoolError::ExistingItem);
}
self.pending_txs.insert(id, tx);
self.last_tx_timestamp = SystemTime::now()
self.pending_items.insert(key, item);
self.last_item_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
@ -66,47 +65,44 @@ where
Ok(())
}
fn view(&self, _ancestor_hint: BlockId) -> Box<dyn Iterator<Item = Self::Tx> + Send> {
fn view(&self, _ancestor_hint: BlockId) -> Box<dyn Iterator<Item = Self::Item> + Send> {
// we need to have an owned version of the iterator to bypass adding a lifetime bound to the return iterator type
#[allow(clippy::needless_collect)]
let pending_txs: Vec<Tx> = self.pending_txs.values().cloned().collect();
Box::new(pending_txs.into_iter())
let pending_items: Vec<Item> = self.pending_items.values().cloned().collect();
Box::new(pending_items.into_iter())
}
fn mark_in_block(&mut self, txs: &[<Self::Tx as Transaction>::Hash], block: BlockId) {
let mut txs_in_block = Vec::with_capacity(txs.len());
for tx_id in txs.iter() {
if let Some(tx) = self.pending_txs.remove(tx_id) {
txs_in_block.push(tx);
fn mark_in_block(&mut self, keys: &[Self::Key], block: BlockId) {
let mut items_in_block = Vec::with_capacity(keys.len());
for key in keys {
if let Some(item) = self.pending_items.remove(key) {
items_in_block.push(item);
}
}
let block_entry = self.in_block_txs.entry(block).or_default();
self.in_block_txs_by_id
.extend(txs.iter().cloned().map(|tx| (tx, block)));
block_entry.append(&mut txs_in_block);
let block_entry = self.in_block_items.entry(block).or_default();
self.in_block_items_by_id
.extend(keys.iter().cloned().map(|key| (key, block)));
block_entry.append(&mut items_in_block);
}
#[cfg(test)]
fn block_transactions(
&self,
block: BlockId,
) -> Option<Box<dyn Iterator<Item = Self::Tx> + Send>> {
self.in_block_txs.get(&block).map(|txs| {
Box::new(txs.clone().into_iter()) as Box<dyn Iterator<Item = Self::Tx> + Send>
fn block_items(&self, block: BlockId) -> Option<Box<dyn Iterator<Item = Self::Item> + Send>> {
self.in_block_items.get(&block).map(|items| {
Box::new(items.clone().into_iter()) as Box<dyn Iterator<Item = Self::Item> + Send>
})
}
fn prune(&mut self, txs: &[<Self::Tx as Transaction>::Hash]) {
for tx_id in txs {
self.pending_txs.remove(tx_id);
fn prune(&mut self, keys: &[Self::Key]) {
for key in keys {
self.pending_items.remove(key);
}
}
fn pending_tx_count(&self) -> usize {
self.pending_txs.len()
fn pending_item_count(&self) -> usize {
self.pending_items.len()
}
fn last_tx_timestamp(&self) -> u64 {
self.last_tx_timestamp
fn last_item_timestamp(&self) -> u64 {
self.last_item_timestamp
}
}

View File

@ -2,47 +2,44 @@
pub mod mockpool;
use nomos_core::block::BlockId;
use nomos_core::tx::Transaction;
#[derive(thiserror::Error, Debug)]
pub enum MempoolError {
#[error("Tx already in mempool")]
ExistingTx,
#[error("Item already in mempool")]
ExistingItem,
#[error(transparent)]
DynamicPoolError(#[from] overwatch_rs::DynError),
}
pub trait MemPool {
type Settings: Clone;
type Tx: Transaction;
type Item;
type Key;
/// Construct a new empty pool
fn new(settings: Self::Settings) -> Self;
/// Add a new transaction to the mempool, for example because we received it from the network
fn add_tx(&mut self, tx: Self::Tx) -> Result<(), MempoolError>;
/// Add a new item to the mempool, for example because we received it from the network
fn add_item(&mut self, key: Self::Key, item: Self::Item) -> Result<(), MempoolError>;
/// Return a view over the transactions contained in the mempool.
/// Implementations should provide *at least* all the transactions which have not been marked as
/// Return a view over items contained in the mempool.
/// Implementations should provide *at least* all the items which have not been marked as
/// in a block.
/// The hint on the ancestor *can* be used by the implementation to display additional
/// transactions that were not included up to that point if available.
fn view(&self, ancestor_hint: BlockId) -> Box<dyn Iterator<Item = Self::Tx> + Send>;
/// items that were not included up to that point if available.
fn view(&self, ancestor_hint: BlockId) -> Box<dyn Iterator<Item = Self::Item> + Send>;
/// Record that a set of transactions were included in a block
fn mark_in_block(&mut self, txs: &[<Self::Tx as Transaction>::Hash], block: BlockId);
/// Record that a set of items were included in a block
fn mark_in_block(&mut self, items: &[Self::Key], block: BlockId);
/// Returns all of the transactions for the block
#[cfg(test)]
fn block_transactions(
&self,
block: BlockId,
) -> Option<Box<dyn Iterator<Item = Self::Tx> + Send>>;
fn block_items(&self, block: BlockId) -> Option<Box<dyn Iterator<Item = Self::Item> + Send>>;
/// Signal that a set of transactions can't be possibly requested anymore and can be
/// discarded.
fn prune(&mut self, txs: &[<Self::Tx as Transaction>::Hash]);
fn prune(&mut self, items: &[Self::Key]);
fn pending_tx_count(&self) -> usize;
fn last_tx_timestamp(&self) -> u64;
fn pending_item_count(&self) -> usize;
fn last_item_timestamp(&self) -> u64;
}

View File

@ -11,7 +11,6 @@ use tokio::sync::oneshot::Sender;
use crate::network::NetworkAdapter;
use backend::MemPool;
use nomos_core::block::BlockId;
use nomos_core::tx::Transaction;
use nomos_network::NetworkService;
use overwatch_rs::services::{
handle::ServiceStateHandle,
@ -22,11 +21,11 @@ use overwatch_rs::services::{
pub struct MempoolService<N, P>
where
N: NetworkAdapter<Tx = P::Tx>,
N: NetworkAdapter<Item = P::Item, Key = P::Key>,
P: MemPool,
P::Settings: Clone,
P::Tx: Debug + 'static,
<P::Tx as Transaction>::Hash: Debug,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
{
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<N::Backend>>,
@ -34,29 +33,30 @@ where
}
pub struct MempoolMetrics {
pub pending_txs: usize,
pub last_tx_timestamp: u64,
pub pending_items: usize,
pub last_item_timestamp: u64,
}
pub enum MempoolMsg<Tx: Transaction> {
AddTx {
tx: Tx,
pub enum MempoolMsg<Item, Key> {
Add {
item: Item,
key: Key,
reply_channel: Sender<Result<(), ()>>,
},
View {
ancestor_hint: BlockId,
reply_channel: Sender<Box<dyn Iterator<Item = Tx> + Send>>,
reply_channel: Sender<Box<dyn Iterator<Item = Item> + Send>>,
},
Prune {
ids: Vec<Tx::Hash>,
ids: Vec<Key>,
},
#[cfg(test)]
BlockTransaction {
BlockItems {
block: BlockId,
reply_channel: Sender<Option<Box<dyn Iterator<Item = Tx> + Send>>>,
reply_channel: Sender<Option<Box<dyn Iterator<Item = Item> + Send>>>,
},
MarkInBlock {
ids: Vec<Tx::Hash>,
ids: Vec<Key>,
block: BlockId,
},
Metrics {
@ -64,16 +64,17 @@ pub enum MempoolMsg<Tx: Transaction> {
},
}
impl<Tx: Transaction + Debug> Debug for MempoolMsg<Tx>
impl<Item, Key> Debug for MempoolMsg<Item, Key>
where
Tx::Hash: Debug,
Item: Debug,
Key: Debug,
{
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
match self {
Self::View { ancestor_hint, .. } => {
write!(f, "MempoolMsg::View {{ ancestor_hint: {ancestor_hint:?}}}")
}
Self::AddTx { tx, .. } => write!(f, "MempoolMsg::AddTx{{tx: {tx:?}}}"),
Self::Add { item, .. } => write!(f, "MempoolMsg::Add{{item: {item:?}}}"),
Self::Prune { ids } => write!(f, "MempoolMsg::Prune{{ids: {ids:?}}}"),
Self::MarkInBlock { ids, block } => {
write!(
@ -82,29 +83,29 @@ where
)
}
#[cfg(test)]
Self::BlockTransaction { block, .. } => {
write!(f, "MempoolMsg::BlockTransaction{{block: {block:?}}}")
Self::BlockItems { block, .. } => {
write!(f, "MempoolMsg::BlockItem{{block: {block:?}}}")
}
Self::Metrics { .. } => write!(f, "MempoolMsg::Metrics"),
}
}
}
impl<Tx: Transaction + 'static> RelayMessage for MempoolMsg<Tx> {}
impl<Item: 'static, Key: 'static> RelayMessage for MempoolMsg<Item, Key> {}
impl<N, P> ServiceData for MempoolService<N, P>
where
N: NetworkAdapter<Tx = P::Tx>,
N: NetworkAdapter<Item = P::Item, Key = P::Key>,
P: MemPool,
P::Settings: Clone,
P::Tx: Debug + 'static,
<P::Tx as Transaction>::Hash: Debug,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
{
const SERVICE_ID: ServiceId = "Mempool";
type Settings = P::Settings;
type Settings = Settings<P::Settings, N::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MempoolMsg<<P as MemPool>::Tx>;
type Message = MempoolMsg<<P as MemPool>::Item, <P as MemPool>::Key>;
}
#[async_trait::async_trait]
@ -112,17 +113,18 @@ impl<N, P> ServiceCore for MempoolService<N, P>
where
P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static,
P::Tx: Transaction + Clone + Debug + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync + 'static,
N: NetworkAdapter<Tx = P::Tx> + Send + Sync + 'static,
N::Settings: Clone + Send + Sync + 'static,
P::Item: Clone + Debug + Send + Sync + 'static,
P::Key: Debug + Send + Sync + 'static,
N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
let pool_settings = service_state.settings_reader.get_updated_settings();
let settings = service_state.settings_reader.get_updated_settings();
Ok(Self {
service_state,
network_relay,
pool: P::new(pool_settings),
pool: P::new(settings.backend),
})
}
@ -138,15 +140,20 @@ where
.await
.expect("Relay connection with NetworkService should succeed");
let adapter = N::new(network_relay).await;
let mut network_txs = adapter.transactions_stream().await;
let adapter = N::new(
service_state.settings_reader.get_updated_settings().network,
network_relay,
);
let adapter = adapter.await;
let mut network_items = adapter.transactions_stream().await;
loop {
tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => {
match msg {
MempoolMsg::AddTx { tx, reply_channel } => {
match pool.add_tx(tx.clone()) {
MempoolMsg::Add { item, key, reply_channel } => {
match pool.add_item(key, item) {
Ok(_id) => {
if let Err(e) = reply_channel.send(Ok(())) {
tracing::debug!("Failed to send reply to AddTx: {:?}", e);
@ -166,16 +173,16 @@ where
pool.mark_in_block(&ids, block);
}
#[cfg(test)]
MempoolMsg::BlockTransaction { block, reply_channel } => {
reply_channel.send(pool.block_transactions(block)).unwrap_or_else(|_| {
tracing::debug!("could not send back block transactions")
MempoolMsg::BlockItems { block, reply_channel } => {
reply_channel.send(pool.block_items(block)).unwrap_or_else(|_| {
tracing::debug!("could not send back block items")
});
}
MempoolMsg::Prune { ids } => { pool.prune(&ids); },
MempoolMsg::Metrics { reply_channel } => {
let metrics = MempoolMetrics {
pending_txs: pool.pending_tx_count(),
last_tx_timestamp: pool.last_tx_timestamp(),
pending_items: pool.pending_item_count(),
last_item_timestamp: pool.last_item_timestamp(),
};
reply_channel.send(metrics).unwrap_or_else(|_| {
tracing::debug!("could not send back mempool metrics")
@ -183,12 +190,18 @@ where
}
}
}
Some(tx) = network_txs.next() => {
pool.add_tx(tx).unwrap_or_else(|e| {
tracing::debug!("could not add tx to the pool due to: {}", e)
Some((key, item )) = network_items.next() => {
pool.add_item(key, item).unwrap_or_else(|e| {
tracing::debug!("could not add item to the pool due to: {}", e)
});
}
}
}
}
}
#[derive(Clone, Debug)]
pub struct Settings<B, N> {
pub backend: B,
pub network: N,
}

View File

@ -1,14 +1,9 @@
// std
use std::marker::PhantomData;
// crates
use futures::Stream;
use serde::{de::DeserializeOwned, Serialize};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use tracing::log::error;
// internal
use crate::network::messages::TransactionMsg;
use crate::network::NetworkAdapter;
use nomos_core::wire;
use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash};
@ -18,20 +13,24 @@ use overwatch_rs::services::ServiceData;
pub const CARNOT_TX_TOPIC: &str = "CarnotTx";
pub struct Libp2pAdapter<Tx> {
pub struct Libp2pAdapter<Item, Key> {
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
_tx: PhantomData<Tx>,
settings: Settings<Key, Item>,
}
#[async_trait::async_trait]
impl<Tx> NetworkAdapter for Libp2pAdapter<Tx>
impl<Item, Key> NetworkAdapter for Libp2pAdapter<Item, Key>
where
Tx: DeserializeOwned + Serialize + Send + Sync + 'static,
Item: DeserializeOwned + Serialize + Send + Sync + 'static + Clone,
Key: Clone + Send + Sync + 'static,
{
type Backend = Libp2p;
type Tx = Tx;
type Settings = Settings<Key, Item>;
type Item = Item;
type Key = Key;
async fn new(
settings: Self::Settings,
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
network_relay
@ -42,11 +41,14 @@ where
.expect("Network backend should be ready");
Self {
network_relay,
_tx: PhantomData,
settings,
}
}
async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send> {
let topic_hash = TopicHash::from_raw(CARNOT_TX_TOPIC);
async fn transactions_stream(
&self,
) -> Box<dyn Stream<Item = (Self::Key, Self::Item)> + Unpin + Send> {
let topic_hash = TopicHash::from_raw(self.settings.topic.clone());
let id = self.settings.id;
let (sender, receiver) = tokio::sync::oneshot::channel();
self.network_relay
.send(NetworkMsg::Subscribe {
@ -59,10 +61,10 @@ where
Box::new(Box::pin(BroadcastStream::new(receiver).filter_map(
move |message| match message {
Ok(Event::Message(Message { data, topic, .. })) if topic == topic_hash => {
match wire::deserialize::<TransactionMsg<Tx>>(&data) {
Ok(msg) => Some(msg.tx),
match wire::deserialize::<Item>(&data) {
Ok(item) => Some((id(&item), item)),
Err(e) => {
error!("Unrecognized Tx message: {e}");
tracing::debug!("Unrecognized message: {e}");
None
}
}
@ -72,3 +74,9 @@ where
)))
}
}
#[derive(Clone, Debug)]
pub struct Settings<K, V> {
pub topic: String,
pub id: fn(&V) -> K,
}

View File

@ -2,7 +2,7 @@
// crates
use futures::{Stream, StreamExt};
use nomos_core::tx::mock::MockTransaction;
use nomos_core::tx::mock::{MockTransaction, MockTxId};
use nomos_network::backends::mock::{
EventKind, Mock, MockBackendMessage, MockContentTopic, MockMessage, NetworkEvent,
};
@ -25,9 +25,12 @@ pub struct MockAdapter {
#[async_trait::async_trait]
impl NetworkAdapter for MockAdapter {
type Backend = Mock;
type Tx = MockTransaction<MockMessage>;
type Settings = ();
type Item = MockTransaction<MockMessage>;
type Key = MockTxId;
async fn new(
_settings: Self::Settings,
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
// send message to boot the network producer
@ -57,7 +60,9 @@ impl NetworkAdapter for MockAdapter {
Self { network_relay }
}
async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send> {
async fn transactions_stream(
&self,
) -> Box<dyn Stream<Item = (Self::Key, Self::Item)> + Unpin + Send> {
let (sender, receiver) = tokio::sync::oneshot::channel();
if let Err((_, e)) = self
.network_relay
@ -77,7 +82,8 @@ impl NetworkAdapter for MockAdapter {
Ok(NetworkEvent::RawMessage(message)) => {
tracing::info!("Received message: {:?}", message.payload());
if message.content_topic().eq(&MOCK_TX_CONTENT_TOPIC) {
Some(MockTransaction::new(message))
let tx = MockTransaction::new(message);
Some((tx.id(), tx))
} else {
None
}

View File

@ -5,7 +5,6 @@ use futures::{Stream, StreamExt};
use serde::de::DeserializeOwned;
use tokio_stream::wrappers::BroadcastStream;
// internal
use crate::network::messages::TransactionMsg;
use crate::network::NetworkAdapter;
use nomos_core::wire;
use nomos_network::backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage};
@ -21,20 +20,24 @@ pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
pub const WAKU_CARNOT_TX_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new("CarnotSim", 1, "CarnotTx", Encoding::Proto);
pub struct WakuAdapter<Tx> {
pub struct WakuAdapter<Item> {
network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
_tx: PhantomData<Tx>,
_item: PhantomData<Item>,
}
#[async_trait::async_trait]
impl<Tx> NetworkAdapter for WakuAdapter<Tx>
impl<Item> NetworkAdapter for WakuAdapter<Item>
where
Tx: DeserializeOwned + Serialize + Send + Sync + 'static,
Item: DeserializeOwned + Serialize + Send + Sync + 'static,
{
type Backend = Waku;
type Tx = Tx;
type Settings = ();
type Item = Item;
// TODO: implement real key
type Key = ();
async fn new(
_settings: Self::Settings,
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
// Subscribe to the carnot pubsub topic
@ -50,10 +53,13 @@ where
};
Self {
network_relay,
_tx: Default::default(),
_item: Default::default(),
}
}
async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send> {
async fn transactions_stream(
&self,
) -> Box<dyn Stream<Item = (Self::Key, Self::Item)> + Unpin + Send> {
let (sender, receiver) = tokio::sync::oneshot::channel();
if let Err((_, _e)) = self
.network_relay
@ -71,9 +77,10 @@ where
match event {
Ok(NetworkEvent::RawMessage(message)) => {
if message.content_topic() == &WAKU_CARNOT_TX_CONTENT_TOPIC {
let tx: TransactionMsg<Self::Tx> =
let item: Self::Item =
wire::deserializer(message.payload()).deserialize().unwrap();
Some(tx.tx)
// TODO: implement real key
Some(((), item))
} else {
None
}

View File

@ -14,9 +14,15 @@ use overwatch_rs::services::ServiceData;
#[async_trait::async_trait]
pub trait NetworkAdapter {
type Backend: NetworkBackend + 'static;
type Tx: Send + Sync + 'static;
type Settings: Clone;
type Item: Send + Sync + 'static;
type Key: Send + Sync + 'static;
async fn new(
settings: Self::Settings,
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self;
async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send>;
async fn transactions_stream(
&self,
) -> Box<dyn Stream<Item = (Self::Key, Self::Item)> + Unpin + Send>;
}

View File

@ -1,4 +1,7 @@
use nomos_core::{block::BlockId, tx::mock::MockTransaction};
use nomos_core::{
block::BlockId,
tx::mock::{MockTransaction, MockTxId},
};
use nomos_log::{Logger, LoggerSettings};
use nomos_network::{
backends::mock::{Mock, MockBackendMessage, MockConfig, MockMessage},
@ -10,14 +13,16 @@ use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use nomos_mempool::{
backend::mockpool::MockPool,
network::adapters::mock::{MockAdapter, MOCK_TX_CONTENT_TOPIC},
MempoolMsg, MempoolService,
MempoolMsg, MempoolService, Settings,
};
#[derive(Services)]
struct MockPoolNode {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Mock>>,
mockpool: ServiceHandle<MempoolService<MockAdapter, MockPool<MockTransaction<MockMessage>>>>,
mockpool: ServiceHandle<
MempoolService<MockAdapter, MockPool<MockTransaction<MockMessage>, MockTxId>>,
>,
}
#[test]
@ -56,7 +61,10 @@ fn test_mockmempool() {
weights: None,
},
},
mockpool: (),
mockpool: Settings {
backend: (),
network: (),
},
logging: LoggerSettings::default(),
},
None,
@ -67,7 +75,7 @@ fn test_mockmempool() {
let network = app.handle().relay::<NetworkService<Mock>>();
let mempool = app
.handle()
.relay::<MempoolService<MockAdapter, MockPool<MockTransaction<MockMessage>>>>();
.relay::<MempoolService<MockAdapter, MockPool<MockTransaction<MockMessage>, MockTxId>>>();
app.spawn(async move {
let network_outbound = network.connect().await.unwrap();