diff --git a/nodes/nomos-node/src/api.rs b/nodes/nomos-node/src/api.rs index f3a5235b..f6a201a1 100644 --- a/nodes/nomos-node/src/api.rs +++ b/nodes/nomos-node/src/api.rs @@ -22,8 +22,8 @@ use utoipa_swagger_ui::SwaggerUi; use full_replication::{Blob, Certificate}; use nomos_core::{da::blob, header::HeaderId, tx::Transaction}; use nomos_mempool::{ - network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, openapi::Status, - MempoolMetrics, + network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, + tx::service::openapi::Status, MempoolMetrics, }; use nomos_network::backends::libp2p::Libp2p as NetworkBackend; use nomos_storage::backends::StorageSerde; @@ -351,10 +351,9 @@ where Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, { - make_request_and_return_response!(mempool::add::< + make_request_and_return_response!(mempool::add_tx::< NetworkBackend, MempoolNetworkAdapter::Hash>, - nomos_mempool::Transaction, Tx, ::Hash, >(&handle, tx, Transaction::hash)) @@ -372,10 +371,9 @@ async fn add_cert( State(handle): State, Json(cert): Json, ) -> Response { - make_request_and_return_response!(mempool::add::< + make_request_and_return_response!(mempool::add_cert::< NetworkBackend, MempoolNetworkAdapter::Hash>, - nomos_mempool::Certificate, Certificate, ::Hash, >( diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index e1517236..d740bf5f 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -24,10 +24,7 @@ use nomos_da::{ }; use nomos_log::Logger; use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter; -use nomos_mempool::{ - backend::mockpool::MockPool, Certificate as CertDiscriminant, MempoolService, - Transaction as TxDiscriminant, -}; +use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService}; #[cfg(feature = "metrics")] use nomos_metrics::Metrics; use nomos_network::backends::libp2p::Libp2p as NetworkBackend; @@ -42,6 +39,7 @@ pub use nomos_core::{ da::certificate::select::FillSize as FillSizeWithBlobsCertificate, tx::select::FillSize as FillSizeWithTx, }; +use nomos_mempool::da::service::DaMempoolService; use nomos_network::NetworkService; use nomos_system_sig::SystemSig; use overwatch_derive::*; @@ -78,20 +76,29 @@ pub type DataAvailability = DataAvailabilityService< DaNetworkAdapter, >; -type Mempool = MempoolService, MockPool, D>; +pub type DaMempool = DaMempoolService< + MempoolNetworkAdapter< + Certificate, + <::Blob as blob::Blob>::Hash, + >, + MockPool< + HeaderId, + Certificate, + <::Blob as blob::Blob>::Hash, + >, +>; + +pub type TxMempool = TxMempoolService< + MempoolNetworkAdapter::Hash>, + MockPool::Hash>, +>; #[derive(Services)] pub struct Nomos { logging: ServiceHandle, network: ServiceHandle>, - cl_mempool: ServiceHandle::Hash, TxDiscriminant>>, - da_mempool: ServiceHandle< - Mempool< - Certificate, - <::Blob as blob::Blob>::Hash, - CertDiscriminant, - >, - >, + cl_mempool: ServiceHandle, + da_mempool: ServiceHandle, cryptarchia: ServiceHandle, http: ServiceHandle>>, da: ServiceHandle, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index 414b56ca..bd4af96f 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -73,7 +73,7 @@ fn main() -> Result<()> { network: config.network, logging: config.log, http: config.http, - cl_mempool: nomos_mempool::Settings { + cl_mempool: nomos_mempool::TxMempoolSettings { backend: (), network: AdapterSettings { topic: String::from(nomos_node::CL_TOPIC), @@ -81,7 +81,7 @@ fn main() -> Result<()> { }, registry: registry.clone(), }, - da_mempool: nomos_mempool::Settings { + da_mempool: nomos_mempool::DaMempoolSettings { backend: (), network: AdapterSettings { topic: String::from(nomos_node::DA_TOPIC), diff --git a/nomos-services/api/src/http/cl.rs b/nomos-services/api/src/http/cl.rs index 83223a76..6dfebb52 100644 --- a/nomos-services/api/src/http/cl.rs +++ b/nomos-services/api/src/http/cl.rs @@ -3,18 +3,15 @@ use core::{fmt::Debug, hash::Hash}; use nomos_core::header::HeaderId; use nomos_core::tx::Transaction; use nomos_mempool::{ - backend::mockpool::MockPool, - network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, - openapi::{MempoolMetrics, Status}, - MempoolMsg, MempoolService, Transaction as TxDiscriminant, + backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, + tx::service::openapi::Status, MempoolMetrics, MempoolMsg, TxMempoolService, }; use serde::{Deserialize, Serialize}; use tokio::sync::oneshot; -type ClMempoolService = MempoolService< +type ClMempoolService = TxMempoolService< MempoolNetworkAdapter::Hash>, MockPool::Hash>, - TxDiscriminant, >; pub async fn cl_mempool_metrics( diff --git a/nomos-services/api/src/http/da.rs b/nomos-services/api/src/http/da.rs index d2f44f3a..45c0ed66 100644 --- a/nomos-services/api/src/http/da.rs +++ b/nomos-services/api/src/http/da.rs @@ -5,18 +5,17 @@ use nomos_da::{ backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaNetworkAdapter, DaMsg, DataAvailabilityService, }; +use nomos_mempool::da::service::DaMempoolService; use nomos_mempool::{ - backend::mockpool::MockPool, - network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, - openapi::{MempoolMetrics, Status}, - Certificate as CertDiscriminant, MempoolMsg, MempoolService, + backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, + tx::service::openapi::Status, }; +use nomos_mempool::{MempoolMetrics, MempoolMsg}; use tokio::sync::oneshot; -pub type DaMempoolService = MempoolService< +pub type MempoolServiceDa = DaMempoolService< MempoolNetworkAdapter::Hash>, MockPool::Hash>, - CertDiscriminant, >; pub type DataAvailability = DataAvailabilityService< @@ -28,7 +27,7 @@ pub type DataAvailability = DataAvailabilityService< pub async fn da_mempool_metrics( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, ) -> Result { - let relay = handle.relay::().connect().await?; + let relay = handle.relay::().connect().await?; let (sender, receiver) = oneshot::channel(); relay .send(MempoolMsg::Metrics { @@ -44,7 +43,7 @@ pub async fn da_mempool_status( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, items: Vec<::Hash>, ) -> Result>, super::DynError> { - let relay = handle.relay::().connect().await?; + let relay = handle.relay::().connect().await?; let (sender, receiver) = oneshot::channel(); relay .send(MempoolMsg::Status { diff --git a/nomos-services/api/src/http/mempool.rs b/nomos-services/api/src/http/mempool.rs index f3e62575..870465ac 100644 --- a/nomos-services/api/src/http/mempool.rs +++ b/nomos-services/api/src/http/mempool.rs @@ -1,12 +1,13 @@ use core::{fmt::Debug, hash::Hash}; use nomos_core::header::HeaderId; use nomos_mempool::{ - backend::mockpool::MockPool, network::NetworkAdapter, Discriminant, MempoolMsg, MempoolService, + backend::mockpool::MockPool, network::NetworkAdapter, DaMempoolService, MempoolMsg, + TxMempoolService, }; use nomos_network::backends::NetworkBackend; use tokio::sync::oneshot; -pub async fn add( +pub async fn add_tx( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, item: Item, converter: impl Fn(&Item) -> Key, @@ -15,12 +16,45 @@ where N: NetworkBackend, A: NetworkAdapter + Send + Sync + 'static, A::Settings: Send + Sync, - D: Discriminant, Item: Clone + Debug + Send + Sync + 'static + Hash, Key: Clone + Debug + Ord + Hash + 'static, { let relay = handle - .relay::, D>>() + .relay::>>() + .connect() + .await?; + let (sender, receiver) = oneshot::channel(); + + relay + .send(MempoolMsg::Add { + key: converter(&item), + item, + reply_channel: sender, + }) + .await + .map_err(|(e, _)| e)?; + + match receiver.await { + Ok(Ok(())) => Ok(()), + Ok(Err(())) => Err("mempool error".into()), + Err(e) => Err(e.into()), + } +} + +pub async fn add_cert( + handle: &overwatch_rs::overwatch::handle::OverwatchHandle, + item: Item, + converter: impl Fn(&Item) -> Key, +) -> Result<(), super::DynError> +where + N: NetworkBackend, + A: NetworkAdapter + Send + Sync + 'static, + A::Settings: Send + Sync, + Item: Clone + Debug + Send + Sync + 'static + Hash, + Key: Clone + Debug + Ord + Hash + 'static, +{ + let relay = handle + .relay::>>() .connect() .await?; let (sender, receiver) = oneshot::channel(); diff --git a/nomos-services/cryptarchia-consensus/src/lib.rs b/nomos-services/cryptarchia-consensus/src/lib.rs index 4fbea2ca..01f31983 100644 --- a/nomos-services/cryptarchia-consensus/src/lib.rs +++ b/nomos-services/cryptarchia-consensus/src/lib.rs @@ -15,8 +15,8 @@ use nomos_core::{ header::cryptarchia::Builder, }; use nomos_mempool::{ - backend::MemPool, network::NetworkAdapter as MempoolAdapter, Certificate as CertDiscriminant, - MempoolMsg, MempoolService, Transaction as TxDiscriminant, + backend::MemPool, network::NetworkAdapter as MempoolAdapter, DaMempoolService, MempoolMsg, + TxMempoolService, }; use nomos_network::NetworkService; use nomos_storage::{backends::StorageBackend, StorageMsg, StorageService}; @@ -149,8 +149,8 @@ where // underlying networking backend. We need this so we can relay and check the types properly // when implementing ServiceCore for CryptarchiaConsensus network_relay: Relay>, - cl_mempool_relay: Relay>, - da_mempool_relay: Relay>, + cl_mempool_relay: Relay>, + da_mempool_relay: Relay>, block_subscription_sender: broadcast::Sender>, storage_relay: Relay>, } diff --git a/nomos-services/mempool/src/da/mod.rs b/nomos-services/mempool/src/da/mod.rs new file mode 100644 index 00000000..1f278a4d --- /dev/null +++ b/nomos-services/mempool/src/da/mod.rs @@ -0,0 +1 @@ +pub mod service; diff --git a/nomos-services/mempool/src/da/service.rs b/nomos-services/mempool/src/da/service.rs new file mode 100644 index 00000000..3644ded3 --- /dev/null +++ b/nomos-services/mempool/src/da/service.rs @@ -0,0 +1,244 @@ +/// Re-export for OpenAPI +#[cfg(feature = "openapi")] +pub mod openapi { + pub use crate::backend::Status; +} + +// std +use std::fmt::Debug; + +// crates +// TODO: Add again after metrics refactor +// #[cfg(feature = "metrics")] +// use super::metrics::Metrics; +use futures::StreamExt; +use nomos_metrics::NomosRegistry; +// internal +use crate::backend::MemPool; +use crate::network::NetworkAdapter; +use crate::{MempoolMetrics, MempoolMsg}; +use nomos_network::{NetworkMsg, NetworkService}; +use overwatch_rs::services::life_cycle::LifecycleMessage; +use overwatch_rs::services::{ + handle::ServiceStateHandle, + relay::{OutboundRelay, Relay}, + state::{NoOperator, NoState}, + ServiceCore, ServiceData, ServiceId, +}; +use tracing::error; + +pub struct DaMempoolService +where + N: NetworkAdapter, + P: MemPool, + P::Settings: Clone, + P::Item: Debug + 'static, + P::Key: Debug + 'static, + P::BlockId: Debug + 'static, +{ + service_state: ServiceStateHandle, + network_relay: Relay>, + pool: P, + // TODO: Add again after metrics refactor + // #[cfg(feature = "metrics")] + // metrics: Option, +} + +impl ServiceData for DaMempoolService +where + N: NetworkAdapter, + P: MemPool, + P::Settings: Clone, + P::Item: Debug + 'static, + P::Key: Debug + 'static, + P::BlockId: Debug + 'static, +{ + const SERVICE_ID: ServiceId = "mempool-da"; + type Settings = DaMempoolSettings; + type State = NoState; + type StateOperator = NoOperator; + type Message = MempoolMsg<

::BlockId,

::Item,

::Key>; +} + +#[async_trait::async_trait] +impl ServiceCore for DaMempoolService +where + P: MemPool + Send + 'static, + P::Settings: Clone + Send + Sync + 'static, + N::Settings: Clone + Send + Sync + 'static, + P::Item: Clone + Debug + Send + Sync + 'static, + P::Key: Debug + Send + Sync + 'static, + P::BlockId: Send + Debug + 'static, + N: NetworkAdapter + Send + Sync + 'static, +{ + fn init(service_state: ServiceStateHandle) -> Result { + let network_relay = service_state.overwatch_handle.relay(); + let settings = service_state.settings_reader.get_updated_settings(); + + // TODO: Refactor metrics to be reusable then replug it again + // #[cfg(feature = "metrics")] + // let metrics = settings + // .registry + // .map(|reg| Metrics::new(reg, service_state.id())); + + Ok(Self { + service_state, + network_relay, + pool: P::new(settings.backend), + // #[cfg(feature = "metrics")] + // metrics, + }) + } + + async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + let Self { + mut service_state, + network_relay, + mut pool, + .. + } = self; + + let mut network_relay: OutboundRelay<_> = network_relay + .connect() + .await + .expect("Relay connection with NetworkService should succeed"); + + let adapter = N::new( + service_state.settings_reader.get_updated_settings().network, + network_relay.clone(), + ); + let adapter = adapter.await; + + let mut network_items = adapter.transactions_stream().await; + let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); + + loop { + tokio::select! { + Some(msg) = service_state.inbound_relay.recv() => { + // TODO: replug metrics once refactor is done + // #[cfg(feature = "metrics")] + // if let Some(metrics) = &self.metrics { metrics.record(&msg) } + Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await; + } + Some((key, item )) = network_items.next() => { + pool.add_item(key, item).unwrap_or_else(|e| { + tracing::debug!("could not add item to the pool due to: {}", e) + }); + } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } + } + } + } + Ok(()) + } +} + +impl DaMempoolService +where + P: MemPool + Send + 'static, + P::Settings: Clone + Send + Sync + 'static, + N::Settings: Clone + Send + Sync + 'static, + P::Item: Clone + Debug + Send + Sync + 'static, + P::Key: Debug + Send + Sync + 'static, + P::BlockId: Debug + Send + 'static, + N: NetworkAdapter + Send + Sync + 'static, +{ + async fn should_stop_service(message: LifecycleMessage) -> bool { + match message { + LifecycleMessage::Shutdown(sender) => { + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } + + async fn handle_mempool_message( + message: MempoolMsg, + pool: &mut P, + network_relay: &mut OutboundRelay>, + service_state: &mut ServiceStateHandle, + ) { + match message { + MempoolMsg::Add { + item, + key, + reply_channel, + } => { + match pool.add_item(key, item.clone()) { + Ok(_id) => { + // Broadcast the item to the network + let net = network_relay.clone(); + let settings = service_state.settings_reader.get_updated_settings().network; + // move sending to a new task so local operations can complete in the meantime + tokio::spawn(async move { + let adapter = N::new(settings, net).await; + adapter.send(item).await; + }); + if let Err(e) = reply_channel.send(Ok(())) { + tracing::debug!("Failed to send reply to AddTx: {:?}", e); + } + } + Err(e) => { + tracing::debug!("could not add tx to the pool due to: {}", e); + } + } + } + MempoolMsg::View { + ancestor_hint, + reply_channel, + } => { + reply_channel + .send(pool.view(ancestor_hint)) + .unwrap_or_else(|_| tracing::debug!("could not send back pool view")); + } + MempoolMsg::MarkInBlock { ids, block } => { + pool.mark_in_block(&ids, block); + } + #[cfg(test)] + MempoolMsg::BlockItems { + block, + reply_channel, + } => { + reply_channel + .send(pool.block_items(block)) + .unwrap_or_else(|_| tracing::debug!("could not send back block items")); + } + MempoolMsg::Prune { ids } => { + pool.prune(&ids); + } + MempoolMsg::Metrics { reply_channel } => { + let metrics = MempoolMetrics { + pending_items: pool.pending_item_count(), + last_item_timestamp: pool.last_item_timestamp(), + }; + reply_channel + .send(metrics) + .unwrap_or_else(|_| tracing::debug!("could not send back mempool metrics")); + } + MempoolMsg::Status { + items, + reply_channel, + } => { + reply_channel + .send(pool.status(&items)) + .unwrap_or_else(|_| tracing::debug!("could not send back mempool status")); + } + } + } +} + +#[derive(Clone, Debug)] +pub struct DaMempoolSettings { + pub backend: B, + pub network: N, + pub registry: Option, +} diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index 2f3ab4e0..c7a1f4d1 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -1,67 +1,15 @@ pub mod backend; -#[cfg(feature = "metrics")] -pub mod metrics; +pub mod da; pub mod network; +pub mod tx; -/// Re-export for OpenAPI -#[cfg(feature = "openapi")] -pub mod openapi { - pub use super::{backend::Status, MempoolMetrics}; -} - -// std -use std::{ - fmt::{Debug, Error, Formatter}, - marker::PhantomData, -}; - -// crates -use futures::StreamExt; -#[cfg(feature = "metrics")] -use metrics::Metrics; -use nomos_metrics::NomosRegistry; +use backend::Status; +use overwatch_rs::services::relay::RelayMessage; +use std::fmt::{Debug, Error, Formatter}; use tokio::sync::oneshot::Sender; -// internal -use crate::network::NetworkAdapter; -use backend::{MemPool, Status}; -use nomos_network::{NetworkMsg, NetworkService}; -use overwatch_rs::services::life_cycle::LifecycleMessage; -use overwatch_rs::services::{ - handle::ServiceStateHandle, - relay::{OutboundRelay, Relay, RelayMessage}, - state::{NoOperator, NoState}, - ServiceCore, ServiceData, ServiceId, -}; -use tracing::error; -pub struct MempoolService -where - N: NetworkAdapter, - P: MemPool, - P::Settings: Clone, - P::Item: Debug + 'static, - P::Key: Debug + 'static, - P::BlockId: Debug + 'static, - D: Discriminant, -{ - service_state: ServiceStateHandle, - network_relay: Relay>, - pool: P, - #[cfg(feature = "metrics")] - metrics: Option, - // This is an hack because SERVICE_ID has to be univoque and associated const - // values can't depend on generic parameters. - // Unfortunately, this means that the mempools for certificates and transactions - // would have the same SERVICE_ID and break overwatch asumptions. - _d: PhantomData, -} - -#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -#[derive(serde::Serialize, serde::Deserialize)] -pub struct MempoolMetrics { - pub pending_items: usize, - pub last_item_timestamp: u64, -} +pub use da::service::{DaMempoolService, DaMempoolSettings}; +pub use tx::service::{TxMempoolService, TxMempoolSettings}; pub enum MempoolMsg { Add { @@ -123,223 +71,14 @@ where } } +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +#[derive(serde::Serialize, serde::Deserialize)] +pub struct MempoolMetrics { + pub pending_items: usize, + pub last_item_timestamp: u64, +} + impl RelayMessage for MempoolMsg { } - -pub struct Transaction; -pub struct Certificate; - -pub trait Discriminant { - const ID: &'static str; -} - -impl Discriminant for Transaction { - const ID: &'static str = "mempool-cl"; -} - -impl Discriminant for Certificate { - const ID: &'static str = "mempool-da"; -} - -impl ServiceData for MempoolService -where - N: NetworkAdapter, - P: MemPool, - P::Settings: Clone, - P::Item: Debug + 'static, - P::Key: Debug + 'static, - P::BlockId: Debug + 'static, - D: Discriminant, -{ - const SERVICE_ID: ServiceId = D::ID; - type Settings = Settings; - type State = NoState; - type StateOperator = NoOperator; - type Message = MempoolMsg<

::BlockId,

::Item,

::Key>; -} - -#[async_trait::async_trait] -impl ServiceCore for MempoolService -where - P: MemPool + Send + 'static, - P::Settings: Clone + Send + Sync + 'static, - N::Settings: Clone + Send + Sync + 'static, - P::Item: Clone + Debug + Send + Sync + 'static, - P::Key: Debug + Send + Sync + 'static, - P::BlockId: Send + Debug + 'static, - N: NetworkAdapter + Send + Sync + 'static, - D: Discriminant + Send, -{ - fn init(service_state: ServiceStateHandle) -> Result { - let network_relay = service_state.overwatch_handle.relay(); - let settings = service_state.settings_reader.get_updated_settings(); - - #[cfg(feature = "metrics")] - let metrics = settings - .registry - .map(|reg| Metrics::new(reg, service_state.id())); - - Ok(Self { - service_state, - network_relay, - pool: P::new(settings.backend), - #[cfg(feature = "metrics")] - metrics, - _d: PhantomData, - }) - } - - async fn run(mut self) -> Result<(), overwatch_rs::DynError> { - let Self { - mut service_state, - network_relay, - mut pool, - .. - } = self; - - let mut network_relay: OutboundRelay<_> = network_relay - .connect() - .await - .expect("Relay connection with NetworkService should succeed"); - - let adapter = N::new( - service_state.settings_reader.get_updated_settings().network, - network_relay.clone(), - ); - let adapter = adapter.await; - - let mut network_items = adapter.transactions_stream().await; - let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); - - loop { - tokio::select! { - Some(msg) = service_state.inbound_relay.recv() => { - #[cfg(feature = "metrics")] - if let Some(metrics) = &self.metrics { metrics.record(&msg) } - Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await; - } - Some((key, item )) = network_items.next() => { - pool.add_item(key, item).unwrap_or_else(|e| { - tracing::debug!("could not add item to the pool due to: {}", e) - }); - } - Some(msg) = lifecycle_stream.next() => { - if Self::should_stop_service(msg).await { - break; - } - } - } - } - Ok(()) - } -} - -impl MempoolService -where - P: MemPool + Send + 'static, - P::Settings: Clone + Send + Sync + 'static, - N::Settings: Clone + Send + Sync + 'static, - P::Item: Clone + Debug + Send + Sync + 'static, - P::Key: Debug + Send + Sync + 'static, - P::BlockId: Debug + Send + 'static, - N: NetworkAdapter + Send + Sync + 'static, - D: Discriminant + Send, -{ - async fn should_stop_service(message: LifecycleMessage) -> bool { - match message { - LifecycleMessage::Shutdown(sender) => { - if sender.send(()).is_err() { - error!( - "Error sending successful shutdown signal from service {}", - Self::SERVICE_ID - ); - } - true - } - LifecycleMessage::Kill => true, - } - } - - async fn handle_mempool_message( - message: MempoolMsg, - pool: &mut P, - network_relay: &mut OutboundRelay>, - service_state: &mut ServiceStateHandle, - ) { - match message { - MempoolMsg::Add { - item, - key, - reply_channel, - } => { - match pool.add_item(key, item.clone()) { - Ok(_id) => { - // Broadcast the item to the network - let net = network_relay.clone(); - let settings = service_state.settings_reader.get_updated_settings().network; - // move sending to a new task so local operations can complete in the meantime - tokio::spawn(async move { - let adapter = N::new(settings, net).await; - adapter.send(item).await; - }); - if let Err(e) = reply_channel.send(Ok(())) { - tracing::debug!("Failed to send reply to AddTx: {:?}", e); - } - } - Err(e) => { - tracing::debug!("could not add tx to the pool due to: {}", e); - } - } - } - MempoolMsg::View { - ancestor_hint, - reply_channel, - } => { - reply_channel - .send(pool.view(ancestor_hint)) - .unwrap_or_else(|_| tracing::debug!("could not send back pool view")); - } - MempoolMsg::MarkInBlock { ids, block } => { - pool.mark_in_block(&ids, block); - } - #[cfg(test)] - MempoolMsg::BlockItems { - block, - reply_channel, - } => { - reply_channel - .send(pool.block_items(block)) - .unwrap_or_else(|_| tracing::debug!("could not send back block items")); - } - MempoolMsg::Prune { ids } => { - pool.prune(&ids); - } - MempoolMsg::Metrics { reply_channel } => { - let metrics = MempoolMetrics { - pending_items: pool.pending_item_count(), - last_item_timestamp: pool.last_item_timestamp(), - }; - reply_channel - .send(metrics) - .unwrap_or_else(|_| tracing::debug!("could not send back mempool metrics")); - } - MempoolMsg::Status { - items, - reply_channel, - } => { - reply_channel - .send(pool.status(&items)) - .unwrap_or_else(|_| tracing::debug!("could not send back mempool status")); - } - } - } -} - -#[derive(Clone, Debug)] -pub struct Settings { - pub backend: B, - pub network: N, - pub registry: Option, -} diff --git a/nomos-services/mempool/src/network/messages.rs b/nomos-services/mempool/src/network/messages.rs index 0dbdb9b6..71d98f20 100644 --- a/nomos-services/mempool/src/network/messages.rs +++ b/nomos-services/mempool/src/network/messages.rs @@ -4,6 +4,6 @@ use serde::{Deserialize, Serialize}; // internal #[derive(Serialize, Deserialize)] -pub struct TransactionMsg { - pub tx: Tx, +pub struct PayloadMsg { + pub payload: Payload, } diff --git a/nomos-services/mempool/src/metrics.rs b/nomos-services/mempool/src/tx/metrics.rs similarity index 68% rename from nomos-services/mempool/src/metrics.rs rename to nomos-services/mempool/src/tx/metrics.rs index 02825ea5..94e5a10c 100644 --- a/nomos-services/mempool/src/metrics.rs +++ b/nomos-services/mempool/src/tx/metrics.rs @@ -8,7 +8,7 @@ use nomos_metrics::{ }; use overwatch_rs::services::ServiceId; // internal -use crate::MempoolMsg; +use super::service::TxMempoolMsg; #[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelValue)] enum MempoolMsgType { @@ -18,17 +18,17 @@ enum MempoolMsgType { MarkInBlock, } -impl From<&MempoolMsg> for MempoolMsgType +impl From<&TxMempoolMsg> for MempoolMsgType where I: 'static + Debug, K: 'static + Debug, { - fn from(event: &MempoolMsg) -> Self { + fn from(event: &TxMempoolMsg) -> Self { match event { - MempoolMsg::Add { .. } => MempoolMsgType::Add, - MempoolMsg::View { .. } => MempoolMsgType::View, - MempoolMsg::Prune { .. } => MempoolMsgType::Prune, - MempoolMsg::MarkInBlock { .. } => MempoolMsgType::MarkInBlock, + TxMempoolMsg::Add { .. } => MempoolMsgType::Add, + TxMempoolMsg::View { .. } => MempoolMsgType::View, + TxMempoolMsg::Prune { .. } => MempoolMsgType::Prune, + TxMempoolMsg::MarkInBlock { .. } => MempoolMsgType::MarkInBlock, _ => unimplemented!(), } } @@ -60,16 +60,16 @@ impl Metrics { Self { messages } } - pub(crate) fn record(&self, msg: &MempoolMsg) + pub(crate) fn record(&self, msg: &TxMempoolMsg) where I: 'static + Debug, K: 'static + Debug, { match msg { - MempoolMsg::Add { .. } - | MempoolMsg::View { .. } - | MempoolMsg::Prune { .. } - | MempoolMsg::MarkInBlock { .. } => { + TxMempoolMsg::Add { .. } + | TxMempoolMsg::View { .. } + | TxMempoolMsg::Prune { .. } + | TxMempoolMsg::MarkInBlock { .. } => { self.messages .get_or_create(&MessageLabels { label: msg.into() }) .inc(); diff --git a/nomos-services/mempool/src/tx/mod.rs b/nomos-services/mempool/src/tx/mod.rs new file mode 100644 index 00000000..320bcdf1 --- /dev/null +++ b/nomos-services/mempool/src/tx/mod.rs @@ -0,0 +1,3 @@ +#[cfg(feature = "metrics")] +pub mod metrics; +pub mod service; diff --git a/nomos-services/mempool/src/tx/service.rs b/nomos-services/mempool/src/tx/service.rs new file mode 100644 index 00000000..1820a9a7 --- /dev/null +++ b/nomos-services/mempool/src/tx/service.rs @@ -0,0 +1,240 @@ +/// Re-export for OpenAPI +#[cfg(feature = "openapi")] +pub mod openapi { + pub use crate::backend::Status; +} + +// std +use std::fmt::Debug; + +// crates +#[cfg(feature = "metrics")] +use super::metrics::Metrics; +use futures::StreamExt; +use nomos_metrics::NomosRegistry; +// internal +use crate::backend::MemPool; +use crate::network::NetworkAdapter; +use crate::{MempoolMetrics, MempoolMsg}; +use nomos_network::{NetworkMsg, NetworkService}; +use overwatch_rs::services::life_cycle::LifecycleMessage; +use overwatch_rs::services::{ + handle::ServiceStateHandle, + relay::{OutboundRelay, Relay}, + state::{NoOperator, NoState}, + ServiceCore, ServiceData, ServiceId, +}; +use tracing::error; + +pub struct TxMempoolService +where + N: NetworkAdapter, + P: MemPool, + P::Settings: Clone, + P::Item: Debug + 'static, + P::Key: Debug + 'static, + P::BlockId: Debug + 'static, +{ + service_state: ServiceStateHandle, + network_relay: Relay>, + pool: P, + #[cfg(feature = "metrics")] + metrics: Option, +} + +impl ServiceData for TxMempoolService +where + N: NetworkAdapter, + P: MemPool, + P::Settings: Clone, + P::Item: Debug + 'static, + P::Key: Debug + 'static, + P::BlockId: Debug + 'static, +{ + const SERVICE_ID: ServiceId = "mempool-cl"; + type Settings = TxMempoolSettings; + type State = NoState; + type StateOperator = NoOperator; + type Message = MempoolMsg<

::BlockId,

::Item,

::Key>; +} + +#[async_trait::async_trait] +impl ServiceCore for TxMempoolService +where + P: MemPool + Send + 'static, + P::Settings: Clone + Send + Sync + 'static, + N::Settings: Clone + Send + Sync + 'static, + P::Item: Clone + Debug + Send + Sync + 'static, + P::Key: Debug + Send + Sync + 'static, + P::BlockId: Send + Debug + 'static, + N: NetworkAdapter + Send + Sync + 'static, +{ + fn init(service_state: ServiceStateHandle) -> Result { + let network_relay = service_state.overwatch_handle.relay(); + let settings = service_state.settings_reader.get_updated_settings(); + + #[cfg(feature = "metrics")] + let metrics = settings + .registry + .map(|reg| Metrics::new(reg, service_state.id())); + + Ok(Self { + service_state, + network_relay, + pool: P::new(settings.backend), + #[cfg(feature = "metrics")] + metrics, + }) + } + + async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + let Self { + mut service_state, + network_relay, + mut pool, + .. + } = self; + + let mut network_relay: OutboundRelay<_> = network_relay + .connect() + .await + .expect("Relay connection with NetworkService should succeed"); + + let adapter = N::new( + service_state.settings_reader.get_updated_settings().network, + network_relay.clone(), + ); + let adapter = adapter.await; + + let mut network_items = adapter.transactions_stream().await; + let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); + + loop { + tokio::select! { + Some(msg) = service_state.inbound_relay.recv() => { + #[cfg(feature = "metrics")] + if let Some(metrics) = &self.metrics { metrics.record(&msg) } + Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await; + } + Some((key, item )) = network_items.next() => { + pool.add_item(key, item).unwrap_or_else(|e| { + tracing::debug!("could not add item to the pool due to: {}", e) + }); + } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } + } + } + } + Ok(()) + } +} + +impl TxMempoolService +where + P: MemPool + Send + 'static, + P::Settings: Clone + Send + Sync + 'static, + N::Settings: Clone + Send + Sync + 'static, + P::Item: Clone + Debug + Send + Sync + 'static, + P::Key: Debug + Send + Sync + 'static, + P::BlockId: Debug + Send + 'static, + N: NetworkAdapter + Send + Sync + 'static, +{ + async fn should_stop_service(message: LifecycleMessage) -> bool { + match message { + LifecycleMessage::Shutdown(sender) => { + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } + + async fn handle_mempool_message( + message: MempoolMsg, + pool: &mut P, + network_relay: &mut OutboundRelay>, + service_state: &mut ServiceStateHandle, + ) { + match message { + MempoolMsg::Add { + item, + key, + reply_channel, + } => { + match pool.add_item(key, item.clone()) { + Ok(_id) => { + // Broadcast the item to the network + let net = network_relay.clone(); + let settings = service_state.settings_reader.get_updated_settings().network; + // move sending to a new task so local operations can complete in the meantime + tokio::spawn(async move { + let adapter = N::new(settings, net).await; + adapter.send(item).await; + }); + if let Err(e) = reply_channel.send(Ok(())) { + tracing::debug!("Failed to send reply to AddTx: {:?}", e); + } + } + Err(e) => { + tracing::debug!("could not add tx to the pool due to: {}", e); + } + } + } + MempoolMsg::View { + ancestor_hint, + reply_channel, + } => { + reply_channel + .send(pool.view(ancestor_hint)) + .unwrap_or_else(|_| tracing::debug!("could not send back pool view")); + } + MempoolMsg::MarkInBlock { ids, block } => { + pool.mark_in_block(&ids, block); + } + #[cfg(test)] + MempoolMsg::BlockItems { + block, + reply_channel, + } => { + reply_channel + .send(pool.block_items(block)) + .unwrap_or_else(|_| tracing::debug!("could not send back block items")); + } + MempoolMsg::Prune { ids } => { + pool.prune(&ids); + } + MempoolMsg::Metrics { reply_channel } => { + let metrics = MempoolMetrics { + pending_items: pool.pending_item_count(), + last_item_timestamp: pool.last_item_timestamp(), + }; + reply_channel + .send(metrics) + .unwrap_or_else(|_| tracing::debug!("could not send back mempool metrics")); + } + MempoolMsg::Status { + items, + reply_channel, + } => { + reply_channel + .send(pool.status(&items)) + .unwrap_or_else(|_| tracing::debug!("could not send back mempool status")); + } + } + } +} + +#[derive(Clone, Debug)] +pub struct TxMempoolSettings { + pub backend: B, + pub network: N, + pub registry: Option, +} diff --git a/nomos-services/mempool/tests/mock.rs b/nomos-services/mempool/tests/mock.rs index 1a4685f2..acbbcc3b 100644 --- a/nomos-services/mempool/tests/mock.rs +++ b/nomos-services/mempool/tests/mock.rs @@ -13,7 +13,7 @@ use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle}; use nomos_mempool::{ backend::mockpool::MockPool, network::adapters::mock::{MockAdapter, MOCK_TX_CONTENT_TOPIC}, - MempoolMsg, MempoolService, Settings, Transaction, + MempoolMsg, TxMempoolService, TxMempoolSettings, }; #[derive(Services)] @@ -21,11 +21,7 @@ struct MockPoolNode { logging: ServiceHandle, network: ServiceHandle>, mockpool: ServiceHandle< - MempoolService< - MockAdapter, - MockPool, MockTxId>, - Transaction, - >, + TxMempoolService, MockTxId>>, >, } @@ -65,7 +61,7 @@ fn test_mockmempool() { weights: None, }, }, - mockpool: Settings { + mockpool: TxMempoolSettings { backend: (), network: (), registry: None, @@ -78,11 +74,11 @@ fn test_mockmempool() { .unwrap(); let network = app.handle().relay::>(); - let mempool = app.handle().relay::, MockTxId>, - Transaction, - >>(); + let mempool = + app.handle().relay::, MockTxId>, + >>(); app.spawn(async move { let network_outbound = network.connect().await.unwrap();