diff --git a/nodes/nomos-node/src/api.rs b/nodes/nomos-node/src/api.rs index 20e65903..f30770c8 100644 --- a/nodes/nomos-node/src/api.rs +++ b/nodes/nomos-node/src/api.rs @@ -53,7 +53,7 @@ pub struct AxumBackend { da_status, ), components( - schemas(Status, MempoolMetrics) + schemas(Status, MempoolMetrics) ), tags( (name = "da", description = "data availibility related APIs") diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index cebd8886..c8869744 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -15,6 +15,7 @@ use bytes::Bytes; use carnot_consensus::CarnotConsensus; use nomos_api::ApiService; use nomos_core::{ + block::BlockId, da::{blob, certificate}, tx::Transaction, wire, @@ -58,9 +59,13 @@ const MB16: usize = 1024 * 1024 * 16; pub type Carnot = CarnotConsensus< ConsensusLibp2pAdapter, - MockPool::Hash>, + MockPool::Hash>, MempoolLibp2pAdapter::Hash>, - MockPool::Blob as blob::Blob>::Hash>, + MockPool< + BlockId, + Certificate, + <::Blob as blob::Blob>::Hash, + >, MempoolLibp2pAdapter< Certificate, <::Blob as blob::Blob>::Hash, @@ -77,7 +82,7 @@ pub type DataAvailability = DataAvailabilityService< DaLibp2pAdapter, >; -type Mempool = MempoolService, MockPool, D>; +type Mempool = MempoolService, MockPool, D>; #[derive(Services)] pub struct Nomos { diff --git a/nomos-services/api/src/http/cl.rs b/nomos-services/api/src/http/cl.rs index add0cddd..d0bc8437 100644 --- a/nomos-services/api/src/http/cl.rs +++ b/nomos-services/api/src/http/cl.rs @@ -1,5 +1,6 @@ use core::{fmt::Debug, hash::Hash}; +use nomos_core::block::BlockId; use nomos_core::tx::Transaction; use nomos_mempool::{ backend::mockpool::MockPool, @@ -12,7 +13,7 @@ use tokio::sync::oneshot; type ClMempoolService = MempoolService< Libp2pAdapter::Hash>, - MockPool::Hash>, + MockPool::Hash>, TxDiscriminant, >; @@ -46,7 +47,7 @@ where pub async fn cl_mempool_status( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, items: Vec<::Hash>, -) -> Result, super::DynError> +) -> Result>, super::DynError> where T: Transaction + Clone diff --git a/nomos-services/api/src/http/consensus.rs b/nomos-services/api/src/http/consensus.rs index a858ebff..1655ae89 100644 --- a/nomos-services/api/src/http/consensus.rs +++ b/nomos-services/api/src/http/consensus.rs @@ -27,9 +27,13 @@ use nomos_storage::backends::{sled::SledBackend, StorageSerde}; pub type Carnot = CarnotConsensus< ConsensusLibp2pAdapter, - MockPool::Hash>, + MockPool::Hash>, MempoolLibp2pAdapter::Hash>, - MockPool::Blob as blob::Blob>::Hash>, + MockPool< + BlockId, + Certificate, + <::Blob as blob::Blob>::Hash, + >, MempoolLibp2pAdapter< Certificate, <::Blob as blob::Blob>::Hash, diff --git a/nomos-services/api/src/http/da.rs b/nomos-services/api/src/http/da.rs index 918bb854..b8bc12a7 100644 --- a/nomos-services/api/src/http/da.rs +++ b/nomos-services/api/src/http/da.rs @@ -1,4 +1,5 @@ use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullReplication}; +use nomos_core::block::BlockId; use nomos_core::da::blob; use nomos_da::{ backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter, @@ -14,7 +15,7 @@ use tokio::sync::oneshot; pub type DaMempoolService = MempoolService< Libp2pAdapter::Hash>, - MockPool::Hash>, + MockPool::Hash>, CertDiscriminant, >; @@ -42,7 +43,7 @@ pub async fn da_mempool_metrics( pub async fn da_mempool_status( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, items: Vec<::Hash>, -) -> Result, super::DynError> { +) -> Result>, super::DynError> { let relay = handle.relay::().connect().await?; let (sender, receiver) = oneshot::channel(); relay diff --git a/nomos-services/api/src/http/mempool.rs b/nomos-services/api/src/http/mempool.rs index 67e4bfb8..9d9c3c3d 100644 --- a/nomos-services/api/src/http/mempool.rs +++ b/nomos-services/api/src/http/mempool.rs @@ -1,5 +1,5 @@ use core::{fmt::Debug, hash::Hash}; - +use nomos_core::block::BlockId; use nomos_mempool::{ backend::mockpool::MockPool, network::NetworkAdapter, Discriminant, MempoolMsg, MempoolService, }; @@ -20,7 +20,7 @@ where Key: Clone + Debug + Ord + Hash + 'static, { let relay = handle - .relay::, D>>() + .relay::, D>>() .connect() .await?; let (sender, receiver) = oneshot::channel(); diff --git a/nomos-services/carnot-consensus/src/lib.rs b/nomos-services/carnot-consensus/src/lib.rs index 9c51862f..8e2cf48f 100644 --- a/nomos-services/carnot-consensus/src/lib.rs +++ b/nomos-services/carnot-consensus/src/lib.rs @@ -113,8 +113,8 @@ pub struct CarnotConsensus, - ClPool: MemPool, - DaPool: MemPool, + ClPool: MemPool, + DaPool: MemPool, DaPoolAdapter: MempoolAdapter, O: Overlay + Debug, ClPool::Item: Debug + 'static, @@ -140,10 +140,10 @@ impl Servi for CarnotConsensus where A: NetworkAdapter, - ClPool: MemPool, + ClPool: MemPool, ClPool::Item: Debug, ClPool::Key: Debug, - DaPool: MemPool, + DaPool: MemPool, DaPool::Item: Debug, DaPool::Key: Debug, ClPoolAdapter: MempoolAdapter, @@ -165,9 +165,9 @@ impl Servi for CarnotConsensus where A: NetworkAdapter + Clone + Send + Sync + 'static, - ClPool: MemPool + Send + Sync + 'static, + ClPool: MemPool + Send + Sync + 'static, ClPool::Settings: Send + Sync + 'static, - DaPool: MemPool + Send + Sync + 'static, + DaPool: MemPool + Send + Sync + 'static, DaPool::Settings: Send + Sync + 'static, ClPool::Item: Transaction + Debug @@ -364,9 +364,9 @@ impl CarnotConsensus where A: NetworkAdapter + Clone + Send + Sync + 'static, - ClPool: MemPool + Send + Sync + 'static, + ClPool: MemPool + Send + Sync + 'static, ClPool::Settings: Send + Sync + 'static, - DaPool: MemPool + Send + Sync + 'static, + DaPool: MemPool + Send + Sync + 'static, DaPool::Settings: Send + Sync + 'static, ClPool::Item: Transaction + Debug @@ -462,8 +462,8 @@ where task_manager: &mut TaskManager>, adapter: A, private_key: PrivateKey, - cl_mempool_relay: OutboundRelay>, - da_mempool_relay: OutboundRelay>, + cl_mempool_relay: OutboundRelay>, + da_mempool_relay: OutboundRelay>, storage_relay: OutboundRelay>, tx_selector: TxS, blobl_selector: BS, @@ -577,8 +577,8 @@ where task_manager: &mut TaskManager>, adapter: A, storage_relay: OutboundRelay>, - cl_mempool_relay: OutboundRelay>, - da_mempool_relay: OutboundRelay>, + cl_mempool_relay: OutboundRelay>, + da_mempool_relay: OutboundRelay>, ) -> (Carnot, Option>) { tracing::debug!("received proposal {:?}", block); if carnot.highest_voted_view() >= block.header().view { @@ -793,8 +793,8 @@ where qc: Qc, tx_selector: TxS, blob_selector: BS, - cl_mempool_relay: OutboundRelay>, - da_mempool_relay: OutboundRelay>, + cl_mempool_relay: OutboundRelay>, + da_mempool_relay: OutboundRelay>, ) -> Option> { let mut output = None; let cl_txs = get_mempool_contents(cl_mempool_relay); @@ -1127,7 +1127,7 @@ pub struct CarnotInfo { } async fn get_mempool_contents( - mempool: OutboundRelay>, + mempool: OutboundRelay>, ) -> Result + Send>, tokio::sync::oneshot::error::RecvError> { let (reply_channel, rx) = tokio::sync::oneshot::channel(); @@ -1143,7 +1143,7 @@ async fn get_mempool_contents( } async fn mark_in_block( - mempool: OutboundRelay>, + mempool: OutboundRelay>, ids: impl Iterator, block: BlockId, ) { diff --git a/nomos-services/mempool/src/backend/mockpool.rs b/nomos-services/mempool/src/backend/mockpool.rs index f87e7b8a..6c1de925 100644 --- a/nomos-services/mempool/src/backend/mockpool.rs +++ b/nomos-services/mempool/src/backend/mockpool.rs @@ -6,19 +6,18 @@ use std::{collections::BTreeMap, time::UNIX_EPOCH}; // crates // internal use crate::backend::{MemPool, MempoolError}; -use nomos_core::block::BlockId; use super::Status; /// A mock mempool implementation that stores all transactions in memory in the order received. -pub struct MockPool { +pub struct MockPool { pending_items: LinkedHashMap, in_block_items: BTreeMap>, in_block_items_by_id: BTreeMap, last_item_timestamp: u64, } -impl Default for MockPool +impl Default for MockPool where Key: Hash + Eq, { @@ -32,7 +31,7 @@ where } } -impl MockPool +impl MockPool where Key: Hash + Eq + Clone, { @@ -41,14 +40,16 @@ where } } -impl MemPool for MockPool +impl MemPool for MockPool where Item: Clone + Send + Sync + 'static + Hash, Key: Clone + Ord + Hash, + BlockId: Copy + Ord, { type Settings = (); type Item = Item; type Key = Key; + type BlockId = BlockId; fn new(_settings: Self::Settings) -> Self { Self::new() @@ -108,7 +109,7 @@ where self.last_item_timestamp } - fn status(&self, items: &[Self::Key]) -> Vec { + fn status(&self, items: &[Self::Key]) -> Vec> { items .iter() .map(|key| { diff --git a/nomos-services/mempool/src/backend/mod.rs b/nomos-services/mempool/src/backend/mod.rs index 727ef2a8..e97f25be 100644 --- a/nomos-services/mempool/src/backend/mod.rs +++ b/nomos-services/mempool/src/backend/mod.rs @@ -1,7 +1,6 @@ #[cfg(feature = "mock")] pub mod mockpool; -use nomos_core::block::BlockId; use serde::{Deserialize, Serialize}; #[derive(thiserror::Error, Debug)] @@ -16,6 +15,7 @@ pub trait MemPool { type Settings: Clone; type Item; type Key; + type BlockId; /// Construct a new empty pool fn new(settings: Self::Settings) -> Self; @@ -28,14 +28,17 @@ pub trait MemPool { /// in a block. /// The hint on the ancestor *can* be used by the implementation to display additional /// items that were not included up to that point if available. - fn view(&self, ancestor_hint: BlockId) -> Box + Send>; + fn view(&self, ancestor_hint: Self::BlockId) -> Box + Send>; /// Record that a set of items were included in a block - fn mark_in_block(&mut self, items: &[Self::Key], block: BlockId); + fn mark_in_block(&mut self, items: &[Self::Key], block: Self::BlockId); /// Returns all of the transactions for the block #[cfg(test)] - fn block_items(&self, block: BlockId) -> Option + Send>>; + fn block_items( + &self, + block: Self::BlockId, + ) -> Option + Send>>; /// Signal that a set of transactions can't be possibly requested anymore and can be /// discarded. @@ -46,12 +49,12 @@ pub trait MemPool { // Return the status of a set of items. // This is a best effort attempt, and implementations are free to return `Unknown` for all of them. - fn status(&self, items: &[Self::Key]) -> Vec; + fn status(&self, items: &[Self::Key]) -> Vec>; } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -pub enum Status { +pub enum Status { /// Unknown status Unknown, /// Pending status diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index 41eadb27..2f3ab4e0 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -24,7 +24,6 @@ use tokio::sync::oneshot::Sender; // internal use crate::network::NetworkAdapter; use backend::{MemPool, Status}; -use nomos_core::block::BlockId; use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::{ @@ -42,6 +41,7 @@ where P::Settings: Clone, P::Item: Debug + 'static, P::Key: Debug + 'static, + P::BlockId: Debug + 'static, D: Discriminant, { service_state: ServiceStateHandle, @@ -63,7 +63,7 @@ pub struct MempoolMetrics { pub last_item_timestamp: u64, } -pub enum MempoolMsg { +pub enum MempoolMsg { Add { item: Item, key: Key, @@ -90,12 +90,13 @@ pub enum MempoolMsg { }, Status { items: Vec, - reply_channel: Sender>, + reply_channel: Sender>>, }, } -impl Debug for MempoolMsg +impl Debug for MempoolMsg where + BlockId: Debug, Item: Debug, Key: Debug, { @@ -122,7 +123,10 @@ where } } -impl RelayMessage for MempoolMsg {} +impl RelayMessage + for MempoolMsg +{ +} pub struct Transaction; pub struct Certificate; @@ -146,13 +150,14 @@ where P::Settings: Clone, P::Item: Debug + 'static, P::Key: Debug + 'static, + P::BlockId: Debug + 'static, D: Discriminant, { const SERVICE_ID: ServiceId = D::ID; type Settings = Settings; type State = NoState; type StateOperator = NoOperator; - type Message = MempoolMsg<

::Item,

::Key>; + type Message = MempoolMsg<

::BlockId,

::Item,

::Key>; } #[async_trait::async_trait] @@ -163,6 +168,7 @@ where N::Settings: Clone + Send + Sync + 'static, P::Item: Clone + Debug + Send + Sync + 'static, P::Key: Debug + Send + Sync + 'static, + P::BlockId: Send + Debug + 'static, N: NetworkAdapter + Send + Sync + 'static, D: Discriminant + Send, { @@ -237,6 +243,7 @@ where N::Settings: Clone + Send + Sync + 'static, P::Item: Clone + Debug + Send + Sync + 'static, P::Key: Debug + Send + Sync + 'static, + P::BlockId: Debug + Send + 'static, N: NetworkAdapter + Send + Sync + 'static, D: Discriminant + Send, { @@ -256,7 +263,7 @@ where } async fn handle_mempool_message( - message: MempoolMsg, + message: MempoolMsg, pool: &mut P, network_relay: &mut OutboundRelay>, service_state: &mut ServiceStateHandle, diff --git a/nomos-services/mempool/src/metrics.rs b/nomos-services/mempool/src/metrics.rs index 6073d818..02825ea5 100644 --- a/nomos-services/mempool/src/metrics.rs +++ b/nomos-services/mempool/src/metrics.rs @@ -18,12 +18,12 @@ enum MempoolMsgType { MarkInBlock, } -impl From<&MempoolMsg> for MempoolMsgType +impl From<&MempoolMsg> for MempoolMsgType where I: 'static + Debug, K: 'static + Debug, { - fn from(event: &MempoolMsg) -> Self { + fn from(event: &MempoolMsg) -> Self { match event { MempoolMsg::Add { .. } => MempoolMsgType::Add, MempoolMsg::View { .. } => MempoolMsgType::View, @@ -60,7 +60,7 @@ impl Metrics { Self { messages } } - pub(crate) fn record(&self, msg: &MempoolMsg) + pub(crate) fn record(&self, msg: &MempoolMsg) where I: 'static + Debug, K: 'static + Debug, diff --git a/nomos-services/mempool/tests/mock.rs b/nomos-services/mempool/tests/mock.rs index c86bed99..5024a640 100644 --- a/nomos-services/mempool/tests/mock.rs +++ b/nomos-services/mempool/tests/mock.rs @@ -21,7 +21,11 @@ struct MockPoolNode { logging: ServiceHandle, network: ServiceHandle>, mockpool: ServiceHandle< - MempoolService, MockTxId>, Transaction>, + MempoolService< + MockAdapter, + MockPool, MockTxId>, + Transaction, + >, >, } @@ -76,7 +80,7 @@ fn test_mockmempool() { let network = app.handle().relay::>(); let mempool = app.handle().relay::, MockTxId>, + MockPool, MockTxId>, Transaction, >>();