feat(sdp-service): replace mempool mock with real adapter (#1892)

Co-authored-by: gusto <bacv@users.noreply.github.com>
This commit is contained in:
Petar Radovic 2025-11-06 11:25:40 +01:00 committed by GitHub
parent 6b3dad45a3
commit d08101a132
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 221 additions and 84 deletions

View File

@ -49,8 +49,8 @@ use nomos_node::{
da_get_shares, da_get_storage_commitments, libp2p_info, mantle_metrics, mantle_status,
monitor_stats, post_activity, post_declaration, post_withdrawal, unblock_peer,
},
generic_services::SdpService,
};
use nomos_sdp::adapters::mempool::SdpMempoolAdapter;
use nomos_storage::{StorageService, api::da};
use overwatch::{DynError, overwatch::handle::OverwatchHandle, services::AsServiceId};
use serde::{Serialize, de::DeserializeOwned};
@ -99,6 +99,7 @@ pub struct AxumBackend<
SdpAdapter,
HttpStorageAdapter,
MempoolStorageAdapter,
SdpMempool,
> {
settings: AxumBackendSettings,
#[expect(clippy::allow_attributes_without_reason)]
@ -127,6 +128,7 @@ pub struct AxumBackend<
SdpAdapter,
HttpStorageAdapter,
MempoolStorageAdapter,
SdpMempool,
)>,
}
@ -168,6 +170,7 @@ impl<
SdpAdapter,
StorageAdapter,
MempoolStorageAdapter,
SdpMempool,
RuntimeServiceId,
> Backend<RuntimeServiceId>
for AxumBackend<
@ -194,6 +197,7 @@ impl<
SdpAdapter,
StorageAdapter,
MempoolStorageAdapter,
SdpMempool,
>
where
DaShare: Share + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
@ -290,6 +294,7 @@ where
ApiAdapter: nomos_da_network_service::api::ApiAdapter + Send + Sync + 'static,
SdpAdapter: SdpAdapterTrait<RuntimeServiceId> + Send + Sync + 'static,
StorageAdapter: storage::StorageAdapter<RuntimeServiceId> + Send + Sync + 'static,
SdpMempool: SdpMempoolAdapter + Send + Sync + 'static,
MempoolStorageAdapter: tx_service::storage::MempoolStorageAdapter<
RuntimeServiceId,
Key = <SignedMantleTx as Transaction>::Hash,
@ -374,7 +379,7 @@ where
RuntimeServiceId,
>,
>
+ AsServiceId<SdpService<RuntimeServiceId>>,
+ AsServiceId<nomos_sdp::SdpService<SdpMempool, RuntimeServiceId>>,
{
type Error = std::io::Error;
type Settings = AxumBackendSettings;
@ -605,15 +610,15 @@ where
)
.route(
paths::SDP_POST_DECLARATION,
routing::post(post_declaration::<RuntimeServiceId>),
routing::post(post_declaration::<SdpMempool, RuntimeServiceId>),
)
.route(
paths::SDP_POST_ACTIVITY,
routing::post(post_activity::<RuntimeServiceId>),
routing::post(post_activity::<SdpMempool, RuntimeServiceId>),
)
.route(
paths::SDP_POST_WITHDRAWAL,
routing::post(post_withdrawal::<RuntimeServiceId>),
routing::post(post_withdrawal::<SdpMempool, RuntimeServiceId>),
)
.with_state(handle.clone())
.layer(TimeoutLayer::new(self.settings.timeout))

View File

@ -79,7 +79,8 @@ where
+ 'static
+ AsServiceId<TestDaNetworkService<RuntimeServiceId>>
+ AsServiceId<TestDaSamplingService<RuntimeServiceId>>
+ AsServiceId<SdpService<RuntimeServiceId>>,
+ AsServiceId<SdpService<RuntimeServiceId>>
+ AsServiceId<generic_services::TxMempoolService<RuntimeServiceId>>,
{
type Error = std::io::Error;
type Settings = AxumBackendSettings;

View File

@ -29,8 +29,8 @@ use nomos_node::Tracing;
use nomos_node::{
BlobInfo, DaNetworkApiAdapter, NetworkBackend, NomosDaMembership, RocksBackend, SystemSig,
generic_services::{
DaMembershipAdapter, DaMembershipStorageGeneric, SamplingMempoolAdapter, SdpService,
SdpServiceAdapterGeneric, VerifierMempoolAdapter,
DaMembershipAdapter, DaMembershipStorageGeneric, SamplingMempoolAdapter,
SdpMempoolAdapterGeneric, SdpService, SdpServiceAdapterGeneric, VerifierMempoolAdapter,
},
};
use nomos_time::backends::NtpTimeBackend;
@ -184,6 +184,7 @@ pub(crate) type ApiService = nomos_api::ApiService<
SdpServiceAdapterGeneric<RuntimeServiceId>,
ApiStorageAdapter<RuntimeServiceId>,
RocksStorageAdapter<SignedMantleTx, TxHash>,
SdpMempoolAdapterGeneric<RuntimeServiceId>,
>,
RuntimeServiceId,
>;

View File

@ -38,6 +38,7 @@ use nomos_da_verifier::{backend::VerifierBackend, mempool::DaMempoolAdapter};
pub use nomos_http_api_common::settings::AxumBackendSettings;
use nomos_http_api_common::{paths, utils::create_rate_limit_layer};
use nomos_libp2p::PeerId;
use nomos_sdp::adapters::mempool::SdpMempoolAdapter;
use nomos_storage::{StorageService, api::da::DaConverter, backends::rocksdb::RocksBackend};
use overwatch::{DynError, overwatch::handle::OverwatchHandle, services::AsServiceId};
use serde::{Serialize, de::DeserializeOwned};
@ -63,10 +64,7 @@ use super::handlers::{
da_get_shares, da_get_storage_commitments, libp2p_info, mantle_metrics, mantle_status,
monitor_stats, unblock_peer,
};
use crate::{
api::handlers::{post_activity, post_declaration, post_withdrawal},
generic_services::SdpService,
};
use crate::api::handlers::{post_activity, post_declaration, post_withdrawal};
pub(crate) type DaStorageBackend = RocksBackend;
type DaStorageService<RuntimeServiceId> = StorageService<DaStorageBackend, RuntimeServiceId>;
@ -90,6 +88,7 @@ pub struct AxumBackend<
SdpAdapter,
HttpStorageAdapter,
MempoolStorageAdapter,
SdpMempool,
> {
settings: AxumBackendSettings,
_share: core::marker::PhantomData<DaShare>,
@ -109,6 +108,7 @@ pub struct AxumBackend<
_da_membership: core::marker::PhantomData<(DaMembershipAdapter, DaMembershipStorage)>,
_verifier_mempool_adapter: core::marker::PhantomData<VerifierMempoolAdapter>,
_sampling_mempool_adapter: core::marker::PhantomData<SamplingMempoolAdapter>,
_sdp_mempool_adapter: core::marker::PhantomData<SdpMempool>,
}
#[derive(OpenApi)]
@ -144,6 +144,7 @@ impl<
SdpAdapter,
StorageAdapter,
MempoolStorageAdapter,
SdpMempool,
RuntimeServiceId,
> Backend<RuntimeServiceId>
for AxumBackend<
@ -165,6 +166,7 @@ impl<
SdpAdapter,
StorageAdapter,
MempoolStorageAdapter,
SdpMempool,
>
where
DaShare: Share + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
@ -225,6 +227,7 @@ where
+ Clone
+ 'static,
MempoolStorageAdapter::Error: Debug,
SdpMempool: SdpMempoolAdapter + Send + Sync + 'static,
SamplingMempoolAdapter: nomos_da_sampling::mempool::DaMempoolAdapter + Send + Sync + 'static,
RuntimeServiceId: Debug
+ Sync
@ -298,7 +301,7 @@ where
RuntimeServiceId,
>,
>
+ AsServiceId<SdpService<RuntimeServiceId>>,
+ AsServiceId<nomos_sdp::SdpService<SdpMempool, RuntimeServiceId>>,
{
type Error = std::io::Error;
type Settings = AxumBackendSettings;
@ -326,6 +329,7 @@ where
_da_membership: core::marker::PhantomData,
_verifier_mempool_adapter: core::marker::PhantomData,
_sampling_mempool_adapter: core::marker::PhantomData,
_sdp_mempool_adapter: core::marker::PhantomData,
})
}
@ -548,15 +552,15 @@ where
)
.route(
paths::SDP_POST_DECLARATION,
routing::post(post_declaration::<RuntimeServiceId>),
routing::post(post_declaration::<SdpMempool, RuntimeServiceId>),
)
.route(
paths::SDP_POST_ACTIVITY,
routing::post(post_activity::<RuntimeServiceId>),
routing::post(post_activity::<SdpMempool, RuntimeServiceId>),
)
.route(
paths::SDP_POST_WITHDRAWAL,
routing::post(post_withdrawal::<RuntimeServiceId>),
routing::post(post_withdrawal::<SdpMempool, RuntimeServiceId>),
)
.with_state(handle.clone())
.layer(TimeoutLayer::new(self.settings.timeout))

View File

@ -37,6 +37,7 @@ use nomos_da_verifier::{backend::VerifierBackend, mempool::DaMempoolAdapter};
use nomos_http_api_common::paths;
use nomos_libp2p::PeerId;
use nomos_network::backends::libp2p::Libp2p as Libp2pNetworkBackend;
use nomos_sdp::adapters::mempool::SdpMempoolAdapter;
use nomos_storage::{StorageService, api::da::DaConverter, backends::rocksdb::RocksBackend};
use overwatch::{overwatch::handle::OverwatchHandle, services::AsServiceId};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
@ -949,19 +950,21 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
pub async fn post_declaration<RuntimeServiceId>(
pub async fn post_declaration<MempoolAdapter, RuntimeServiceId>(
State(handle): State<OverwatchHandle<RuntimeServiceId>>,
Json(declaration): Json<nomos_core::sdp::DeclarationMessage>,
) -> Response
where
MempoolAdapter: SdpMempoolAdapter + Send + Sync + 'static,
RuntimeServiceId: Debug
+ Sync
+ Send
+ Display
+ 'static
+ AsServiceId<nomos_sdp::SdpService<RuntimeServiceId>>,
+ AsServiceId<nomos_sdp::SdpService<MempoolAdapter, RuntimeServiceId>>,
{
make_request_and_return_response!(nomos_api::http::sdp::post_declaration_handler::<
MempoolAdapter,
RuntimeServiceId,
>(handle, declaration))
}
@ -974,19 +977,21 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
pub async fn post_activity<RuntimeServiceId>(
pub async fn post_activity<MempoolAdapter, RuntimeServiceId>(
State(handle): State<OverwatchHandle<RuntimeServiceId>>,
Json(metadata): Json<nomos_core::sdp::ActivityMetadata>,
) -> Response
where
MempoolAdapter: SdpMempoolAdapter + Send + Sync + 'static,
RuntimeServiceId: Debug
+ Sync
+ Send
+ Display
+ 'static
+ AsServiceId<nomos_sdp::SdpService<RuntimeServiceId>>,
+ AsServiceId<nomos_sdp::SdpService<MempoolAdapter, RuntimeServiceId>>,
{
make_request_and_return_response!(nomos_api::http::sdp::post_activity_handler::<
MempoolAdapter,
RuntimeServiceId,
>(handle, metadata))
}
@ -999,19 +1004,21 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
pub async fn post_withdrawal<RuntimeServiceId>(
pub async fn post_withdrawal<MempoolAdapter, RuntimeServiceId>(
State(handle): State<OverwatchHandle<RuntimeServiceId>>,
Json(declaration_id): Json<nomos_core::sdp::DeclarationId>,
) -> Response
where
MempoolAdapter: SdpMempoolAdapter + Send + Sync + 'static,
RuntimeServiceId: Debug
+ Sync
+ Send
+ Display
+ 'static
+ AsServiceId<nomos_sdp::SdpService<RuntimeServiceId>>,
+ AsServiceId<nomos_sdp::SdpService<MempoolAdapter, RuntimeServiceId>>,
{
make_request_and_return_response!(nomos_api::http::sdp::post_withdrawal_handler::<
MempoolAdapter,
RuntimeServiceId,
>(handle, declaration_id))
}

View File

@ -80,7 +80,8 @@ where
+ 'static
+ AsServiceId<TestDaNetworkService<RuntimeServiceId>>
+ AsServiceId<TestDaSamplingService<RuntimeServiceId>>
+ AsServiceId<SdpService<RuntimeServiceId>>,
+ AsServiceId<SdpService<RuntimeServiceId>>
+ AsServiceId<generic_services::TxMempoolService<RuntimeServiceId>>,
{
type Error = std::io::Error;
type Settings = AxumBackendSettings;

View File

@ -16,6 +16,7 @@ use nomos_da_sampling::{
use nomos_da_verifier::{
backend::kzgrs::KzgrsDaVerifier, mempool::kzgrs::KzgrsMempoolNetworkAdapter,
};
use nomos_sdp::adapters::mempool::sdp::SdpMempoolNetworkAdapter;
use nomos_storage::backends::rocksdb::RocksBackend;
use nomos_time::backends::NtpTimeBackend;
use tx_service::{backend::pool::Mempool, storage::adapters::rocksdb::RocksStorageAdapter};
@ -139,8 +140,22 @@ pub type CryptarchiaLeaderService<Cryptarchia, Wallet, SamplingAdapter, RuntimeS
pub type DaMembershipAdapter<RuntimeServiceId> = MembershipServiceAdapter<RuntimeServiceId>;
pub type SdpService<RuntimeServiceId> = nomos_sdp::SdpService<RuntimeServiceId>;
pub type SdpServiceAdapterGeneric<RuntimeServiceId> = SdpServiceAdapter<RuntimeServiceId>;
pub type SdpMempoolAdapterGeneric<RuntimeServiceId> = SdpMempoolNetworkAdapter<
tx_service::network::adapters::libp2p::Libp2pAdapter<SignedMantleTx, TxHash, RuntimeServiceId>,
Mempool<
HeaderId,
SignedMantleTx,
TxHash,
RocksStorageAdapter<SignedMantleTx, <SignedMantleTx as Transaction>::Hash>,
RuntimeServiceId,
>,
RuntimeServiceId,
>;
pub type SdpService<RuntimeServiceId> =
nomos_sdp::SdpService<SdpMempoolAdapterGeneric<RuntimeServiceId>, RuntimeServiceId>;
pub type SdpServiceAdapterGeneric<RuntimeServiceId> =
SdpServiceAdapter<SdpMempoolAdapterGeneric<RuntimeServiceId>, RuntimeServiceId>;
pub type DaMembershipStorageGeneric<RuntimeServiceId> =
RocksAdapter<RocksBackend, RuntimeServiceId>;

View File

@ -63,7 +63,8 @@ pub use crate::config::{Config, CryptarchiaLeaderArgs, HttpArgs, LogArgs, Networ
use crate::{
api::backend::AxumBackend,
generic_services::{
DaMembershipAdapter, DaMembershipStorageGeneric, SdpService, SdpServiceAdapterGeneric,
DaMembershipAdapter, DaMembershipStorageGeneric, SdpMempoolAdapterGeneric, SdpService,
SdpServiceAdapterGeneric,
},
};
@ -190,6 +191,7 @@ pub(crate) type ApiService = nomos_api::ApiService<
SdpServiceAdapterGeneric<RuntimeServiceId>,
ApiStorageAdapter<RuntimeServiceId>,
RocksStorageAdapter<SignedMantleTx, TxHash>,
SdpMempoolAdapterGeneric<RuntimeServiceId>,
>,
RuntimeServiceId,
>;

View File

@ -1,20 +1,21 @@
use std::fmt::{Debug, Display};
use nomos_core::sdp::{ActivityMetadata, DeclarationId, DeclarationMessage};
use nomos_sdp::SdpService;
use nomos_sdp::{SdpService, adapters::mempool::SdpMempoolAdapter};
use overwatch::{DynError, overwatch::OverwatchHandle};
pub async fn post_declaration_handler<RuntimeServiceId>(
pub async fn post_declaration_handler<MempoolAdapter, RuntimeServiceId>(
handle: OverwatchHandle<RuntimeServiceId>,
declaration: DeclarationMessage,
) -> Result<DeclarationId, DynError>
where
MempoolAdapter: SdpMempoolAdapter + Send + Sync + 'static,
RuntimeServiceId: Send
+ Sync
+ Debug
+ Display
+ 'static
+ overwatch::services::AsServiceId<SdpService<RuntimeServiceId>>,
+ overwatch::services::AsServiceId<SdpService<MempoolAdapter, RuntimeServiceId>>,
{
let relay = handle.relay().await?;
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
@ -30,17 +31,18 @@ where
reply_rx.await?
}
pub async fn post_activity_handler<RuntimeServiceId>(
pub async fn post_activity_handler<MempoolAdapter, RuntimeServiceId>(
handle: OverwatchHandle<RuntimeServiceId>,
metadata: ActivityMetadata,
) -> Result<(), DynError>
where
MempoolAdapter: SdpMempoolAdapter + Send + Sync + 'static,
RuntimeServiceId: Send
+ Sync
+ Debug
+ Display
+ 'static
+ overwatch::services::AsServiceId<SdpService<RuntimeServiceId>>,
+ overwatch::services::AsServiceId<SdpService<MempoolAdapter, RuntimeServiceId>>,
{
let relay = handle.relay().await?;
@ -52,17 +54,18 @@ where
Ok(())
}
pub async fn post_withdrawal_handler<RuntimeServiceId>(
pub async fn post_withdrawal_handler<MempoolAdapter, RuntimeServiceId>(
handle: OverwatchHandle<RuntimeServiceId>,
declaration_id: DeclarationId,
) -> Result<(), DynError>
where
MempoolAdapter: SdpMempoolAdapter + Send + Sync + 'static,
RuntimeServiceId: Send
+ Sync
+ Debug
+ Display
+ 'static
+ overwatch::services::AsServiceId<SdpService<RuntimeServiceId>>,
+ overwatch::services::AsServiceId<SdpService<MempoolAdapter, RuntimeServiceId>>,
{
let relay = handle.relay().await?;

View File

@ -4,7 +4,7 @@ use std::{
};
use async_trait::async_trait;
use nomos_sdp::{SdpMessage, SdpService};
use nomos_sdp::{SdpMessage, SdpService, adapters::mempool::SdpMempoolAdapter};
use overwatch::{
overwatch::OverwatchHandle,
services::{AsServiceId, relay::OutboundRelay},
@ -15,22 +15,29 @@ use crate::{
sdp::{SdpAdapter, SdpAdapterError},
};
pub struct SdpServiceAdapter<RuntimeServiceId> {
pub struct SdpServiceAdapter<MempoolAdapter, RuntimeServiceId> {
relay: OutboundRelay<SdpMessage>,
_phantom: PhantomData<RuntimeServiceId>,
_phantom: PhantomData<(RuntimeServiceId, MempoolAdapter)>,
}
#[async_trait]
impl<RuntimeServiceId> SdpAdapter<RuntimeServiceId> for SdpServiceAdapter<RuntimeServiceId>
impl<MempoolAdapter, RuntimeServiceId> SdpAdapter<RuntimeServiceId>
for SdpServiceAdapter<MempoolAdapter, RuntimeServiceId>
where
RuntimeServiceId:
AsServiceId<SdpService<RuntimeServiceId>> + Send + Sync + Debug + Display + 'static,
MempoolAdapter: SdpMempoolAdapter + Send + Sync + 'static,
RuntimeServiceId: AsServiceId<MempoolAdapter::MempoolService>
+ AsServiceId<SdpService<MempoolAdapter, RuntimeServiceId>>
+ Send
+ Sync
+ Debug
+ Display
+ 'static,
{
async fn new(
overwatch_handle: &OverwatchHandle<RuntimeServiceId>,
) -> Result<Self, SdpAdapterError> {
let relay = overwatch_handle
.relay::<SdpService<RuntimeServiceId>>()
.relay::<SdpService<MempoolAdapter, RuntimeServiceId>>()
.await
.map_err(|e| SdpAdapterError::Other(Box::new(e)))?;

View File

@ -14,6 +14,8 @@ futures = { default-features = false, version = "0.3" }
nomos-core = { workspace = true }
overwatch = { workspace = true }
serde = { default-features = false, features = ["derive"], version = "1.0" }
thiserror = { workspace = true }
tokio = { default-features = false, version = "1" }
tokio-stream = { default-features = false, version = "0.1" }
tracing = { workspace = true }
tx-service = { workspace = true }

View File

@ -1,26 +0,0 @@
use nomos_core::mantle::{SignedMantleTx, Transaction as _};
use overwatch::DynError;
use super::SdpMempoolAdapter;
pub struct MockMempoolAdapter;
#[async_trait::async_trait]
impl SdpMempoolAdapter for MockMempoolAdapter {
fn new() -> Self {
Self {}
}
async fn post_tx(&self, tx: SignedMantleTx) -> Result<(), DynError> {
// TODO: enable when membership service is deleted and we can depend on
// tx-service
// self.mempool_relay
// .send(MempoolMsg::Add {
// key: tx.mantle_tx.hash(),
// payload: tx,
// reply_channel,
// })
tracing::debug!("Mock: posting tx with hash {:?}", tx.hash());
Ok(())
}
}

View File

@ -1,11 +1,27 @@
pub mod mock;
pub mod sdp;
use nomos_core::mantle::SignedMantleTx;
use overwatch::DynError;
use overwatch::{
DynError,
services::{ServiceData, relay::OutboundRelay},
};
use tx_service::backend::MempoolError;
#[derive(thiserror::Error, Debug)]
pub enum MempoolAdapterError {
#[error("Mempool responded with and error: {0}")]
Mempool(#[from] MempoolError),
#[error("Channel receive error: {0}")]
ChannelRecv(#[from] tokio::sync::oneshot::error::RecvError),
#[error("Other mempool adapter error: {0}")]
Other(DynError),
}
#[async_trait::async_trait]
pub trait SdpMempoolAdapter {
fn new() -> Self;
type MempoolService: ServiceData;
type Tx;
async fn post_tx(&self, tx: SignedMantleTx) -> Result<(), DynError>;
fn new(outbound_relay: OutboundRelay<<Self::MempoolService as ServiceData>::Message>) -> Self;
async fn post_tx(&self, tx: Self::Tx) -> Result<(), MempoolAdapterError>;
}

View File

@ -0,0 +1,71 @@
use std::{fmt::Debug, marker::PhantomData};
use nomos_core::{
header::HeaderId,
mantle::{SignedMantleTx, Transaction as _, TxHash},
};
use overwatch::services::{ServiceData, relay::OutboundRelay};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tx_service::{
MempoolMsg, TxMempoolService,
backend::{MemPool, RecoverableMempool},
network::NetworkAdapter as MempoolNetworkAdapter,
storage::MempoolStorageAdapter,
};
use super::{MempoolAdapterError, SdpMempoolAdapter};
type MempoolRelay<Item, Key> = OutboundRelay<MempoolMsg<HeaderId, Item, Item, Key>>;
pub struct SdpMempoolNetworkAdapter<MempoolNetAdapter, Mempool, RuntimeServiceId>
where
Mempool: MemPool<BlockId = HeaderId, Key = TxHash>,
MempoolNetAdapter: MempoolNetworkAdapter<RuntimeServiceId, Key = Mempool::Key>,
Mempool::Item: Clone + Eq + Debug + 'static,
Mempool::Key: Debug + 'static,
{
pub mempool_relay: MempoolRelay<Mempool::Item, Mempool::Key>,
_phantom: PhantomData<(MempoolNetAdapter, RuntimeServiceId)>,
}
#[async_trait::async_trait]
impl<MempoolNetAdapter, Mempool, RuntimeServiceId> SdpMempoolAdapter
for SdpMempoolNetworkAdapter<MempoolNetAdapter, Mempool, RuntimeServiceId>
where
Mempool:
RecoverableMempool<BlockId = HeaderId, Key = TxHash, Item = SignedMantleTx> + Send + Sync,
Mempool::RecoveryState: Serialize + for<'de> Deserialize<'de>,
Mempool::Settings: Clone + Send + Sync,
Mempool::Storage: MempoolStorageAdapter<RuntimeServiceId> + Send + Sync + Clone,
MempoolNetAdapter: MempoolNetworkAdapter<RuntimeServiceId, Payload = Mempool::Item, Key = Mempool::Key>
+ Send
+ Sync,
MempoolNetAdapter::Settings: Send + Sync,
RuntimeServiceId: Send + Sync,
{
type MempoolService =
TxMempoolService<MempoolNetAdapter, Mempool, Mempool::Storage, RuntimeServiceId>;
type Tx = SignedMantleTx;
fn new(mempool_relay: OutboundRelay<<Self::MempoolService as ServiceData>::Message>) -> Self {
Self {
mempool_relay,
_phantom: PhantomData,
}
}
async fn post_tx(&self, tx: Self::Tx) -> Result<(), MempoolAdapterError> {
let (reply_channel, receiver) = oneshot::channel();
self.mempool_relay
.send(MempoolMsg::Add {
key: tx.hash(),
payload: tx,
reply_channel,
})
.await
.map_err(|(e, _)| MempoolAdapterError::Other(Box::new(e)))?;
receiver.await?.map_err(MempoolAdapterError::Mempool)
}
}

View File

@ -1,12 +1,16 @@
pub mod adapters;
use std::{collections::BTreeSet, fmt::Display, pin::Pin};
use std::{
collections::BTreeSet,
fmt::{Debug, Display},
pin::Pin,
};
use async_trait::async_trait;
use futures::{Stream, StreamExt as _};
use nomos_core::{
block::BlockNumber,
mantle::{NoteId, keys::PublicKey, tx_builder::MantleTxBuilder},
mantle::{NoteId, SignedMantleTx, keys::PublicKey, tx_builder::MantleTxBuilder},
sdp::{
ActiveMessage, ActivityMetadata, DeclarationId, DeclarationMessage, Locator, ProviderId,
ServiceType, WithdrawMessage,
@ -24,7 +28,7 @@ use tokio::sync::{broadcast, oneshot};
use tokio_stream::wrappers::BroadcastStream;
use crate::adapters::{
mempool::{SdpMempoolAdapter as _, mock::MockMempoolAdapter},
mempool::SdpMempoolAdapter,
wallet::{SdpWalletAdapter as _, mock::MockWalletAdapter},
};
@ -85,14 +89,16 @@ pub enum SdpMessage {
},
}
pub struct SdpService<RuntimeServiceId> {
pub struct SdpService<MempoolAdapter, RuntimeServiceId> {
service_resources_handle: OpaqueServiceResourcesHandle<Self, RuntimeServiceId>,
finalized_update_tx: broadcast::Sender<BlockEvent>,
current_declaration: Option<Declaration>,
nonce: u64,
}
impl<RuntimeServiceId> ServiceData for SdpService<RuntimeServiceId> {
impl<MempoolAdapter, RuntimeServiceId> ServiceData
for SdpService<MempoolAdapter, RuntimeServiceId>
{
type Settings = SdpSettings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
@ -100,9 +106,18 @@ impl<RuntimeServiceId> ServiceData for SdpService<RuntimeServiceId> {
}
#[async_trait]
impl<RuntimeServiceId> ServiceCore<RuntimeServiceId> for SdpService<RuntimeServiceId>
impl<MempoolAdapter, RuntimeServiceId> ServiceCore<RuntimeServiceId>
for SdpService<MempoolAdapter, RuntimeServiceId>
where
RuntimeServiceId: AsServiceId<Self> + Clone + Display + Send + Sync + 'static,
MempoolAdapter: SdpMempoolAdapter<Tx = SignedMantleTx> + Send + Sync + 'static,
RuntimeServiceId: Debug
+ AsServiceId<Self>
+ AsServiceId<MempoolAdapter::MempoolService>
+ Clone
+ Display
+ Send
+ Sync
+ 'static,
{
fn init(
service_resources_handle: OpaqueServiceResourcesHandle<Self, RuntimeServiceId>,
@ -131,7 +146,12 @@ where
);
let wallet_adapter = MockWalletAdapter::new();
let mempool_adapter = MockMempoolAdapter::new();
let mempool_relay = self
.service_resources_handle
.overwatch_handle
.relay::<MempoolAdapter::MempoolService>()
.await?;
let mempool_adapter = MempoolAdapter::new(mempool_relay);
while let Some(msg) = self.service_resources_handle.inbound_relay.recv().await {
match msg {
@ -173,15 +193,23 @@ where
}
}
impl<RuntimeServiceId> SdpService<RuntimeServiceId>
impl<MempoolAdapter, RuntimeServiceId> SdpService<MempoolAdapter, RuntimeServiceId>
where
RuntimeServiceId: AsServiceId<Self> + Clone + Display + Send + Sync + 'static,
MempoolAdapter: SdpMempoolAdapter<Tx = SignedMantleTx> + Send + Sync + 'static,
RuntimeServiceId: Debug
+ AsServiceId<Self>
+ AsServiceId<MempoolAdapter::MempoolService>
+ Clone
+ Display
+ Send
+ Sync
+ 'static,
{
async fn handle_post_declaration(
&self,
declaration: Box<DeclarationMessage>,
wallet_adapter: &MockWalletAdapter,
mempool_adapter: &MockMempoolAdapter,
mempool_adapter: &MempoolAdapter,
reply_channel: oneshot::Sender<Result<DeclarationId, DynError>>,
) {
let tx_builder = MantleTxBuilder::new();
@ -208,7 +236,7 @@ where
&mut self,
metadata: ActivityMetadata,
wallet_adapter: &MockWalletAdapter,
mempool_adapter: &MockMempoolAdapter,
mempool_adapter: &MempoolAdapter,
) {
// Check if we have a declaration_id
let Some(ref declaration) = self.current_declaration else {
@ -247,7 +275,7 @@ where
&mut self,
declaration_id: DeclarationId,
wallet_adapter: &MockWalletAdapter,
mempool_adapter: &MockMempoolAdapter,
mempool_adapter: &MempoolAdapter,
) {
if let Err(e) = self.validate_withdrawal(&declaration_id) {
tracing::error!("{}", e);