Add discriminant type to allow multiple mempool services (#441)
Overwatch requires all services to have a different service id. Unfortunately, such service id can't depend on generic parameters, which means that we can't have two instances of the mempool service even if they are instantiated with different types. This commit circuments this limitation by adding another type parameter.
This commit is contained in:
parent
f307e03ff4
commit
dda4a1365c
|
@ -17,6 +17,7 @@ use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse};
|
||||||
use nomos_mempool::backend::mockpool::MockPool;
|
use nomos_mempool::backend::mockpool::MockPool;
|
||||||
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter;
|
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter;
|
||||||
use nomos_mempool::network::NetworkAdapter;
|
use nomos_mempool::network::NetworkAdapter;
|
||||||
|
use nomos_mempool::Transaction as TxDiscriminant;
|
||||||
use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
|
use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
|
||||||
use nomos_network::backends::libp2p::Libp2p;
|
use nomos_network::backends::libp2p::Libp2p;
|
||||||
use nomos_network::backends::NetworkBackend;
|
use nomos_network::backends::NetworkBackend;
|
||||||
|
@ -51,7 +52,7 @@ pub fn mempool_metrics_bridge(
|
||||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||||
) -> HttpBridgeRunner {
|
) -> HttpBridgeRunner {
|
||||||
Box::new(Box::pin(async move {
|
Box::new(Box::pin(async move {
|
||||||
get_handler!(handle, MempoolService<Libp2pAdapter<Tx, <Tx as Transaction>::Hash>, MockPool<Tx, <Tx as Transaction>::Hash>>, "metrics" => handle_mempool_metrics_req)
|
get_handler!(handle, MempoolService<Libp2pAdapter<Tx, <Tx as Transaction>::Hash>, MockPool<Tx, <Tx as Transaction>::Hash>, TxDiscriminant>, "metrics" => handle_mempool_metrics_req)
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,7 +78,7 @@ where
|
||||||
Box::new(Box::pin(async move {
|
Box::new(Box::pin(async move {
|
||||||
let (mempool_channel, mut http_request_channel) =
|
let (mempool_channel, mut http_request_channel) =
|
||||||
build_http_bridge::<
|
build_http_bridge::<
|
||||||
MempoolService<A, MockPool<Tx, <Tx as Transaction>::Hash>>,
|
MempoolService<A, MockPool<Tx, <Tx as Transaction>::Hash>, TxDiscriminant>,
|
||||||
AxumBackend,
|
AxumBackend,
|
||||||
_,
|
_,
|
||||||
>(handle.clone(), HttpMethod::POST, "addtx")
|
>(handle.clone(), HttpMethod::POST, "addtx")
|
||||||
|
|
|
@ -20,7 +20,7 @@ use nomos_http::bridge::HttpBridgeService;
|
||||||
use nomos_http::http::HttpService;
|
use nomos_http::http::HttpService;
|
||||||
use nomos_log::Logger;
|
use nomos_log::Logger;
|
||||||
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter;
|
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter;
|
||||||
|
use nomos_mempool::Transaction as TxDiscriminant;
|
||||||
use nomos_mempool::{backend::mockpool::MockPool, MempoolService};
|
use nomos_mempool::{backend::mockpool::MockPool, MempoolService};
|
||||||
use nomos_network::backends::libp2p::Libp2p;
|
use nomos_network::backends::libp2p::Libp2p;
|
||||||
|
|
||||||
|
@ -56,6 +56,7 @@ type DataAvailability = DataAvailabilityService<
|
||||||
type Mempool = MempoolService<
|
type Mempool = MempoolService<
|
||||||
MempoolLibp2pAdapter<Tx, <Tx as Transaction>::Hash>,
|
MempoolLibp2pAdapter<Tx, <Tx as Transaction>::Hash>,
|
||||||
MockPool<Tx, <Tx as Transaction>::Hash>,
|
MockPool<Tx, <Tx as Transaction>::Hash>,
|
||||||
|
TxDiscriminant,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
#[derive(Services)]
|
#[derive(Services)]
|
||||||
|
|
|
@ -42,6 +42,7 @@ use nomos_core::tx::{Transaction, TxSelect};
|
||||||
use nomos_core::vote::Tally;
|
use nomos_core::vote::Tally;
|
||||||
use nomos_mempool::{
|
use nomos_mempool::{
|
||||||
backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolMsg, MempoolService,
|
backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolMsg, MempoolService,
|
||||||
|
Transaction as TxDiscriminant,
|
||||||
};
|
};
|
||||||
use nomos_network::NetworkService;
|
use nomos_network::NetworkService;
|
||||||
use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage};
|
use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage};
|
||||||
|
@ -119,7 +120,7 @@ where
|
||||||
// underlying networking backend. We need this so we can relay and check the types properly
|
// underlying networking backend. We need this so we can relay and check the types properly
|
||||||
// when implementing ServiceCore for CarnotConsensus
|
// when implementing ServiceCore for CarnotConsensus
|
||||||
network_relay: Relay<NetworkService<A::Backend>>,
|
network_relay: Relay<NetworkService<A::Backend>>,
|
||||||
mempool_relay: Relay<MempoolService<M, P>>,
|
mempool_relay: Relay<MempoolService<M, P, TxDiscriminant>>,
|
||||||
_overlay: std::marker::PhantomData<O>,
|
_overlay: std::marker::PhantomData<O>,
|
||||||
// this need to be substituted by some kind DA bo
|
// this need to be substituted by some kind DA bo
|
||||||
_blob_certificate: std::marker::PhantomData<C>,
|
_blob_certificate: std::marker::PhantomData<C>,
|
||||||
|
|
|
@ -2,7 +2,10 @@ pub mod backend;
|
||||||
pub mod network;
|
pub mod network;
|
||||||
|
|
||||||
// std
|
// std
|
||||||
use std::fmt::{Debug, Error, Formatter};
|
use std::{
|
||||||
|
fmt::{Debug, Error, Formatter},
|
||||||
|
marker::PhantomData,
|
||||||
|
};
|
||||||
|
|
||||||
// crates
|
// crates
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
@ -19,17 +22,23 @@ use overwatch_rs::services::{
|
||||||
ServiceCore, ServiceData, ServiceId,
|
ServiceCore, ServiceData, ServiceId,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct MempoolService<N, P>
|
pub struct MempoolService<N, P, D>
|
||||||
where
|
where
|
||||||
N: NetworkAdapter<Item = P::Item, Key = P::Key>,
|
N: NetworkAdapter<Item = P::Item, Key = P::Key>,
|
||||||
P: MemPool,
|
P: MemPool,
|
||||||
P::Settings: Clone,
|
P::Settings: Clone,
|
||||||
P::Item: Debug + 'static,
|
P::Item: Debug + 'static,
|
||||||
P::Key: Debug + 'static,
|
P::Key: Debug + 'static,
|
||||||
|
D: Discriminant,
|
||||||
{
|
{
|
||||||
service_state: ServiceStateHandle<Self>,
|
service_state: ServiceStateHandle<Self>,
|
||||||
network_relay: Relay<NetworkService<N::Backend>>,
|
network_relay: Relay<NetworkService<N::Backend>>,
|
||||||
pool: P,
|
pool: P,
|
||||||
|
// This is an hack because SERVICE_ID has to be univoque and associated const
|
||||||
|
// values can't depend on generic parameters.
|
||||||
|
// Unfortunately, this means that the mempools for certificates and transactions
|
||||||
|
// would have the same SERVICE_ID and break overwatch asumptions.
|
||||||
|
_d: PhantomData<D>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct MempoolMetrics {
|
pub struct MempoolMetrics {
|
||||||
|
@ -93,15 +102,31 @@ where
|
||||||
|
|
||||||
impl<Item: 'static, Key: 'static> RelayMessage for MempoolMsg<Item, Key> {}
|
impl<Item: 'static, Key: 'static> RelayMessage for MempoolMsg<Item, Key> {}
|
||||||
|
|
||||||
impl<N, P> ServiceData for MempoolService<N, P>
|
pub struct Transaction;
|
||||||
|
pub struct Certificate;
|
||||||
|
|
||||||
|
pub trait Discriminant {
|
||||||
|
const ID: &'static str;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Discriminant for Transaction {
|
||||||
|
const ID: &'static str = "mempool-txs";
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Discriminant for Certificate {
|
||||||
|
const ID: &'static str = "mempool-certs";
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<N, P, D> ServiceData for MempoolService<N, P, D>
|
||||||
where
|
where
|
||||||
N: NetworkAdapter<Item = P::Item, Key = P::Key>,
|
N: NetworkAdapter<Item = P::Item, Key = P::Key>,
|
||||||
P: MemPool,
|
P: MemPool,
|
||||||
P::Settings: Clone,
|
P::Settings: Clone,
|
||||||
P::Item: Debug + 'static,
|
P::Item: Debug + 'static,
|
||||||
P::Key: Debug + 'static,
|
P::Key: Debug + 'static,
|
||||||
|
D: Discriminant,
|
||||||
{
|
{
|
||||||
const SERVICE_ID: ServiceId = "Mempool";
|
const SERVICE_ID: ServiceId = D::ID;
|
||||||
type Settings = Settings<P::Settings, N::Settings>;
|
type Settings = Settings<P::Settings, N::Settings>;
|
||||||
type State = NoState<Self::Settings>;
|
type State = NoState<Self::Settings>;
|
||||||
type StateOperator = NoOperator<Self::State>;
|
type StateOperator = NoOperator<Self::State>;
|
||||||
|
@ -109,7 +134,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl<N, P> ServiceCore for MempoolService<N, P>
|
impl<N, P, D> ServiceCore for MempoolService<N, P, D>
|
||||||
where
|
where
|
||||||
P: MemPool + Send + 'static,
|
P: MemPool + Send + 'static,
|
||||||
P::Settings: Clone + Send + Sync + 'static,
|
P::Settings: Clone + Send + Sync + 'static,
|
||||||
|
@ -117,6 +142,7 @@ where
|
||||||
P::Item: Clone + Debug + Send + Sync + 'static,
|
P::Item: Clone + Debug + Send + Sync + 'static,
|
||||||
P::Key: Debug + Send + Sync + 'static,
|
P::Key: Debug + Send + Sync + 'static,
|
||||||
N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
|
N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
|
||||||
|
D: Discriminant + Send,
|
||||||
{
|
{
|
||||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||||
let network_relay = service_state.overwatch_handle.relay();
|
let network_relay = service_state.overwatch_handle.relay();
|
||||||
|
@ -125,6 +151,7 @@ where
|
||||||
service_state,
|
service_state,
|
||||||
network_relay,
|
network_relay,
|
||||||
pool: P::new(settings.backend),
|
pool: P::new(settings.backend),
|
||||||
|
_d: PhantomData,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,6 +160,7 @@ where
|
||||||
mut service_state,
|
mut service_state,
|
||||||
network_relay,
|
network_relay,
|
||||||
mut pool,
|
mut pool,
|
||||||
|
..
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
let network_relay: OutboundRelay<_> = network_relay
|
let network_relay: OutboundRelay<_> = network_relay
|
||||||
|
|
|
@ -13,7 +13,7 @@ use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
|
||||||
use nomos_mempool::{
|
use nomos_mempool::{
|
||||||
backend::mockpool::MockPool,
|
backend::mockpool::MockPool,
|
||||||
network::adapters::mock::{MockAdapter, MOCK_TX_CONTENT_TOPIC},
|
network::adapters::mock::{MockAdapter, MOCK_TX_CONTENT_TOPIC},
|
||||||
MempoolMsg, MempoolService, Settings,
|
MempoolMsg, MempoolService, Settings, Transaction,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Services)]
|
#[derive(Services)]
|
||||||
|
@ -21,7 +21,7 @@ struct MockPoolNode {
|
||||||
logging: ServiceHandle<Logger>,
|
logging: ServiceHandle<Logger>,
|
||||||
network: ServiceHandle<NetworkService<Mock>>,
|
network: ServiceHandle<NetworkService<Mock>>,
|
||||||
mockpool: ServiceHandle<
|
mockpool: ServiceHandle<
|
||||||
MempoolService<MockAdapter, MockPool<MockTransaction<MockMessage>, MockTxId>>,
|
MempoolService<MockAdapter, MockPool<MockTransaction<MockMessage>, MockTxId>, Transaction>,
|
||||||
>,
|
>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,9 +73,11 @@ fn test_mockmempool() {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let network = app.handle().relay::<NetworkService<Mock>>();
|
let network = app.handle().relay::<NetworkService<Mock>>();
|
||||||
let mempool = app
|
let mempool = app.handle().relay::<MempoolService<
|
||||||
.handle()
|
MockAdapter,
|
||||||
.relay::<MempoolService<MockAdapter, MockPool<MockTransaction<MockMessage>, MockTxId>>>();
|
MockPool<MockTransaction<MockMessage>, MockTxId>,
|
||||||
|
Transaction,
|
||||||
|
>>();
|
||||||
|
|
||||||
app.spawn(async move {
|
app.spawn(async move {
|
||||||
let network_outbound = network.connect().await.unwrap();
|
let network_outbound = network.connect().await.unwrap();
|
||||||
|
|
Loading…
Reference in New Issue