Use selection for blob certificates (#427)

* Use selection for blob certificates

* Fix bin imports

* Fix rebase

* Missing blobs -> certificates refactor

* Fix attestation and certificate as_bytes

* More naming refactors
This commit is contained in:
Daniel Sanchez 2023-09-25 15:34:05 +02:00 committed by GitHub
parent 4da06a9740
commit 95618c0a72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 137 additions and 81 deletions

View File

@ -3,9 +3,8 @@ mod tx;
use color_eyre::eyre::Result; use color_eyre::eyre::Result;
use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin}; use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin};
use full_replication::Blob; use full_replication::Certificate;
use full_replication::{AbsoluteNumber, Attestation, Blob, FullReplication};
use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication};
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService}; use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService};
use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter; use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter;
@ -31,7 +30,8 @@ use overwatch_rs::services::handle::ServiceHandle;
pub use config::{Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, OverlayArgs}; pub use config::{Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, OverlayArgs};
use nomos_core::{ use nomos_core::{
da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx, da::certificate::select::FillSize as FillSizeWithBlobsCertificate,
tx::select::FillSize as FillSizeWithTx,
}; };
pub use tx::Tx; pub use tx::Tx;
@ -42,9 +42,9 @@ pub type Carnot = CarnotConsensus<
MockPool<Tx>, MockPool<Tx>,
MempoolLibp2pAdapter<Tx>, MempoolLibp2pAdapter<Tx>,
FlatOverlay<RoundRobin, RandomBeaconState>, FlatOverlay<RoundRobin, RandomBeaconState>,
Blob, Certificate,
FillSizeWithTx<MB16, Tx>, FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobs<MB16, Blob>, FillSizeWithBlobsCertificate<MB16, Certificate>,
>; >;
type DataAvailability = DataAvailabilityService< type DataAvailability = DataAvailabilityService<

View File

@ -5,7 +5,8 @@ use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
// internal // internal
use crate::block::Block; use crate::block::Block;
use crate::da::blob::{Blob, BlobSelect}; use crate::da::certificate::BlobCertificateSelect;
use crate::da::certificate::Certificate;
use crate::tx::{Transaction, TxSelect}; use crate::tx::{Transaction, TxSelect};
use consensus_engine::overlay::RandomBeaconState; use consensus_engine::overlay::RandomBeaconState;
use consensus_engine::{NodeId, Qc, View}; use consensus_engine::{NodeId, Qc, View};
@ -39,12 +40,12 @@ pub struct BlockBuilder<Tx, Blob, TxSelector, BlobSelector> {
blobs: Option<Box<dyn Iterator<Item = Blob>>>, blobs: Option<Box<dyn Iterator<Item = Blob>>>,
} }
impl<Tx, B, TxSelector, BlobSelector> BlockBuilder<Tx, B, TxSelector, BlobSelector> impl<Tx, C, TxSelector, BlobSelector> BlockBuilder<Tx, C, TxSelector, BlobSelector>
where where
Tx: Transaction + Clone + Eq + Hash + Serialize + DeserializeOwned, Tx: Transaction + Clone + Eq + Hash + Serialize + DeserializeOwned,
B: Blob + Clone + Eq + Hash + Serialize + DeserializeOwned, C: Certificate + Clone + Eq + Hash + Serialize + DeserializeOwned,
TxSelector: TxSelect<Tx = Tx>, TxSelector: TxSelect<Tx = Tx>,
BlobSelector: BlobSelect<Blob = B>, BlobSelector: BlobCertificateSelect<Certificate = C>,
{ {
pub fn new(tx_selector: TxSelector, blob_selector: BlobSelector) -> Self { pub fn new(tx_selector: TxSelector, blob_selector: BlobSelector) -> Self {
Self { Self {
@ -90,13 +91,16 @@ where
} }
#[must_use] #[must_use]
pub fn with_blobs(mut self, blobs: impl Iterator<Item = B> + 'static) -> Self { pub fn with_blobs_certificates(
self.blobs = Some(Box::new(blobs)); mut self,
blobs_certificates: impl Iterator<Item = C> + 'static,
) -> Self {
self.blobs = Some(Box::new(blobs_certificates));
self self
} }
#[allow(clippy::result_large_err)] #[allow(clippy::result_large_err)]
pub fn build(self) -> Result<Block<Tx, B>, Self> { pub fn build(self) -> Result<Block<Tx, C>, Self> {
if let Self { if let Self {
tx_selector, tx_selector,
blob_selector, blob_selector,

View File

@ -1,5 +1,3 @@
pub mod select;
use bytes::Bytes; use bytes::Bytes;
use std::hash::Hash; use std::hash::Hash;
@ -13,14 +11,3 @@ pub trait Blob {
} }
fn as_bytes(&self) -> Bytes; fn as_bytes(&self) -> Bytes;
} }
pub trait BlobSelect {
type Blob: Blob;
type Settings: Clone;
fn new(settings: Self::Settings) -> Self;
fn select_blob_from<'i, I: Iterator<Item = Self::Blob> + 'i>(
&self,
blobs: I,
) -> Box<dyn Iterator<Item = Self::Blob> + 'i>;
}

View File

@ -1 +1,22 @@
pub trait Certificate {} pub mod select;
use crate::da::blob::Blob;
use bytes::Bytes;
pub trait Certificate {
type Blob: Blob;
fn blob(&self) -> <Self::Blob as Blob>::Hash;
fn as_bytes(&self) -> Bytes;
}
pub trait BlobCertificateSelect {
type Certificate: Certificate;
type Settings: Clone;
fn new(settings: Self::Settings) -> Self;
fn select_blob_from<'i, I: Iterator<Item = Self::Certificate> + 'i>(
&self,
certificates: I,
) -> Box<dyn Iterator<Item = Self::Certificate> + 'i>;
}

View File

@ -3,7 +3,7 @@ use std::marker::PhantomData;
// crates // crates
// internal // internal
use crate::da::blob::{Blob, BlobSelect}; use crate::da::certificate::{BlobCertificateSelect, Certificate};
use crate::utils; use crate::utils;
#[derive(Default, Clone, Copy)] #[derive(Default, Clone, Copy)]
@ -19,21 +19,21 @@ impl<const SIZE: usize, B> FillSize<SIZE, B> {
} }
} }
impl<const SIZE: usize, B: Blob> BlobSelect for FillSize<SIZE, B> { impl<const SIZE: usize, C: Certificate> BlobCertificateSelect for FillSize<SIZE, C> {
type Blob = B; type Certificate = C;
type Settings = (); type Settings = ();
fn new(_settings: Self::Settings) -> Self { fn new(_settings: Self::Settings) -> Self {
FillSize::new() FillSize::new()
} }
fn select_blob_from<'i, I: Iterator<Item = Self::Blob> + 'i>( fn select_blob_from<'i, I: Iterator<Item = Self::Certificate> + 'i>(
&self, &self,
blobs: I, certificates: I,
) -> Box<dyn Iterator<Item = Self::Blob> + 'i> { ) -> Box<dyn Iterator<Item = Self::Certificate> + 'i> {
utils::select::select_from_till_fill_size::<SIZE, Self::Blob>( utils::select::select_from_till_fill_size::<SIZE, Self::Certificate>(
|blob| blob.as_bytes().len(), |blob| blob.as_bytes().len(),
blobs, certificates,
) )
} }
} }

View File

@ -6,12 +6,14 @@ use nomos_core::da::{
}; };
// std // std
use std::collections::HashSet; use std::collections::HashSet;
use std::hash::{Hash, Hasher};
// crates // crates
use blake2::{ use blake2::{
digest::{Update, VariableOutput}, digest::{Update, VariableOutput},
Blake2bVar, Blake2bVar,
}; };
use bytes::Bytes; use bytes::Bytes;
use nomos_core::wire;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -99,15 +101,15 @@ fn hasher(blob: &Blob) -> [u8; 32] {
} }
impl blob::Blob for Blob { impl blob::Blob for Blob {
type Hash = [u8; 32];
const HASHER: BlobHasher<Self> = hasher as BlobHasher<Self>; const HASHER: BlobHasher<Self> = hasher as BlobHasher<Self>;
type Hash = [u8; 32];
fn as_bytes(&self) -> bytes::Bytes { fn as_bytes(&self) -> bytes::Bytes {
self.data.clone() self.data.clone()
} }
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct Attestation { pub struct Attestation {
blob: [u8; 32], blob: [u8; 32],
voter: [u8; 32], voter: [u8; 32],
@ -119,16 +121,36 @@ impl attestation::Attestation for Attestation {
self.blob self.blob
} }
fn as_bytes(&self) -> Bytes { fn as_bytes(&self) -> Bytes {
Bytes::from([self.blob.as_ref(), self.voter.as_ref()].concat()) wire::serialize(self)
.expect("Attestation shouldn't fail to be serialized")
.into()
} }
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct Certificate { pub struct Certificate {
attestations: Vec<Attestation>, attestations: Vec<Attestation>,
} }
impl certificate::Certificate for Certificate {} impl Hash for Certificate {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(certificate::Certificate::as_bytes(self).as_ref());
}
}
impl certificate::Certificate for Certificate {
type Blob = Blob;
fn blob(&self) -> <Self::Blob as blob::Blob>::Hash {
self.attestations[0].blob
}
fn as_bytes(&self) -> Bytes {
wire::serialize(self)
.expect("Certificate shouldn't fail to be serialized")
.into()
}
}
// TODO: add generic impl when the trait for Certificate is expanded // TODO: add generic impl when the trait for Certificate is expanded
impl DaProtocol for FullReplication<AbsoluteNumber<Attestation, Certificate>> { impl DaProtocol for FullReplication<AbsoluteNumber<Attestation, Certificate>> {

View File

@ -37,7 +37,7 @@ use task_manager::TaskManager;
use crate::committee_membership::UpdateableCommitteeMembership; use crate::committee_membership::UpdateableCommitteeMembership;
use nomos_core::block::builder::BlockBuilder; use nomos_core::block::builder::BlockBuilder;
use nomos_core::block::Block; use nomos_core::block::Block;
use nomos_core::da::blob::{Blob, BlobSelect}; use nomos_core::da::certificate::{BlobCertificateSelect, Certificate};
use nomos_core::tx::{Transaction, TxSelect}; use nomos_core::tx::{Transaction, TxSelect};
use nomos_core::vote::Tally; use nomos_core::vote::Tally;
use nomos_mempool::{ use nomos_mempool::{
@ -103,7 +103,7 @@ impl<O: Overlay, Ts, Bs> CarnotSettings<O, Ts, Bs> {
} }
} }
pub struct CarnotConsensus<A, P, M, O, B, TxS, BS> pub struct CarnotConsensus<A, P, M, O, C, TxS, BS>
where where
A: NetworkAdapter, A: NetworkAdapter,
M: MempoolAdapter<Tx = P::Tx>, M: MempoolAdapter<Tx = P::Tx>,
@ -113,7 +113,7 @@ where
<P::Tx as Transaction>::Hash: Debug, <P::Tx as Transaction>::Hash: Debug,
A::Backend: 'static, A::Backend: 'static,
TxS: TxSelect<Tx = P::Tx>, TxS: TxSelect<Tx = P::Tx>,
BS: BlobSelect<Blob = B>, BS: BlobCertificateSelect<Certificate = C>,
{ {
service_state: ServiceStateHandle<Self>, service_state: ServiceStateHandle<Self>,
// underlying networking backend. We need this so we can relay and check the types properly // underlying networking backend. We need this so we can relay and check the types properly
@ -122,10 +122,10 @@ where
mempool_relay: Relay<MempoolService<M, P>>, mempool_relay: Relay<MempoolService<M, P>>,
_overlay: std::marker::PhantomData<O>, _overlay: std::marker::PhantomData<O>,
// this need to be substituted by some kind DA bo // this need to be substituted by some kind DA bo
_blob: std::marker::PhantomData<B>, _blob_certificate: std::marker::PhantomData<C>,
} }
impl<A, P, M, O, B, TxS, BS> ServiceData for CarnotConsensus<A, P, M, O, B, TxS, BS> impl<A, P, M, O, C, TxS, BS> ServiceData for CarnotConsensus<A, P, M, O, C, TxS, BS>
where where
A: NetworkAdapter, A: NetworkAdapter,
P: MemPool, P: MemPool,
@ -134,7 +134,7 @@ where
M: MempoolAdapter<Tx = P::Tx>, M: MempoolAdapter<Tx = P::Tx>,
O: Overlay + Debug, O: Overlay + Debug,
TxS: TxSelect<Tx = P::Tx>, TxS: TxSelect<Tx = P::Tx>,
BS: BlobSelect<Blob = B>, BS: BlobCertificateSelect<Certificate = C>,
{ {
const SERVICE_ID: ServiceId = "Carnot"; const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings<O, TxS::Settings, BS::Settings>; type Settings = CarnotSettings<O, TxS::Settings, BS::Settings>;
@ -144,7 +144,7 @@ where
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<A, P, M, O, B, TxS, BS> ServiceCore for CarnotConsensus<A, P, M, O, B, TxS, BS> impl<A, P, M, O, C, TxS, BS> ServiceCore for CarnotConsensus<A, P, M, O, C, TxS, BS>
where where
A: NetworkAdapter + Clone + Send + Sync + 'static, A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static, P: MemPool + Send + Sync + 'static,
@ -152,14 +152,23 @@ where
P::Tx: P::Tx:
Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync, <P::Tx as Transaction>::Hash: Debug + Send + Sync,
B: Blob + Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, C: Certificate
+ Debug
+ Clone
+ Eq
+ Hash
+ Serialize
+ DeserializeOwned
+ Send
+ Sync
+ 'static,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static, M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection, O::LeaderSelection: UpdateableLeaderSelection,
O::CommitteeMembership: UpdateableCommitteeMembership, O::CommitteeMembership: UpdateableCommitteeMembership,
TxS: TxSelect<Tx = P::Tx> + Clone + Send + Sync + 'static, TxS: TxSelect<Tx = P::Tx> + Clone + Send + Sync + 'static,
TxS::Settings: Send + Sync + 'static, TxS::Settings: Send + Sync + 'static,
BS: BlobSelect<Blob = B> + Clone + Send + Sync + 'static, BS: BlobCertificateSelect<Certificate = C> + Clone + Send + Sync + 'static,
BS::Settings: Send + Sync + 'static, BS::Settings: Send + Sync + 'static,
{ {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> { fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
@ -169,7 +178,7 @@ where
service_state, service_state,
network_relay, network_relay,
_overlay: Default::default(), _overlay: Default::default(),
_blob: Default::default(), _blob_certificate: Default::default(),
mempool_relay, mempool_relay,
}) })
} }
@ -286,13 +295,17 @@ where
#[derive(Debug)] #[derive(Debug)]
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
enum Output<Tx: Clone + Eq + Hash, Blob: Clone + Eq + Hash> { enum Output<Tx: Clone + Eq + Hash, BlobCertificate: Clone + Eq + Hash> {
Send(consensus_engine::Send), Send(consensus_engine::Send),
BroadcastTimeoutQc { timeout_qc: TimeoutQc }, BroadcastTimeoutQc {
BroadcastProposal { proposal: Block<Tx, Blob> }, timeout_qc: TimeoutQc,
},
BroadcastProposal {
proposal: Block<Tx, BlobCertificate>,
},
} }
impl<A, P, M, O, B, TxS, BS> CarnotConsensus<A, P, M, O, B, TxS, BS> impl<A, P, M, O, C, TxS, BS> CarnotConsensus<A, P, M, O, C, TxS, BS>
where where
A: NetworkAdapter + Clone + Send + Sync + 'static, A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static, P: MemPool + Send + Sync + 'static,
@ -300,13 +313,22 @@ where
P::Tx: P::Tx:
Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync, <P::Tx as Transaction>::Hash: Debug + Send + Sync,
B: Blob + Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, C: Certificate
+ Debug
+ Clone
+ Eq
+ Hash
+ Serialize
+ DeserializeOwned
+ Send
+ Sync
+ 'static,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static, M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection, O::LeaderSelection: UpdateableLeaderSelection,
O::CommitteeMembership: UpdateableCommitteeMembership, O::CommitteeMembership: UpdateableCommitteeMembership,
TxS: TxSelect<Tx = P::Tx> + Clone + Send + Sync + 'static, TxS: TxSelect<Tx = P::Tx> + Clone + Send + Sync + 'static,
BS: BlobSelect<Blob = B> + Clone + Send + Sync + 'static, BS: BlobCertificateSelect<Certificate = C> + Clone + Send + Sync + 'static,
{ {
fn process_message(carnot: &Carnot<O>, msg: ConsensusMsg) { fn process_message(carnot: &Carnot<O>, msg: ConsensusMsg) {
match msg { match msg {
@ -330,8 +352,8 @@ where
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn process_carnot_event( async fn process_carnot_event(
mut carnot: Carnot<O>, mut carnot: Carnot<O>,
event: Event<P::Tx, B>, event: Event<P::Tx, C>,
task_manager: &mut TaskManager<View, Event<P::Tx, B>>, task_manager: &mut TaskManager<View, Event<P::Tx, C>>,
adapter: A, adapter: A,
private_key: PrivateKey, private_key: PrivateKey,
mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>, mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>,
@ -350,7 +372,7 @@ where
tracing::debug!("approving proposal {:?}", block); tracing::debug!("approving proposal {:?}", block);
let (new_carnot, out) = carnot.approve_block(block); let (new_carnot, out) = carnot.approve_block(block);
carnot = new_carnot; carnot = new_carnot;
output = Some(Output::Send::<P::Tx, B>(out)); output = Some(Output::Send::<P::Tx, C>(out));
} }
Event::LocalTimeout { view } => { Event::LocalTimeout { view } => {
tracing::debug!("local timeout"); tracing::debug!("local timeout");
@ -420,11 +442,11 @@ where
#[instrument(level = "debug", skip(adapter, task_manager, stream))] #[instrument(level = "debug", skip(adapter, task_manager, stream))]
async fn process_block( async fn process_block(
mut carnot: Carnot<O>, mut carnot: Carnot<O>,
block: Block<P::Tx, B>, block: Block<P::Tx, C>,
mut stream: Pin<Box<dyn Stream<Item = Block<P::Tx, B>> + Send>>, mut stream: Pin<Box<dyn Stream<Item = Block<P::Tx, C>> + Send>>,
task_manager: &mut TaskManager<View, Event<P::Tx, B>>, task_manager: &mut TaskManager<View, Event<P::Tx, C>>,
adapter: A, adapter: A,
) -> (Carnot<O>, Option<Output<P::Tx, B>>) { ) -> (Carnot<O>, Option<Output<P::Tx, C>>) {
tracing::debug!("received proposal {:?}", block); tracing::debug!("received proposal {:?}", block);
if carnot.highest_voted_view() >= block.header().view { if carnot.highest_voted_view() >= block.header().view {
tracing::debug!("already voted for view {}", block.header().view); tracing::debug!("already voted for view {}", block.header().view);
@ -500,9 +522,9 @@ where
carnot: Carnot<O>, carnot: Carnot<O>,
timeout_qc: TimeoutQc, timeout_qc: TimeoutQc,
new_views: HashSet<NewView>, new_views: HashSet<NewView>,
task_manager: &mut TaskManager<View, Event<P::Tx, B>>, task_manager: &mut TaskManager<View, Event<P::Tx, C>>,
adapter: A, adapter: A,
) -> (Carnot<O>, Option<Output<P::Tx, B>>) { ) -> (Carnot<O>, Option<Output<P::Tx, C>>) {
let leader_committee = [carnot.id()].into_iter().collect(); let leader_committee = [carnot.id()].into_iter().collect();
let leader_tally_settings = CarnotTallySettings { let leader_tally_settings = CarnotTallySettings {
threshold: carnot.leader_super_majority_threshold(), threshold: carnot.leader_super_majority_threshold(),
@ -537,9 +559,9 @@ where
async fn receive_timeout_qc( async fn receive_timeout_qc(
carnot: Carnot<O>, carnot: Carnot<O>,
timeout_qc: TimeoutQc, timeout_qc: TimeoutQc,
task_manager: &mut TaskManager<View, Event<P::Tx, B>>, task_manager: &mut TaskManager<View, Event<P::Tx, C>>,
adapter: A, adapter: A,
) -> (Carnot<O>, Option<Output<P::Tx, B>>) { ) -> (Carnot<O>, Option<Output<P::Tx, C>>) {
let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone()); let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone());
let self_committee = carnot.self_committee(); let self_committee = carnot.self_committee();
let tally_settings = CarnotTallySettings { let tally_settings = CarnotTallySettings {
@ -564,7 +586,7 @@ where
async fn process_root_timeout( async fn process_root_timeout(
carnot: Carnot<O>, carnot: Carnot<O>,
timeouts: HashSet<Timeout>, timeouts: HashSet<Timeout>,
) -> (Carnot<O>, Option<Output<P::Tx, B>>) { ) -> (Carnot<O>, Option<Output<P::Tx, C>>) {
// we might have received a timeout_qc sent by some other node and advanced the view // we might have received a timeout_qc sent by some other node and advanced the view
// already, in which case we should ignore the timeout // already, in which case we should ignore the timeout
if carnot.current_view() if carnot.current_view()
@ -609,7 +631,7 @@ where
tx_selector: TxS, tx_selector: TxS,
blob_selector: BS, blob_selector: BS,
mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>, mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>,
) -> Option<Output<P::Tx, B>> { ) -> Option<Output<P::Tx, C>> {
let (reply_channel, rx) = tokio::sync::oneshot::channel(); let (reply_channel, rx) = tokio::sync::oneshot::channel();
let mut output = None; let mut output = None;
mempool_relay mempool_relay
@ -629,7 +651,7 @@ where
.with_proposer(id) .with_proposer(id)
.with_beacon_state(beacon) .with_beacon_state(beacon)
.with_transactions(txs) .with_transactions(txs)
.with_blobs([].into_iter()) .with_blobs_certificates([].into_iter())
.build() .build()
else { else {
panic!("Proposal block should always succeed to be built") panic!("Proposal block should always succeed to be built")
@ -644,7 +666,7 @@ where
async fn process_view_change( async fn process_view_change(
carnot: Carnot<O>, carnot: Carnot<O>,
prev_view: View, prev_view: View,
task_manager: &mut TaskManager<View, Event<P::Tx, B>>, task_manager: &mut TaskManager<View, Event<P::Tx, C>>,
adapter: A, adapter: A,
timeout: Duration, timeout: Duration,
) { ) {
@ -681,7 +703,7 @@ where
} }
} }
async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event<P::Tx, B> { async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event<P::Tx, C> {
if let Some(timeout_qc) = adapter if let Some(timeout_qc) = adapter
.timeout_qc_stream(view) .timeout_qc_stream(view)
.await .await
@ -701,7 +723,7 @@ where
committee: Committee, committee: Committee,
block: consensus_engine::Block, block: consensus_engine::Block,
tally: CarnotTallySettings, tally: CarnotTallySettings,
) -> Event<P::Tx, B> { ) -> Event<P::Tx, C> {
let tally = CarnotTally::new(tally); let tally = CarnotTally::new(tally);
let votes_stream = adapter.votes_stream(&committee, block.view, block.id).await; let votes_stream = adapter.votes_stream(&committee, block.view, block.id).await;
match tally.tally(block.clone(), votes_stream).await { match tally.tally(block.clone(), votes_stream).await {
@ -718,7 +740,7 @@ where
committee: Committee, committee: Committee,
timeout_qc: TimeoutQc, timeout_qc: TimeoutQc,
tally: CarnotTallySettings, tally: CarnotTallySettings,
) -> Event<P::Tx, B> { ) -> Event<P::Tx, C> {
let tally = NewViewTally::new(tally); let tally = NewViewTally::new(tally);
let stream = adapter let stream = adapter
.new_view_stream(&committee, timeout_qc.view().next()) .new_view_stream(&committee, timeout_qc.view().next())
@ -740,7 +762,7 @@ where
committee: Committee, committee: Committee,
view: consensus_engine::View, view: consensus_engine::View,
tally: CarnotTallySettings, tally: CarnotTallySettings,
) -> Event<P::Tx, B> { ) -> Event<P::Tx, C> {
let tally = TimeoutTally::new(tally); let tally = TimeoutTally::new(tally);
let stream = adapter.timeout_stream(&committee, view).await; let stream = adapter.timeout_stream(&committee, view).await;
match tally.tally(view, stream).await { match tally.tally(view, stream).await {
@ -752,7 +774,7 @@ where
} }
#[instrument(level = "debug", skip(adapter))] #[instrument(level = "debug", skip(adapter))]
async fn gather_block(adapter: A, view: consensus_engine::View) -> Event<P::Tx, B> { async fn gather_block(adapter: A, view: consensus_engine::View) -> Event<P::Tx, C> {
let stream = adapter let stream = adapter
.proposal_chunks_stream(view) .proposal_chunks_stream(view)
.await .await
@ -814,11 +836,11 @@ where
} }
} }
async fn handle_output<A, Tx, B>(adapter: &A, node_id: NodeId, output: Output<Tx, B>) async fn handle_output<A, Tx, C>(adapter: &A, node_id: NodeId, output: Output<Tx, C>)
where where
A: NetworkAdapter, A: NetworkAdapter,
Tx: Hash + Eq + Clone + Serialize + DeserializeOwned + Debug, Tx: Hash + Eq + Clone + Serialize + DeserializeOwned + Debug,
B: Clone + Eq + Hash + Serialize + DeserializeOwned, C: Clone + Eq + Hash + Serialize + DeserializeOwned,
{ {
match output { match output {
Output::Send(consensus_engine::Send { to, payload }) => match payload { Output::Send(consensus_engine::Send { to, payload }) => match payload {
@ -878,10 +900,10 @@ where
} }
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
enum Event<Tx: Clone + Hash + Eq, Blob: Clone + Eq + Hash> { enum Event<Tx: Clone + Hash + Eq, BlobCertificate: Clone + Eq + Hash> {
Proposal { Proposal {
block: Block<Tx, Blob>, block: Block<Tx, BlobCertificate>,
stream: Pin<Box<dyn Stream<Item = Block<Tx, Blob>> + Send>>, stream: Pin<Box<dyn Stream<Item = Block<Tx, BlobCertificate>> + Send>>,
}, },
#[allow(dead_code)] #[allow(dead_code)]
Approve { Approve {