From d67d08e81d968d69783828153d62d8dbc98866df Mon Sep 17 00:00:00 2001 From: Giacomo Pasini <21265557+zeegomo@users.noreply.github.com> Date: Mon, 2 Oct 2023 10:38:05 +0200 Subject: [PATCH] Add a second mempool for DA certificates (#443) * Use more descriptive names for generic parameters We're starting to have tens on generic parameters, if we don't use descriptive names it'll be pretty hard to understand what they do. This commit changes the names of the mempool parameters in preparation for the insertion of the da certificates mempool to distinguish it from cl transactions. * Add mempool for da certificates * Add separate certificates mempool to binary * ignore clippy lints --- nodes/nomos-node/src/bridges/mod.rs | 143 ++++++++++++------ nodes/nomos-node/src/lib.rs | 34 +++-- nodes/nomos-node/src/main.rs | 34 ++++- nomos-services/consensus/src/lib.rs | 218 +++++++++++++++++----------- 4 files changed, 289 insertions(+), 140 deletions(-) diff --git a/nodes/nomos-node/src/bridges/mod.rs b/nodes/nomos-node/src/bridges/mod.rs index 2dbc46b7..26707427 100644 --- a/nodes/nomos-node/src/bridges/mod.rs +++ b/nodes/nomos-node/src/bridges/mod.rs @@ -6,18 +6,24 @@ use libp2p::*; use bytes::Bytes; use http::StatusCode; use nomos_consensus::{CarnotInfo, ConsensusMsg}; +use serde::de::DeserializeOwned; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; use tracing::error; // internal -use nomos_core::tx::Transaction; +use full_replication::{Blob, Certificate}; +use nomos_core::wire; +use nomos_core::{ + da::{blob, certificate::Certificate as _}, + tx::Transaction, +}; use nomos_http::backends::axum::AxumBackend; use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse}; use nomos_mempool::backend::mockpool::MockPool; use nomos_mempool::network::adapters::libp2p::Libp2pAdapter; use nomos_mempool::network::NetworkAdapter; -use nomos_mempool::Transaction as TxDiscriminant; +use nomos_mempool::{Certificate as CertDiscriminant, Transaction as TxDiscriminant}; use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; use nomos_network::backends::libp2p::Libp2p; use nomos_network::backends::NetworkBackend; @@ -48,11 +54,19 @@ pub fn carnot_info_bridge( })) } -pub fn mempool_metrics_bridge( +pub fn cl_mempool_metrics_bridge( handle: overwatch_rs::overwatch::handle::OverwatchHandle, ) -> HttpBridgeRunner { Box::new(Box::pin(async move { - get_handler!(handle, MempoolService::Hash>, MockPool::Hash>, TxDiscriminant>, "metrics" => handle_mempool_metrics_req) + get_handler!(handle, MempoolService::Hash>, MockPool::Hash>, TxDiscriminant>, "cl_metrics" => handle_mempool_metrics_req) + })) +} + +pub fn da_mempool_metrics_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + get_handler!(handle, MempoolService::Hash>, MockPool::Hash>, CertDiscriminant>, "da_metrics" => handle_mempool_metrics_req) })) } @@ -89,7 +103,57 @@ where res_tx, payload, .. }) = http_request_channel.recv().await { - if let Err(e) = handle_mempool_add_tx_req(&mempool_channel, res_tx, payload).await { + if let Err(e) = handle_mempool_add_req( + &mempool_channel, + res_tx, + payload.unwrap_or_default(), + |tx| tx.hash(), + ) + .await + { + error!(e); + } + } + Ok(()) + })) +} + +pub fn mempool_add_cert_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner +where + N: NetworkBackend, + A: NetworkAdapter::Hash> + + Send + + Sync + + 'static, + A::Settings: Send + Sync, +{ + Box::new(Box::pin(async move { + let (mempool_channel, mut http_request_channel) = build_http_bridge::< + MempoolService::Hash>, CertDiscriminant>, + AxumBackend, + _, + >( + handle.clone(), + HttpMethod::POST, + "addcert", + ) + .await + .unwrap(); + + while let Some(HttpRequest { + res_tx, payload, .. + }) = http_request_channel.recv().await + { + if let Err(e) = handle_mempool_add_req( + &mempool_channel, + res_tx, + payload.unwrap_or_default(), + |cert| cert.blob(), + ) + .await + { error!(e); } } @@ -114,8 +178,8 @@ async fn handle_carnot_info_req( Ok(()) } -async fn handle_mempool_metrics_req( - mempool_channel: &OutboundRelay::Hash>>, +async fn handle_mempool_metrics_req( + mempool_channel: &OutboundRelay>, res_tx: Sender, ) -> Result<(), overwatch_rs::DynError> { let (sender, receiver) = oneshot::channel(); @@ -139,42 +203,37 @@ async fn handle_mempool_metrics_req( Ok(()) } -pub(super) async fn handle_mempool_add_tx_req( - mempool_channel: &OutboundRelay::Hash>>, +pub(super) async fn handle_mempool_add_req( + mempool_channel: &OutboundRelay>, res_tx: Sender, - payload: Option, -) -> Result<(), overwatch_rs::DynError> { - if let Some(data) = payload - .as_ref() - .and_then(|b| String::from_utf8(b.to_vec()).ok()) - { - let tx = Tx(data); - let (sender, receiver) = oneshot::channel(); - mempool_channel - .send(MempoolMsg::Add { - item: tx.clone(), - key: tx.hash(), - reply_channel: sender, - }) - .await - .map_err(|(e, _)| e)?; + wire_item: Bytes, + key: impl Fn(&K) -> V, +) -> Result<(), overwatch_rs::DynError> +where + K: DeserializeOwned, +{ + let item = wire::deserialize::(&wire_item)?; + let (sender, receiver) = oneshot::channel(); + let key = key(&item); + mempool_channel + .send(MempoolMsg::Add { + item, + key, + reply_channel: sender, + }) + .await + .map_err(|(e, _)| e)?; - match receiver.await { - Ok(Ok(())) => Ok(res_tx.send(Ok(b"".to_vec().into())).await?), - Ok(Err(())) => Ok(res_tx - .send(Err(( - StatusCode::CONFLICT, - "error: unable to add tx".into(), - ))) - .await?), - Err(err) => Ok(res_tx - .send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))) - .await?), - } - } else { - Err( - format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String") - .into(), - ) + match receiver.await { + Ok(Ok(())) => Ok(res_tx.send(Ok(b"".to_vec().into())).await?), + Ok(Err(())) => Ok(res_tx + .send(Err(( + StatusCode::CONFLICT, + "error: unable to add tx".into(), + ))) + .await?), + Err(err) => Ok(res_tx + .send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))) + .await?), } } diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 11142ca9..1dd68c31 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -10,7 +10,10 @@ use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsServic use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter; use nomos_consensus::CarnotConsensus; -use nomos_core::tx::Transaction; +use nomos_core::{ + da::{blob, certificate}, + tx::Transaction, +}; use nomos_da::{ backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter, DataAvailabilityService, @@ -20,8 +23,10 @@ use nomos_http::bridge::HttpBridgeService; use nomos_http::http::HttpService; use nomos_log::Logger; use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter; -use nomos_mempool::Transaction as TxDiscriminant; -use nomos_mempool::{backend::mockpool::MockPool, MempoolService}; +use nomos_mempool::{ + backend::mockpool::MockPool, Certificate as CertDiscriminant, MempoolService, + Transaction as TxDiscriminant, +}; use nomos_network::backends::libp2p::Libp2p; use nomos_network::NetworkService; @@ -35,14 +40,20 @@ use nomos_core::{ }; pub use tx::Tx; +pub const CL_TOPIC: &str = "cl"; +pub const DA_TOPIC: &str = "da"; const MB16: usize = 1024 * 1024 * 16; pub type Carnot = CarnotConsensus< ConsensusLibp2pAdapter, MockPool::Hash>, MempoolLibp2pAdapter::Hash>, + MockPool::Blob as blob::Blob>::Hash>, + MempoolLibp2pAdapter< + Certificate, + <::Blob as blob::Blob>::Hash, + >, FlatOverlay, - Certificate, FillSizeWithTx, FillSizeWithBlobsCertificate, >; @@ -53,17 +64,20 @@ type DataAvailability = DataAvailabilityService< DaLibp2pAdapter, >; -type Mempool = MempoolService< - MempoolLibp2pAdapter::Hash>, - MockPool::Hash>, - TxDiscriminant, ->; +type Mempool = MempoolService, MockPool, D>; #[derive(Services)] pub struct Nomos { logging: ServiceHandle, network: ServiceHandle>, - mockpool: ServiceHandle, + cl_mempool: ServiceHandle::Hash, TxDiscriminant>>, + da_mempool: ServiceHandle< + Mempool< + Certificate, + <::Blob as blob::Blob>::Hash, + CertDiscriminant, + >, + >, consensus: ServiceHandle, http: ServiceHandle>, bridges: ServiceHandle, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index a3307db6..374645a3 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -1,3 +1,4 @@ +use full_replication::{Blob, Certificate}; use nomos_node::{ Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, Nomos, NomosServiceSettings, OverlayArgs, Tx, @@ -7,14 +8,16 @@ mod bridges; use clap::Parser; use color_eyre::eyre::{eyre, Result}; +use nomos_core::{ + da::{blob, certificate}, + tx::Transaction, +}; use nomos_http::bridge::{HttpBridge, HttpBridgeSettings}; use nomos_mempool::network::adapters::libp2p::{Libp2pAdapter, Settings as AdapterSettings}; use nomos_network::backends::libp2p::Libp2p; use overwatch_rs::overwatch::*; use std::sync::Arc; -use nomos_core::tx::Transaction; - #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { @@ -55,24 +58,40 @@ fn main() -> Result<()> { let bridges: Vec = vec![ Arc::new(Box::new(bridges::carnot_info_bridge)), - Arc::new(Box::new(bridges::mempool_metrics_bridge)), + // Due to a limitation in the current api system, we can't connect a single endopint to multiple services + // which means we need two different paths for complete mempool metrics. + Arc::new(Box::new(bridges::cl_mempool_metrics_bridge)), + Arc::new(Box::new(bridges::da_mempool_metrics_bridge)), Arc::new(Box::new(bridges::network_info_bridge)), Arc::new(Box::new( bridges::mempool_add_tx_bridge::::Hash>>, )), + Arc::new(Box::new( + bridges::mempool_add_cert_bridge::< + Libp2p, + Libp2pAdapter::Hash>, + >, + )), ]; let app = OverwatchRunner::::run( NomosServiceSettings { network: config.network, logging: config.log, http: config.http, - mockpool: nomos_mempool::Settings { + cl_mempool: nomos_mempool::Settings { backend: (), network: AdapterSettings { - topic: String::from("tx"), + topic: String::from(nomos_node::CL_TOPIC), id: ::hash, }, }, + da_mempool: nomos_mempool::Settings { + backend: (), + network: AdapterSettings { + topic: String::from(nomos_node::DA_TOPIC), + id: cert_id, + }, + }, consensus: config.consensus, bridges: HttpBridgeSettings { bridges }, #[cfg(feature = "metrics")] @@ -85,3 +104,8 @@ fn main() -> Result<()> { app.wait_finished(); Ok(()) } + +fn cert_id(cert: &Certificate) -> ::Hash { + use certificate::Certificate; + cert.blob() +} diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 111062cb..4d540392 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -41,8 +41,8 @@ use nomos_core::da::certificate::{BlobCertificateSelect, Certificate}; use nomos_core::tx::{Transaction, TxSelect}; use nomos_core::vote::Tally; use nomos_mempool::{ - backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolMsg, MempoolService, - Transaction as TxDiscriminant, + backend::MemPool, network::NetworkAdapter as MempoolAdapter, Certificate as CertDiscriminant, + MempoolMsg, MempoolService, Transaction as TxDiscriminant, }; use nomos_network::NetworkService; use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage}; @@ -104,38 +104,46 @@ impl CarnotSettings { } } -pub struct CarnotConsensus +pub struct CarnotConsensus where A: NetworkAdapter, - M: MempoolAdapter, - P: MemPool, + ClPoolAdapter: MempoolAdapter, + ClPool: MemPool, + DaPool: MemPool, + DaPoolAdapter: MempoolAdapter, O: Overlay + Debug, - P::Item: Debug + 'static, - P::Key: Debug + 'static, + ClPool::Item: Debug + 'static, + ClPool::Key: Debug + 'static, + DaPool::Item: Debug + 'static, + DaPool::Key: Debug + 'static, A::Backend: 'static, - TxS: TxSelect, - BS: BlobCertificateSelect, + TxS: TxSelect, + BS: BlobCertificateSelect, { service_state: ServiceStateHandle, // underlying networking backend. We need this so we can relay and check the types properly // when implementing ServiceCore for CarnotConsensus network_relay: Relay>, - mempool_relay: Relay>, + cl_mempool_relay: Relay>, + da_mempool_relay: Relay>, _overlay: std::marker::PhantomData, - // this need to be substituted by some kind DA bo - _blob_certificate: std::marker::PhantomData, } -impl ServiceData for CarnotConsensus +impl ServiceData + for CarnotConsensus where A: NetworkAdapter, - P: MemPool, - P::Item: Debug, - P::Key: Debug, - M: MempoolAdapter, + ClPool: MemPool, + ClPool::Item: Debug, + ClPool::Key: Debug, + DaPool: MemPool, + DaPool::Item: Debug, + DaPool::Key: Debug, + ClPoolAdapter: MempoolAdapter, + DaPoolAdapter: MempoolAdapter, O: Overlay + Debug, - TxS: TxSelect, - BS: BlobCertificateSelect, + TxS: TxSelect, + BS: BlobCertificateSelect, { const SERVICE_ID: ServiceId = "Carnot"; type Settings = CarnotSettings; @@ -145,12 +153,15 @@ where } #[async_trait::async_trait] -impl ServiceCore for CarnotConsensus +impl ServiceCore + for CarnotConsensus where A: NetworkAdapter + Clone + Send + Sync + 'static, - P: MemPool + Send + Sync + 'static, - P::Settings: Send + Sync + 'static, - P::Item: Transaction + ClPool: MemPool + Send + Sync + 'static, + ClPool::Settings: Send + Sync + 'static, + DaPool: MemPool + Send + Sync + 'static, + DaPool::Settings: Send + Sync + 'static, + ClPool::Item: Transaction + Debug + Clone + Eq @@ -160,7 +171,7 @@ where + Send + Sync + 'static, - C: Certificate + DaPool::Item: Certificate + Debug + Clone + Eq @@ -170,25 +181,28 @@ where + Send + Sync + 'static, - P::Key: Debug + Send + Sync, - M: MempoolAdapter + Send + Sync + 'static, + ClPool::Key: Debug + Send + Sync, + DaPool::Key: Debug + Send + Sync, + ClPoolAdapter: MempoolAdapter + Send + Sync + 'static, + DaPoolAdapter: MempoolAdapter + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static, O::LeaderSelection: UpdateableLeaderSelection, O::CommitteeMembership: UpdateableCommitteeMembership, - TxS: TxSelect + Clone + Send + Sync + 'static, + TxS: TxSelect + Clone + Send + Sync + 'static, TxS::Settings: Send + Sync + 'static, - BS: BlobCertificateSelect + Clone + Send + Sync + 'static, + BS: BlobCertificateSelect + Clone + Send + Sync + 'static, BS::Settings: Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); - let mempool_relay = service_state.overwatch_handle.relay(); + let cl_mempool_relay = service_state.overwatch_handle.relay(); + let da_mempool_relay = service_state.overwatch_handle.relay(); Ok(Self { service_state, network_relay, _overlay: Default::default(), - _blob_certificate: Default::default(), - mempool_relay, + cl_mempool_relay, + da_mempool_relay, }) } @@ -199,8 +213,14 @@ where .await .expect("Relay connection with NetworkService should succeed"); - let mempool_relay: OutboundRelay<_> = self - .mempool_relay + let cl_mempool_relay: OutboundRelay<_> = self + .cl_mempool_relay + .connect() + .await + .expect("Relay connection with MemPoolService should succeed"); + + let da_mempool_relay: OutboundRelay<_> = self + .da_mempool_relay .connect() .await .expect("Relay connection with MemPoolService should succeed"); @@ -287,7 +307,8 @@ where &mut task_manager, adapter.clone(), private_key, - mempool_relay.clone(), + cl_mempool_relay.clone(), + da_mempool_relay.clone(), tx_selector.clone(), blob_selector.clone(), timeout, @@ -314,12 +335,15 @@ enum Output { }, } -impl CarnotConsensus +impl + CarnotConsensus where A: NetworkAdapter + Clone + Send + Sync + 'static, - P: MemPool + Send + Sync + 'static, - P::Settings: Send + Sync + 'static, - P::Item: Transaction + ClPool: MemPool + Send + Sync + 'static, + ClPool::Settings: Send + Sync + 'static, + DaPool: MemPool + Send + Sync + 'static, + DaPool::Settings: Send + Sync + 'static, + ClPool::Item: Transaction + Debug + Clone + Eq @@ -329,7 +353,7 @@ where + Send + Sync + 'static, - C: Certificate + DaPool::Item: Certificate + Debug + Clone + Eq @@ -342,14 +366,12 @@ where O: Overlay + Debug + Send + Sync + 'static, O::LeaderSelection: UpdateableLeaderSelection, O::CommitteeMembership: UpdateableCommitteeMembership, - TxS: TxSelect + Clone + Send + Sync + 'static, - BS: BlobCertificateSelect + Clone + Send + Sync + 'static, - P::Key: Debug + Send + Sync, - M: MempoolAdapter + Send + Sync + 'static, - O: Overlay + Debug + Send + Sync + 'static, - O::LeaderSelection: UpdateableLeaderSelection, - O::CommitteeMembership: UpdateableCommitteeMembership, - TxS: TxSelect + Clone + Send + Sync + 'static, + TxS: TxSelect + Clone + Send + Sync + 'static, + BS: BlobCertificateSelect + Clone + Send + Sync + 'static, + ClPool::Key: Debug + Send + Sync, + DaPool::Key: Debug + Send + Sync, + ClPoolAdapter: MempoolAdapter + Send + Sync + 'static, + DaPoolAdapter: MempoolAdapter + Send + Sync + 'static, { fn process_message(carnot: &Carnot, msg: ConsensusMsg) { match msg { @@ -373,11 +395,12 @@ where #[allow(clippy::too_many_arguments)] async fn process_carnot_event( mut carnot: Carnot, - event: Event, - task_manager: &mut TaskManager>, + event: Event, + task_manager: &mut TaskManager>, adapter: A, private_key: PrivateKey, - mempool_relay: OutboundRelay>, + cl_mempool_relay: OutboundRelay>, + da_mempool_relay: OutboundRelay>, tx_selector: TxS, blobl_selector: BS, timeout: Duration, @@ -393,7 +416,7 @@ where tracing::debug!("approving proposal {:?}", block); let (new_carnot, out) = carnot.approve_block(block); carnot = new_carnot; - output = Some(Output::Send::(out)); + output = Some(Output::Send::(out)); } Event::LocalTimeout { view } => { tracing::debug!("local timeout"); @@ -434,7 +457,8 @@ where qc, tx_selector.clone(), blobl_selector.clone(), - mempool_relay, + cl_mempool_relay, + da_mempool_relay, ) .await; } @@ -460,14 +484,15 @@ where carnot } + #[allow(clippy::type_complexity)] #[instrument(level = "debug", skip(adapter, task_manager, stream))] async fn process_block( mut carnot: Carnot, - block: Block, - mut stream: Pin> + Send>>, - task_manager: &mut TaskManager>, + block: Block, + mut stream: Pin> + Send>>, + task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { tracing::debug!("received proposal {:?}", block); if carnot.highest_voted_view() >= block.header().view { tracing::debug!("already voted for view {}", block.header().view); @@ -538,14 +563,15 @@ where (carnot, None) } + #[allow(clippy::type_complexity)] #[instrument(level = "debug", skip(task_manager, adapter))] async fn approve_new_view( carnot: Carnot, timeout_qc: TimeoutQc, new_views: HashSet, - task_manager: &mut TaskManager>, + task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { let leader_committee = [carnot.id()].into_iter().collect(); let leader_tally_settings = CarnotTallySettings { threshold: carnot.leader_super_majority_threshold(), @@ -576,13 +602,14 @@ where (new_carnot, Some(Output::Send(out))) } + #[allow(clippy::type_complexity)] #[instrument(level = "debug", skip(task_manager, adapter))] async fn receive_timeout_qc( carnot: Carnot, timeout_qc: TimeoutQc, - task_manager: &mut TaskManager>, + task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone()); let self_committee = carnot.self_committee(); let tally_settings = CarnotTallySettings { @@ -603,11 +630,12 @@ where (new_state, None) } + #[allow(clippy::type_complexity)] #[instrument(level = "debug")] async fn process_root_timeout( carnot: Carnot, timeouts: HashSet, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { // we might have received a timeout_qc sent by some other node and advanced the view // already, in which case we should ignore the timeout if carnot.current_view() @@ -643,7 +671,13 @@ where #[instrument( level = "debug", - skip(mempool_relay, private_key, tx_selector, blob_selector) + skip( + cl_mempool_relay, + da_mempool_relay, + private_key, + tx_selector, + blob_selector + ) )] async fn propose_block( id: NodeId, @@ -651,35 +685,31 @@ where qc: Qc, tx_selector: TxS, blob_selector: BS, - mempool_relay: OutboundRelay>, - ) -> Option> { - let (reply_channel, rx) = tokio::sync::oneshot::channel(); + cl_mempool_relay: OutboundRelay>, + da_mempool_relay: OutboundRelay>, + ) -> Option> { let mut output = None; - mempool_relay - .send(MempoolMsg::View { - ancestor_hint: BlockId::zeros(), - reply_channel, - }) - .await - .unwrap_or_else(|(e, _)| eprintln!("Could not get transactions from mempool {e}")); + let cl_txs = get_mempool_contents(cl_mempool_relay); + let da_certs = get_mempool_contents(da_mempool_relay); - match rx.await { - Ok(txs) => { + match futures::join!(cl_txs, da_certs) { + (Ok(cl_txs), Ok(da_certs)) => { let beacon = RandomBeaconState::generate_happy(qc.view(), &private_key); let Ok(proposal) = BlockBuilder::new(tx_selector, blob_selector) .with_view(qc.view().next()) .with_parent_qc(qc) .with_proposer(id) .with_beacon_state(beacon) - .with_transactions(txs) - .with_blobs_certificates([].into_iter()) + .with_transactions(cl_txs) + .with_blobs_certificates(da_certs) .build() else { panic!("Proposal block should always succeed to be built") }; output = Some(Output::BroadcastProposal { proposal }); } - Err(e) => tracing::error!("Could not fetch txs {e}"), + (Err(_), _) => tracing::error!("Could not fetch block cl transactions"), + (_, Err(_)) => tracing::error!("Could not fetch block da certificates"), } output } @@ -687,7 +717,7 @@ where async fn process_view_change( carnot: Carnot, prev_view: View, - task_manager: &mut TaskManager>, + task_manager: &mut TaskManager>, adapter: A, timeout: Duration, ) { @@ -724,7 +754,10 @@ where } } - async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event { + async fn gather_timeout_qc( + adapter: A, + view: consensus_engine::View, + ) -> Event { if let Some(timeout_qc) = adapter .timeout_qc_stream(view) .await @@ -744,7 +777,7 @@ where committee: Committee, block: consensus_engine::Block, tally: CarnotTallySettings, - ) -> Event { + ) -> Event { let tally = CarnotTally::new(tally); let votes_stream = adapter.votes_stream(&committee, block.view, block.id).await; match tally.tally(block.clone(), votes_stream).await { @@ -761,7 +794,7 @@ where committee: Committee, timeout_qc: TimeoutQc, tally: CarnotTallySettings, - ) -> Event { + ) -> Event { let tally = NewViewTally::new(tally); let stream = adapter .new_view_stream(&committee, timeout_qc.view().next()) @@ -783,7 +816,7 @@ where committee: Committee, view: consensus_engine::View, tally: CarnotTallySettings, - ) -> Event { + ) -> Event { let tally = TimeoutTally::new(tally); let stream = adapter.timeout_stream(&committee, view).await; match tally.tally(view, stream).await { @@ -795,7 +828,10 @@ where } #[instrument(level = "debug", skip(adapter))] - async fn gather_block(adapter: A, view: consensus_engine::View) -> Event { + async fn gather_block( + adapter: A, + view: consensus_engine::View, + ) -> Event { let stream = adapter .proposal_chunks_stream(view) .await @@ -1016,3 +1052,19 @@ mod tests { assert_eq!(deserialized, info); } } + +async fn get_mempool_contents( + mempool: OutboundRelay>, +) -> Result + Send>, tokio::sync::oneshot::error::RecvError> { + let (reply_channel, rx) = tokio::sync::oneshot::channel(); + + mempool + .send(MempoolMsg::View { + ancestor_hint: BlockId::zeros(), + reply_channel, + }) + .await + .unwrap_or_else(|(e, _)| eprintln!("Could not get transactions from mempool {e}")); + + rx.await +}