Add a second mempool for DA certificates (#443)

* Use more descriptive names for generic parameters

We're starting to have tens on generic parameters, if we don't use
descriptive names it'll be pretty hard to understand what they do.
This commit changes the names of the mempool parameters in
preparation for the insertion of the da certificates mempool to
distinguish it from cl transactions.

* Add mempool for da certificates

* Add separate certificates mempool to binary

* ignore clippy lints
This commit is contained in:
Giacomo Pasini 2023-10-02 10:38:05 +02:00 committed by GitHub
parent 36b3ccc043
commit d67d08e81d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 289 additions and 140 deletions

View File

@ -6,18 +6,24 @@ use libp2p::*;
use bytes::Bytes; use bytes::Bytes;
use http::StatusCode; use http::StatusCode;
use nomos_consensus::{CarnotInfo, ConsensusMsg}; use nomos_consensus::{CarnotInfo, ConsensusMsg};
use serde::de::DeserializeOwned;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tracing::error; use tracing::error;
// internal // internal
use nomos_core::tx::Transaction; use full_replication::{Blob, Certificate};
use nomos_core::wire;
use nomos_core::{
da::{blob, certificate::Certificate as _},
tx::Transaction,
};
use nomos_http::backends::axum::AxumBackend; use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse}; use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse};
use nomos_mempool::backend::mockpool::MockPool; use nomos_mempool::backend::mockpool::MockPool;
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter; use nomos_mempool::network::adapters::libp2p::Libp2pAdapter;
use nomos_mempool::network::NetworkAdapter; use nomos_mempool::network::NetworkAdapter;
use nomos_mempool::Transaction as TxDiscriminant; use nomos_mempool::{Certificate as CertDiscriminant, Transaction as TxDiscriminant};
use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
use nomos_network::backends::libp2p::Libp2p; use nomos_network::backends::libp2p::Libp2p;
use nomos_network::backends::NetworkBackend; use nomos_network::backends::NetworkBackend;
@ -48,11 +54,19 @@ pub fn carnot_info_bridge(
})) }))
} }
pub fn mempool_metrics_bridge( pub fn cl_mempool_metrics_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle, handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner { ) -> HttpBridgeRunner {
Box::new(Box::pin(async move { Box::new(Box::pin(async move {
get_handler!(handle, MempoolService<Libp2pAdapter<Tx, <Tx as Transaction>::Hash>, MockPool<Tx, <Tx as Transaction>::Hash>, TxDiscriminant>, "metrics" => handle_mempool_metrics_req) get_handler!(handle, MempoolService<Libp2pAdapter<Tx, <Tx as Transaction>::Hash>, MockPool<Tx, <Tx as Transaction>::Hash>, TxDiscriminant>, "cl_metrics" => handle_mempool_metrics_req)
}))
}
pub fn da_mempool_metrics_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
get_handler!(handle, MempoolService<Libp2pAdapter<Certificate, <Blob as blob::Blob>::Hash>, MockPool<Certificate, <Blob as blob::Blob>::Hash>, CertDiscriminant>, "da_metrics" => handle_mempool_metrics_req)
})) }))
} }
@ -89,7 +103,57 @@ where
res_tx, payload, .. res_tx, payload, ..
}) = http_request_channel.recv().await }) = http_request_channel.recv().await
{ {
if let Err(e) = handle_mempool_add_tx_req(&mempool_channel, res_tx, payload).await { if let Err(e) = handle_mempool_add_req(
&mempool_channel,
res_tx,
payload.unwrap_or_default(),
|tx| tx.hash(),
)
.await
{
error!(e);
}
}
Ok(())
}))
}
pub fn mempool_add_cert_bridge<N, A>(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner
where
N: NetworkBackend,
A: NetworkAdapter<Backend = N, Item = Certificate, Key = <Blob as blob::Blob>::Hash>
+ Send
+ Sync
+ 'static,
A::Settings: Send + Sync,
{
Box::new(Box::pin(async move {
let (mempool_channel, mut http_request_channel) = build_http_bridge::<
MempoolService<A, MockPool<Certificate, <Blob as blob::Blob>::Hash>, CertDiscriminant>,
AxumBackend,
_,
>(
handle.clone(),
HttpMethod::POST,
"addcert",
)
.await
.unwrap();
while let Some(HttpRequest {
res_tx, payload, ..
}) = http_request_channel.recv().await
{
if let Err(e) = handle_mempool_add_req(
&mempool_channel,
res_tx,
payload.unwrap_or_default(),
|cert| cert.blob(),
)
.await
{
error!(e); error!(e);
} }
} }
@ -114,8 +178,8 @@ async fn handle_carnot_info_req(
Ok(()) Ok(())
} }
async fn handle_mempool_metrics_req( async fn handle_mempool_metrics_req<K, V>(
mempool_channel: &OutboundRelay<MempoolMsg<Tx, <Tx as Transaction>::Hash>>, mempool_channel: &OutboundRelay<MempoolMsg<K, V>>,
res_tx: Sender<HttpResponse>, res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> { ) -> Result<(), overwatch_rs::DynError> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
@ -139,42 +203,37 @@ async fn handle_mempool_metrics_req(
Ok(()) Ok(())
} }
pub(super) async fn handle_mempool_add_tx_req( pub(super) async fn handle_mempool_add_req<K, V>(
mempool_channel: &OutboundRelay<MempoolMsg<Tx, <Tx as Transaction>::Hash>>, mempool_channel: &OutboundRelay<MempoolMsg<K, V>>,
res_tx: Sender<HttpResponse>, res_tx: Sender<HttpResponse>,
payload: Option<Bytes>, wire_item: Bytes,
) -> Result<(), overwatch_rs::DynError> { key: impl Fn(&K) -> V,
if let Some(data) = payload ) -> Result<(), overwatch_rs::DynError>
.as_ref() where
.and_then(|b| String::from_utf8(b.to_vec()).ok()) K: DeserializeOwned,
{ {
let tx = Tx(data); let item = wire::deserialize::<K>(&wire_item)?;
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
mempool_channel let key = key(&item);
.send(MempoolMsg::Add { mempool_channel
item: tx.clone(), .send(MempoolMsg::Add {
key: tx.hash(), item,
reply_channel: sender, key,
}) reply_channel: sender,
.await })
.map_err(|(e, _)| e)?; .await
.map_err(|(e, _)| e)?;
match receiver.await { match receiver.await {
Ok(Ok(())) => Ok(res_tx.send(Ok(b"".to_vec().into())).await?), Ok(Ok(())) => Ok(res_tx.send(Ok(b"".to_vec().into())).await?),
Ok(Err(())) => Ok(res_tx Ok(Err(())) => Ok(res_tx
.send(Err(( .send(Err((
StatusCode::CONFLICT, StatusCode::CONFLICT,
"error: unable to add tx".into(), "error: unable to add tx".into(),
))) )))
.await?), .await?),
Err(err) => Ok(res_tx Err(err) => Ok(res_tx
.send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))) .send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())))
.await?), .await?),
}
} else {
Err(
format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String")
.into(),
)
} }
} }

View File

@ -10,7 +10,10 @@ use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsServic
use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter; use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter;
use nomos_consensus::CarnotConsensus; use nomos_consensus::CarnotConsensus;
use nomos_core::tx::Transaction; use nomos_core::{
da::{blob, certificate},
tx::Transaction,
};
use nomos_da::{ use nomos_da::{
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter, backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter,
DataAvailabilityService, DataAvailabilityService,
@ -20,8 +23,10 @@ use nomos_http::bridge::HttpBridgeService;
use nomos_http::http::HttpService; use nomos_http::http::HttpService;
use nomos_log::Logger; use nomos_log::Logger;
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter; use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter;
use nomos_mempool::Transaction as TxDiscriminant; use nomos_mempool::{
use nomos_mempool::{backend::mockpool::MockPool, MempoolService}; backend::mockpool::MockPool, Certificate as CertDiscriminant, MempoolService,
Transaction as TxDiscriminant,
};
use nomos_network::backends::libp2p::Libp2p; use nomos_network::backends::libp2p::Libp2p;
use nomos_network::NetworkService; use nomos_network::NetworkService;
@ -35,14 +40,20 @@ use nomos_core::{
}; };
pub use tx::Tx; pub use tx::Tx;
pub const CL_TOPIC: &str = "cl";
pub const DA_TOPIC: &str = "da";
const MB16: usize = 1024 * 1024 * 16; const MB16: usize = 1024 * 1024 * 16;
pub type Carnot = CarnotConsensus< pub type Carnot = CarnotConsensus<
ConsensusLibp2pAdapter, ConsensusLibp2pAdapter,
MockPool<Tx, <Tx as Transaction>::Hash>, MockPool<Tx, <Tx as Transaction>::Hash>,
MempoolLibp2pAdapter<Tx, <Tx as Transaction>::Hash>, MempoolLibp2pAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<Certificate, <<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash>,
MempoolLibp2pAdapter<
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
>,
FlatOverlay<RoundRobin, RandomBeaconState>, FlatOverlay<RoundRobin, RandomBeaconState>,
Certificate,
FillSizeWithTx<MB16, Tx>, FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobsCertificate<MB16, Certificate>, FillSizeWithBlobsCertificate<MB16, Certificate>,
>; >;
@ -53,17 +64,20 @@ type DataAvailability = DataAvailabilityService<
DaLibp2pAdapter<Blob, Attestation>, DaLibp2pAdapter<Blob, Attestation>,
>; >;
type Mempool = MempoolService< type Mempool<K, V, D> = MempoolService<MempoolLibp2pAdapter<K, V>, MockPool<K, V>, D>;
MempoolLibp2pAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<Tx, <Tx as Transaction>::Hash>,
TxDiscriminant,
>;
#[derive(Services)] #[derive(Services)]
pub struct Nomos { pub struct Nomos {
logging: ServiceHandle<Logger>, logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Libp2p>>, network: ServiceHandle<NetworkService<Libp2p>>,
mockpool: ServiceHandle<Mempool>, cl_mempool: ServiceHandle<Mempool<Tx, <Tx as Transaction>::Hash, TxDiscriminant>>,
da_mempool: ServiceHandle<
Mempool<
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
CertDiscriminant,
>,
>,
consensus: ServiceHandle<Carnot>, consensus: ServiceHandle<Carnot>,
http: ServiceHandle<HttpService<AxumBackend>>, http: ServiceHandle<HttpService<AxumBackend>>,
bridges: ServiceHandle<HttpBridgeService>, bridges: ServiceHandle<HttpBridgeService>,

View File

@ -1,3 +1,4 @@
use full_replication::{Blob, Certificate};
use nomos_node::{ use nomos_node::{
Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, Nomos, NomosServiceSettings, Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, Nomos, NomosServiceSettings,
OverlayArgs, Tx, OverlayArgs, Tx,
@ -7,14 +8,16 @@ mod bridges;
use clap::Parser; use clap::Parser;
use color_eyre::eyre::{eyre, Result}; use color_eyre::eyre::{eyre, Result};
use nomos_core::{
da::{blob, certificate},
tx::Transaction,
};
use nomos_http::bridge::{HttpBridge, HttpBridgeSettings}; use nomos_http::bridge::{HttpBridge, HttpBridgeSettings};
use nomos_mempool::network::adapters::libp2p::{Libp2pAdapter, Settings as AdapterSettings}; use nomos_mempool::network::adapters::libp2p::{Libp2pAdapter, Settings as AdapterSettings};
use nomos_network::backends::libp2p::Libp2p; use nomos_network::backends::libp2p::Libp2p;
use overwatch_rs::overwatch::*; use overwatch_rs::overwatch::*;
use std::sync::Arc; use std::sync::Arc;
use nomos_core::tx::Transaction;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)] #[command(author, version, about, long_about = None)]
struct Args { struct Args {
@ -55,24 +58,40 @@ fn main() -> Result<()> {
let bridges: Vec<HttpBridge> = vec![ let bridges: Vec<HttpBridge> = vec![
Arc::new(Box::new(bridges::carnot_info_bridge)), Arc::new(Box::new(bridges::carnot_info_bridge)),
Arc::new(Box::new(bridges::mempool_metrics_bridge)), // Due to a limitation in the current api system, we can't connect a single endopint to multiple services
// which means we need two different paths for complete mempool metrics.
Arc::new(Box::new(bridges::cl_mempool_metrics_bridge)),
Arc::new(Box::new(bridges::da_mempool_metrics_bridge)),
Arc::new(Box::new(bridges::network_info_bridge)), Arc::new(Box::new(bridges::network_info_bridge)),
Arc::new(Box::new( Arc::new(Box::new(
bridges::mempool_add_tx_bridge::<Libp2p, Libp2pAdapter<Tx, <Tx as Transaction>::Hash>>, bridges::mempool_add_tx_bridge::<Libp2p, Libp2pAdapter<Tx, <Tx as Transaction>::Hash>>,
)), )),
Arc::new(Box::new(
bridges::mempool_add_cert_bridge::<
Libp2p,
Libp2pAdapter<Certificate, <Blob as blob::Blob>::Hash>,
>,
)),
]; ];
let app = OverwatchRunner::<Nomos>::run( let app = OverwatchRunner::<Nomos>::run(
NomosServiceSettings { NomosServiceSettings {
network: config.network, network: config.network,
logging: config.log, logging: config.log,
http: config.http, http: config.http,
mockpool: nomos_mempool::Settings { cl_mempool: nomos_mempool::Settings {
backend: (), backend: (),
network: AdapterSettings { network: AdapterSettings {
topic: String::from("tx"), topic: String::from(nomos_node::CL_TOPIC),
id: <Tx as Transaction>::hash, id: <Tx as Transaction>::hash,
}, },
}, },
da_mempool: nomos_mempool::Settings {
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::DA_TOPIC),
id: cert_id,
},
},
consensus: config.consensus, consensus: config.consensus,
bridges: HttpBridgeSettings { bridges }, bridges: HttpBridgeSettings { bridges },
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
@ -85,3 +104,8 @@ fn main() -> Result<()> {
app.wait_finished(); app.wait_finished();
Ok(()) Ok(())
} }
fn cert_id(cert: &Certificate) -> <Blob as blob::Blob>::Hash {
use certificate::Certificate;
cert.blob()
}

View File

@ -41,8 +41,8 @@ use nomos_core::da::certificate::{BlobCertificateSelect, Certificate};
use nomos_core::tx::{Transaction, TxSelect}; use nomos_core::tx::{Transaction, TxSelect};
use nomos_core::vote::Tally; use nomos_core::vote::Tally;
use nomos_mempool::{ use nomos_mempool::{
backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolMsg, MempoolService, backend::MemPool, network::NetworkAdapter as MempoolAdapter, Certificate as CertDiscriminant,
Transaction as TxDiscriminant, MempoolMsg, MempoolService, Transaction as TxDiscriminant,
}; };
use nomos_network::NetworkService; use nomos_network::NetworkService;
use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage}; use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage};
@ -104,38 +104,46 @@ impl<O: Overlay, Ts, Bs> CarnotSettings<O, Ts, Bs> {
} }
} }
pub struct CarnotConsensus<A, P, M, O, C, TxS, BS> pub struct CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS>
where where
A: NetworkAdapter, A: NetworkAdapter,
M: MempoolAdapter<Item = P::Item, Key = P::Key>, ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>,
P: MemPool, ClPool: MemPool,
DaPool: MemPool,
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key>,
O: Overlay + Debug, O: Overlay + Debug,
P::Item: Debug + 'static, ClPool::Item: Debug + 'static,
P::Key: Debug + 'static, ClPool::Key: Debug + 'static,
DaPool::Item: Debug + 'static,
DaPool::Key: Debug + 'static,
A::Backend: 'static, A::Backend: 'static,
TxS: TxSelect<Tx = P::Item>, TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobCertificateSelect<Certificate = C>, BS: BlobCertificateSelect<Certificate = DaPool::Item>,
{ {
service_state: ServiceStateHandle<Self>, service_state: ServiceStateHandle<Self>,
// underlying networking backend. We need this so we can relay and check the types properly // underlying networking backend. We need this so we can relay and check the types properly
// when implementing ServiceCore for CarnotConsensus // when implementing ServiceCore for CarnotConsensus
network_relay: Relay<NetworkService<A::Backend>>, network_relay: Relay<NetworkService<A::Backend>>,
mempool_relay: Relay<MempoolService<M, P, TxDiscriminant>>, cl_mempool_relay: Relay<MempoolService<ClPoolAdapter, ClPool, TxDiscriminant>>,
da_mempool_relay: Relay<MempoolService<DaPoolAdapter, DaPool, CertDiscriminant>>,
_overlay: std::marker::PhantomData<O>, _overlay: std::marker::PhantomData<O>,
// this need to be substituted by some kind DA bo
_blob_certificate: std::marker::PhantomData<C>,
} }
impl<A, P, M, O, C, TxS, BS> ServiceData for CarnotConsensus<A, P, M, O, C, TxS, BS> impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS> ServiceData
for CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS>
where where
A: NetworkAdapter, A: NetworkAdapter,
P: MemPool, ClPool: MemPool,
P::Item: Debug, ClPool::Item: Debug,
P::Key: Debug, ClPool::Key: Debug,
M: MempoolAdapter<Item = P::Item, Key = P::Key>, DaPool: MemPool,
DaPool::Item: Debug,
DaPool::Key: Debug,
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>,
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key>,
O: Overlay + Debug, O: Overlay + Debug,
TxS: TxSelect<Tx = P::Item>, TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobCertificateSelect<Certificate = C>, BS: BlobCertificateSelect<Certificate = DaPool::Item>,
{ {
const SERVICE_ID: ServiceId = "Carnot"; const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings<O, TxS::Settings, BS::Settings>; type Settings = CarnotSettings<O, TxS::Settings, BS::Settings>;
@ -145,12 +153,15 @@ where
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<A, P, M, O, C, TxS, BS> ServiceCore for CarnotConsensus<A, P, M, O, C, TxS, BS> impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS> ServiceCore
for CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS>
where where
A: NetworkAdapter + Clone + Send + Sync + 'static, A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static, ClPool: MemPool + Send + Sync + 'static,
P::Settings: Send + Sync + 'static, ClPool::Settings: Send + Sync + 'static,
P::Item: Transaction DaPool: MemPool + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction
+ Debug + Debug
+ Clone + Clone
+ Eq + Eq
@ -160,7 +171,7 @@ where
+ Send + Send
+ Sync + Sync
+ 'static, + 'static,
C: Certificate DaPool::Item: Certificate
+ Debug + Debug
+ Clone + Clone
+ Eq + Eq
@ -170,25 +181,28 @@ where
+ Send + Send
+ Sync + Sync
+ 'static, + 'static,
P::Key: Debug + Send + Sync, ClPool::Key: Debug + Send + Sync,
M: MempoolAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static, DaPool::Key: Debug + Send + Sync,
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key> + Send + Sync + 'static,
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection, O::LeaderSelection: UpdateableLeaderSelection,
O::CommitteeMembership: UpdateableCommitteeMembership, O::CommitteeMembership: UpdateableCommitteeMembership,
TxS: TxSelect<Tx = P::Item> + Clone + Send + Sync + 'static, TxS: TxSelect<Tx = ClPool::Item> + Clone + Send + Sync + 'static,
TxS::Settings: Send + Sync + 'static, TxS::Settings: Send + Sync + 'static,
BS: BlobCertificateSelect<Certificate = C> + Clone + Send + Sync + 'static, BS: BlobCertificateSelect<Certificate = DaPool::Item> + Clone + Send + Sync + 'static,
BS::Settings: Send + Sync + 'static, BS::Settings: Send + Sync + 'static,
{ {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> { fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay(); let network_relay = service_state.overwatch_handle.relay();
let mempool_relay = service_state.overwatch_handle.relay(); let cl_mempool_relay = service_state.overwatch_handle.relay();
let da_mempool_relay = service_state.overwatch_handle.relay();
Ok(Self { Ok(Self {
service_state, service_state,
network_relay, network_relay,
_overlay: Default::default(), _overlay: Default::default(),
_blob_certificate: Default::default(), cl_mempool_relay,
mempool_relay, da_mempool_relay,
}) })
} }
@ -199,8 +213,14 @@ where
.await .await
.expect("Relay connection with NetworkService should succeed"); .expect("Relay connection with NetworkService should succeed");
let mempool_relay: OutboundRelay<_> = self let cl_mempool_relay: OutboundRelay<_> = self
.mempool_relay .cl_mempool_relay
.connect()
.await
.expect("Relay connection with MemPoolService should succeed");
let da_mempool_relay: OutboundRelay<_> = self
.da_mempool_relay
.connect() .connect()
.await .await
.expect("Relay connection with MemPoolService should succeed"); .expect("Relay connection with MemPoolService should succeed");
@ -287,7 +307,8 @@ where
&mut task_manager, &mut task_manager,
adapter.clone(), adapter.clone(),
private_key, private_key,
mempool_relay.clone(), cl_mempool_relay.clone(),
da_mempool_relay.clone(),
tx_selector.clone(), tx_selector.clone(),
blob_selector.clone(), blob_selector.clone(),
timeout, timeout,
@ -314,12 +335,15 @@ enum Output<Tx: Clone + Eq + Hash, BlobCertificate: Clone + Eq + Hash> {
}, },
} }
impl<A, P, M, O, C, TxS, BS> CarnotConsensus<A, P, M, O, C, TxS, BS> impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS>
CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS>
where where
A: NetworkAdapter + Clone + Send + Sync + 'static, A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static, ClPool: MemPool + Send + Sync + 'static,
P::Settings: Send + Sync + 'static, ClPool::Settings: Send + Sync + 'static,
P::Item: Transaction DaPool: MemPool + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction
+ Debug + Debug
+ Clone + Clone
+ Eq + Eq
@ -329,7 +353,7 @@ where
+ Send + Send
+ Sync + Sync
+ 'static, + 'static,
C: Certificate DaPool::Item: Certificate
+ Debug + Debug
+ Clone + Clone
+ Eq + Eq
@ -342,14 +366,12 @@ where
O: Overlay + Debug + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection, O::LeaderSelection: UpdateableLeaderSelection,
O::CommitteeMembership: UpdateableCommitteeMembership, O::CommitteeMembership: UpdateableCommitteeMembership,
TxS: TxSelect<Tx = P::Item> + Clone + Send + Sync + 'static, TxS: TxSelect<Tx = ClPool::Item> + Clone + Send + Sync + 'static,
BS: BlobCertificateSelect<Certificate = C> + Clone + Send + Sync + 'static, BS: BlobCertificateSelect<Certificate = DaPool::Item> + Clone + Send + Sync + 'static,
P::Key: Debug + Send + Sync, ClPool::Key: Debug + Send + Sync,
M: MempoolAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static, DaPool::Key: Debug + Send + Sync,
O: Overlay + Debug + Send + Sync + 'static, ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key> + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection, DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key> + Send + Sync + 'static,
O::CommitteeMembership: UpdateableCommitteeMembership,
TxS: TxSelect<Tx = P::Item> + Clone + Send + Sync + 'static,
{ {
fn process_message(carnot: &Carnot<O>, msg: ConsensusMsg) { fn process_message(carnot: &Carnot<O>, msg: ConsensusMsg) {
match msg { match msg {
@ -373,11 +395,12 @@ where
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn process_carnot_event( async fn process_carnot_event(
mut carnot: Carnot<O>, mut carnot: Carnot<O>,
event: Event<P::Item, C>, event: Event<ClPool::Item, DaPool::Item>,
task_manager: &mut TaskManager<View, Event<P::Item, C>>, task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A, adapter: A,
private_key: PrivateKey, private_key: PrivateKey,
mempool_relay: OutboundRelay<MempoolMsg<P::Item, P::Key>>, cl_mempool_relay: OutboundRelay<MempoolMsg<ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<DaPool::Item, DaPool::Key>>,
tx_selector: TxS, tx_selector: TxS,
blobl_selector: BS, blobl_selector: BS,
timeout: Duration, timeout: Duration,
@ -393,7 +416,7 @@ where
tracing::debug!("approving proposal {:?}", block); tracing::debug!("approving proposal {:?}", block);
let (new_carnot, out) = carnot.approve_block(block); let (new_carnot, out) = carnot.approve_block(block);
carnot = new_carnot; carnot = new_carnot;
output = Some(Output::Send::<P::Item, C>(out)); output = Some(Output::Send::<ClPool::Item, DaPool::Item>(out));
} }
Event::LocalTimeout { view } => { Event::LocalTimeout { view } => {
tracing::debug!("local timeout"); tracing::debug!("local timeout");
@ -434,7 +457,8 @@ where
qc, qc,
tx_selector.clone(), tx_selector.clone(),
blobl_selector.clone(), blobl_selector.clone(),
mempool_relay, cl_mempool_relay,
da_mempool_relay,
) )
.await; .await;
} }
@ -460,14 +484,15 @@ where
carnot carnot
} }
#[allow(clippy::type_complexity)]
#[instrument(level = "debug", skip(adapter, task_manager, stream))] #[instrument(level = "debug", skip(adapter, task_manager, stream))]
async fn process_block( async fn process_block(
mut carnot: Carnot<O>, mut carnot: Carnot<O>,
block: Block<P::Item, C>, block: Block<ClPool::Item, DaPool::Item>,
mut stream: Pin<Box<dyn Stream<Item = Block<P::Item, C>> + Send>>, mut stream: Pin<Box<dyn Stream<Item = Block<ClPool::Item, DaPool::Item>> + Send>>,
task_manager: &mut TaskManager<View, Event<P::Item, C>>, task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A, adapter: A,
) -> (Carnot<O>, Option<Output<P::Item, C>>) { ) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
tracing::debug!("received proposal {:?}", block); tracing::debug!("received proposal {:?}", block);
if carnot.highest_voted_view() >= block.header().view { if carnot.highest_voted_view() >= block.header().view {
tracing::debug!("already voted for view {}", block.header().view); tracing::debug!("already voted for view {}", block.header().view);
@ -538,14 +563,15 @@ where
(carnot, None) (carnot, None)
} }
#[allow(clippy::type_complexity)]
#[instrument(level = "debug", skip(task_manager, adapter))] #[instrument(level = "debug", skip(task_manager, adapter))]
async fn approve_new_view( async fn approve_new_view(
carnot: Carnot<O>, carnot: Carnot<O>,
timeout_qc: TimeoutQc, timeout_qc: TimeoutQc,
new_views: HashSet<NewView>, new_views: HashSet<NewView>,
task_manager: &mut TaskManager<View, Event<P::Item, C>>, task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A, adapter: A,
) -> (Carnot<O>, Option<Output<P::Item, C>>) { ) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
let leader_committee = [carnot.id()].into_iter().collect(); let leader_committee = [carnot.id()].into_iter().collect();
let leader_tally_settings = CarnotTallySettings { let leader_tally_settings = CarnotTallySettings {
threshold: carnot.leader_super_majority_threshold(), threshold: carnot.leader_super_majority_threshold(),
@ -576,13 +602,14 @@ where
(new_carnot, Some(Output::Send(out))) (new_carnot, Some(Output::Send(out)))
} }
#[allow(clippy::type_complexity)]
#[instrument(level = "debug", skip(task_manager, adapter))] #[instrument(level = "debug", skip(task_manager, adapter))]
async fn receive_timeout_qc( async fn receive_timeout_qc(
carnot: Carnot<O>, carnot: Carnot<O>,
timeout_qc: TimeoutQc, timeout_qc: TimeoutQc,
task_manager: &mut TaskManager<View, Event<P::Item, C>>, task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A, adapter: A,
) -> (Carnot<O>, Option<Output<P::Item, C>>) { ) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone()); let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone());
let self_committee = carnot.self_committee(); let self_committee = carnot.self_committee();
let tally_settings = CarnotTallySettings { let tally_settings = CarnotTallySettings {
@ -603,11 +630,12 @@ where
(new_state, None) (new_state, None)
} }
#[allow(clippy::type_complexity)]
#[instrument(level = "debug")] #[instrument(level = "debug")]
async fn process_root_timeout( async fn process_root_timeout(
carnot: Carnot<O>, carnot: Carnot<O>,
timeouts: HashSet<Timeout>, timeouts: HashSet<Timeout>,
) -> (Carnot<O>, Option<Output<P::Item, C>>) { ) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
// we might have received a timeout_qc sent by some other node and advanced the view // we might have received a timeout_qc sent by some other node and advanced the view
// already, in which case we should ignore the timeout // already, in which case we should ignore the timeout
if carnot.current_view() if carnot.current_view()
@ -643,7 +671,13 @@ where
#[instrument( #[instrument(
level = "debug", level = "debug",
skip(mempool_relay, private_key, tx_selector, blob_selector) skip(
cl_mempool_relay,
da_mempool_relay,
private_key,
tx_selector,
blob_selector
)
)] )]
async fn propose_block( async fn propose_block(
id: NodeId, id: NodeId,
@ -651,35 +685,31 @@ where
qc: Qc, qc: Qc,
tx_selector: TxS, tx_selector: TxS,
blob_selector: BS, blob_selector: BS,
mempool_relay: OutboundRelay<MempoolMsg<P::Item, P::Key>>, cl_mempool_relay: OutboundRelay<MempoolMsg<ClPool::Item, ClPool::Key>>,
) -> Option<Output<P::Item, C>> { da_mempool_relay: OutboundRelay<MempoolMsg<DaPool::Item, DaPool::Key>>,
let (reply_channel, rx) = tokio::sync::oneshot::channel(); ) -> Option<Output<ClPool::Item, DaPool::Item>> {
let mut output = None; let mut output = None;
mempool_relay let cl_txs = get_mempool_contents(cl_mempool_relay);
.send(MempoolMsg::View { let da_certs = get_mempool_contents(da_mempool_relay);
ancestor_hint: BlockId::zeros(),
reply_channel,
})
.await
.unwrap_or_else(|(e, _)| eprintln!("Could not get transactions from mempool {e}"));
match rx.await { match futures::join!(cl_txs, da_certs) {
Ok(txs) => { (Ok(cl_txs), Ok(da_certs)) => {
let beacon = RandomBeaconState::generate_happy(qc.view(), &private_key); let beacon = RandomBeaconState::generate_happy(qc.view(), &private_key);
let Ok(proposal) = BlockBuilder::new(tx_selector, blob_selector) let Ok(proposal) = BlockBuilder::new(tx_selector, blob_selector)
.with_view(qc.view().next()) .with_view(qc.view().next())
.with_parent_qc(qc) .with_parent_qc(qc)
.with_proposer(id) .with_proposer(id)
.with_beacon_state(beacon) .with_beacon_state(beacon)
.with_transactions(txs) .with_transactions(cl_txs)
.with_blobs_certificates([].into_iter()) .with_blobs_certificates(da_certs)
.build() .build()
else { else {
panic!("Proposal block should always succeed to be built") panic!("Proposal block should always succeed to be built")
}; };
output = Some(Output::BroadcastProposal { proposal }); output = Some(Output::BroadcastProposal { proposal });
} }
Err(e) => tracing::error!("Could not fetch txs {e}"), (Err(_), _) => tracing::error!("Could not fetch block cl transactions"),
(_, Err(_)) => tracing::error!("Could not fetch block da certificates"),
} }
output output
} }
@ -687,7 +717,7 @@ where
async fn process_view_change( async fn process_view_change(
carnot: Carnot<O>, carnot: Carnot<O>,
prev_view: View, prev_view: View,
task_manager: &mut TaskManager<View, Event<P::Item, C>>, task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A, adapter: A,
timeout: Duration, timeout: Duration,
) { ) {
@ -724,7 +754,10 @@ where
} }
} }
async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event<P::Item, C> { async fn gather_timeout_qc(
adapter: A,
view: consensus_engine::View,
) -> Event<ClPool::Item, DaPool::Item> {
if let Some(timeout_qc) = adapter if let Some(timeout_qc) = adapter
.timeout_qc_stream(view) .timeout_qc_stream(view)
.await .await
@ -744,7 +777,7 @@ where
committee: Committee, committee: Committee,
block: consensus_engine::Block, block: consensus_engine::Block,
tally: CarnotTallySettings, tally: CarnotTallySettings,
) -> Event<P::Item, C> { ) -> Event<ClPool::Item, DaPool::Item> {
let tally = CarnotTally::new(tally); let tally = CarnotTally::new(tally);
let votes_stream = adapter.votes_stream(&committee, block.view, block.id).await; let votes_stream = adapter.votes_stream(&committee, block.view, block.id).await;
match tally.tally(block.clone(), votes_stream).await { match tally.tally(block.clone(), votes_stream).await {
@ -761,7 +794,7 @@ where
committee: Committee, committee: Committee,
timeout_qc: TimeoutQc, timeout_qc: TimeoutQc,
tally: CarnotTallySettings, tally: CarnotTallySettings,
) -> Event<P::Item, C> { ) -> Event<ClPool::Item, DaPool::Item> {
let tally = NewViewTally::new(tally); let tally = NewViewTally::new(tally);
let stream = adapter let stream = adapter
.new_view_stream(&committee, timeout_qc.view().next()) .new_view_stream(&committee, timeout_qc.view().next())
@ -783,7 +816,7 @@ where
committee: Committee, committee: Committee,
view: consensus_engine::View, view: consensus_engine::View,
tally: CarnotTallySettings, tally: CarnotTallySettings,
) -> Event<P::Item, C> { ) -> Event<ClPool::Item, DaPool::Item> {
let tally = TimeoutTally::new(tally); let tally = TimeoutTally::new(tally);
let stream = adapter.timeout_stream(&committee, view).await; let stream = adapter.timeout_stream(&committee, view).await;
match tally.tally(view, stream).await { match tally.tally(view, stream).await {
@ -795,7 +828,10 @@ where
} }
#[instrument(level = "debug", skip(adapter))] #[instrument(level = "debug", skip(adapter))]
async fn gather_block(adapter: A, view: consensus_engine::View) -> Event<P::Item, C> { async fn gather_block(
adapter: A,
view: consensus_engine::View,
) -> Event<ClPool::Item, DaPool::Item> {
let stream = adapter let stream = adapter
.proposal_chunks_stream(view) .proposal_chunks_stream(view)
.await .await
@ -1016,3 +1052,19 @@ mod tests {
assert_eq!(deserialized, info); assert_eq!(deserialized, info);
} }
} }
async fn get_mempool_contents<Item, Key>(
mempool: OutboundRelay<MempoolMsg<Item, Key>>,
) -> Result<Box<dyn Iterator<Item = Item> + Send>, tokio::sync::oneshot::error::RecvError> {
let (reply_channel, rx) = tokio::sync::oneshot::channel();
mempool
.send(MempoolMsg::View {
ancestor_hint: BlockId::zeros(),
reply_channel,
})
.await
.unwrap_or_else(|(e, _)| eprintln!("Could not get transactions from mempool {e}"));
rx.await
}