diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 2973c376..7783b856 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -3,9 +3,8 @@ mod tx; use color_eyre::eyre::Result; use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin}; -use full_replication::Blob; - -use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication}; +use full_replication::Certificate; +use full_replication::{AbsoluteNumber, Attestation, Blob, FullReplication}; #[cfg(feature = "metrics")] use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService}; use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter; @@ -31,7 +30,8 @@ use overwatch_rs::services::handle::ServiceHandle; pub use config::{Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, OverlayArgs}; use nomos_core::{ - da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx, + da::certificate::select::FillSize as FillSizeWithBlobsCertificate, + tx::select::FillSize as FillSizeWithTx, }; pub use tx::Tx; @@ -42,9 +42,9 @@ pub type Carnot = CarnotConsensus< MockPool, MempoolLibp2pAdapter, FlatOverlay, - Blob, + Certificate, FillSizeWithTx, - FillSizeWithBlobs, + FillSizeWithBlobsCertificate, >; type DataAvailability = DataAvailabilityService< diff --git a/nomos-core/src/block/builder.rs b/nomos-core/src/block/builder.rs index 30eba0c9..2d32ac47 100644 --- a/nomos-core/src/block/builder.rs +++ b/nomos-core/src/block/builder.rs @@ -5,7 +5,8 @@ use serde::de::DeserializeOwned; use serde::Serialize; // internal use crate::block::Block; -use crate::da::blob::{Blob, BlobSelect}; +use crate::da::certificate::BlobCertificateSelect; +use crate::da::certificate::Certificate; use crate::tx::{Transaction, TxSelect}; use consensus_engine::overlay::RandomBeaconState; use consensus_engine::{NodeId, Qc, View}; @@ -39,12 +40,12 @@ pub struct BlockBuilder { blobs: Option>>, } -impl BlockBuilder +impl BlockBuilder where Tx: Transaction + Clone + Eq + Hash + Serialize + DeserializeOwned, - B: Blob + Clone + Eq + Hash + Serialize + DeserializeOwned, + C: Certificate + Clone + Eq + Hash + Serialize + DeserializeOwned, TxSelector: TxSelect, - BlobSelector: BlobSelect, + BlobSelector: BlobCertificateSelect, { pub fn new(tx_selector: TxSelector, blob_selector: BlobSelector) -> Self { Self { @@ -90,13 +91,16 @@ where } #[must_use] - pub fn with_blobs(mut self, blobs: impl Iterator + 'static) -> Self { - self.blobs = Some(Box::new(blobs)); + pub fn with_blobs_certificates( + mut self, + blobs_certificates: impl Iterator + 'static, + ) -> Self { + self.blobs = Some(Box::new(blobs_certificates)); self } #[allow(clippy::result_large_err)] - pub fn build(self) -> Result, Self> { + pub fn build(self) -> Result, Self> { if let Self { tx_selector, blob_selector, diff --git a/nomos-core/src/da/blob/mod.rs b/nomos-core/src/da/blob/mod.rs index 9486dbb9..f7780db4 100644 --- a/nomos-core/src/da/blob/mod.rs +++ b/nomos-core/src/da/blob/mod.rs @@ -1,5 +1,3 @@ -pub mod select; - use bytes::Bytes; use std::hash::Hash; @@ -13,14 +11,3 @@ pub trait Blob { } fn as_bytes(&self) -> Bytes; } - -pub trait BlobSelect { - type Blob: Blob; - type Settings: Clone; - - fn new(settings: Self::Settings) -> Self; - fn select_blob_from<'i, I: Iterator + 'i>( - &self, - blobs: I, - ) -> Box + 'i>; -} diff --git a/nomos-core/src/da/certificate/mod.rs b/nomos-core/src/da/certificate/mod.rs index 35600aaf..92425f62 100644 --- a/nomos-core/src/da/certificate/mod.rs +++ b/nomos-core/src/da/certificate/mod.rs @@ -1 +1,22 @@ -pub trait Certificate {} +pub mod select; + +use crate::da::blob::Blob; +use bytes::Bytes; + +pub trait Certificate { + type Blob: Blob; + fn blob(&self) -> ::Hash; + + fn as_bytes(&self) -> Bytes; +} + +pub trait BlobCertificateSelect { + type Certificate: Certificate; + type Settings: Clone; + + fn new(settings: Self::Settings) -> Self; + fn select_blob_from<'i, I: Iterator + 'i>( + &self, + certificates: I, + ) -> Box + 'i>; +} diff --git a/nomos-core/src/da/blob/select.rs b/nomos-core/src/da/certificate/select.rs similarity index 60% rename from nomos-core/src/da/blob/select.rs rename to nomos-core/src/da/certificate/select.rs index d00723b4..07d1dae7 100644 --- a/nomos-core/src/da/blob/select.rs +++ b/nomos-core/src/da/certificate/select.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; // crates // internal -use crate::da::blob::{Blob, BlobSelect}; +use crate::da::certificate::{BlobCertificateSelect, Certificate}; use crate::utils; #[derive(Default, Clone, Copy)] @@ -19,21 +19,21 @@ impl FillSize { } } -impl BlobSelect for FillSize { - type Blob = B; +impl BlobCertificateSelect for FillSize { + type Certificate = C; type Settings = (); fn new(_settings: Self::Settings) -> Self { FillSize::new() } - fn select_blob_from<'i, I: Iterator + 'i>( + fn select_blob_from<'i, I: Iterator + 'i>( &self, - blobs: I, - ) -> Box + 'i> { - utils::select::select_from_till_fill_size::( + certificates: I, + ) -> Box + 'i> { + utils::select::select_from_till_fill_size::( |blob| blob.as_bytes().len(), - blobs, + certificates, ) } } diff --git a/nomos-da/full-replication/src/lib.rs b/nomos-da/full-replication/src/lib.rs index 033d7375..398de11b 100644 --- a/nomos-da/full-replication/src/lib.rs +++ b/nomos-da/full-replication/src/lib.rs @@ -6,12 +6,14 @@ use nomos_core::da::{ }; // std use std::collections::HashSet; +use std::hash::{Hash, Hasher}; // crates use blake2::{ digest::{Update, VariableOutput}, Blake2bVar, }; use bytes::Bytes; +use nomos_core::wire; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone)] @@ -99,15 +101,15 @@ fn hasher(blob: &Blob) -> [u8; 32] { } impl blob::Blob for Blob { - type Hash = [u8; 32]; const HASHER: BlobHasher = hasher as BlobHasher; + type Hash = [u8; 32]; fn as_bytes(&self) -> bytes::Bytes { self.data.clone() } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct Attestation { blob: [u8; 32], voter: [u8; 32], @@ -119,16 +121,36 @@ impl attestation::Attestation for Attestation { self.blob } fn as_bytes(&self) -> Bytes { - Bytes::from([self.blob.as_ref(), self.voter.as_ref()].concat()) + wire::serialize(self) + .expect("Attestation shouldn't fail to be serialized") + .into() } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct Certificate { attestations: Vec, } -impl certificate::Certificate for Certificate {} +impl Hash for Certificate { + fn hash(&self, state: &mut H) { + state.write(certificate::Certificate::as_bytes(self).as_ref()); + } +} + +impl certificate::Certificate for Certificate { + type Blob = Blob; + + fn blob(&self) -> ::Hash { + self.attestations[0].blob + } + + fn as_bytes(&self) -> Bytes { + wire::serialize(self) + .expect("Certificate shouldn't fail to be serialized") + .into() + } +} // TODO: add generic impl when the trait for Certificate is expanded impl DaProtocol for FullReplication> { diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 8529ac94..784bb062 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -37,7 +37,7 @@ use task_manager::TaskManager; use crate::committee_membership::UpdateableCommitteeMembership; use nomos_core::block::builder::BlockBuilder; use nomos_core::block::Block; -use nomos_core::da::blob::{Blob, BlobSelect}; +use nomos_core::da::certificate::{BlobCertificateSelect, Certificate}; use nomos_core::tx::{Transaction, TxSelect}; use nomos_core::vote::Tally; use nomos_mempool::{ @@ -103,7 +103,7 @@ impl CarnotSettings { } } -pub struct CarnotConsensus +pub struct CarnotConsensus where A: NetworkAdapter, M: MempoolAdapter, @@ -113,7 +113,7 @@ where ::Hash: Debug, A::Backend: 'static, TxS: TxSelect, - BS: BlobSelect, + BS: BlobCertificateSelect, { service_state: ServiceStateHandle, // underlying networking backend. We need this so we can relay and check the types properly @@ -122,10 +122,10 @@ where mempool_relay: Relay>, _overlay: std::marker::PhantomData, // this need to be substituted by some kind DA bo - _blob: std::marker::PhantomData, + _blob_certificate: std::marker::PhantomData, } -impl ServiceData for CarnotConsensus +impl ServiceData for CarnotConsensus where A: NetworkAdapter, P: MemPool, @@ -134,7 +134,7 @@ where M: MempoolAdapter, O: Overlay + Debug, TxS: TxSelect, - BS: BlobSelect, + BS: BlobCertificateSelect, { const SERVICE_ID: ServiceId = "Carnot"; type Settings = CarnotSettings; @@ -144,7 +144,7 @@ where } #[async_trait::async_trait] -impl ServiceCore for CarnotConsensus +impl ServiceCore for CarnotConsensus where A: NetworkAdapter + Clone + Send + Sync + 'static, P: MemPool + Send + Sync + 'static, @@ -152,14 +152,23 @@ where P::Tx: Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, ::Hash: Debug + Send + Sync, - B: Blob + Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, + C: Certificate + + Debug + + Clone + + Eq + + Hash + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, M: MempoolAdapter + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static, O::LeaderSelection: UpdateableLeaderSelection, O::CommitteeMembership: UpdateableCommitteeMembership, TxS: TxSelect + Clone + Send + Sync + 'static, TxS::Settings: Send + Sync + 'static, - BS: BlobSelect + Clone + Send + Sync + 'static, + BS: BlobCertificateSelect + Clone + Send + Sync + 'static, BS::Settings: Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { @@ -169,7 +178,7 @@ where service_state, network_relay, _overlay: Default::default(), - _blob: Default::default(), + _blob_certificate: Default::default(), mempool_relay, }) } @@ -286,13 +295,17 @@ where #[derive(Debug)] #[allow(clippy::large_enum_variant)] -enum Output { +enum Output { Send(consensus_engine::Send), - BroadcastTimeoutQc { timeout_qc: TimeoutQc }, - BroadcastProposal { proposal: Block }, + BroadcastTimeoutQc { + timeout_qc: TimeoutQc, + }, + BroadcastProposal { + proposal: Block, + }, } -impl CarnotConsensus +impl CarnotConsensus where A: NetworkAdapter + Clone + Send + Sync + 'static, P: MemPool + Send + Sync + 'static, @@ -300,13 +313,22 @@ where P::Tx: Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, ::Hash: Debug + Send + Sync, - B: Blob + Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, + C: Certificate + + Debug + + Clone + + Eq + + Hash + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, M: MempoolAdapter + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static, O::LeaderSelection: UpdateableLeaderSelection, O::CommitteeMembership: UpdateableCommitteeMembership, TxS: TxSelect + Clone + Send + Sync + 'static, - BS: BlobSelect + Clone + Send + Sync + 'static, + BS: BlobCertificateSelect + Clone + Send + Sync + 'static, { fn process_message(carnot: &Carnot, msg: ConsensusMsg) { match msg { @@ -330,8 +352,8 @@ where #[allow(clippy::too_many_arguments)] async fn process_carnot_event( mut carnot: Carnot, - event: Event, - task_manager: &mut TaskManager>, + event: Event, + task_manager: &mut TaskManager>, adapter: A, private_key: PrivateKey, mempool_relay: OutboundRelay>, @@ -350,7 +372,7 @@ where tracing::debug!("approving proposal {:?}", block); let (new_carnot, out) = carnot.approve_block(block); carnot = new_carnot; - output = Some(Output::Send::(out)); + output = Some(Output::Send::(out)); } Event::LocalTimeout { view } => { tracing::debug!("local timeout"); @@ -420,11 +442,11 @@ where #[instrument(level = "debug", skip(adapter, task_manager, stream))] async fn process_block( mut carnot: Carnot, - block: Block, - mut stream: Pin> + Send>>, - task_manager: &mut TaskManager>, + block: Block, + mut stream: Pin> + Send>>, + task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { tracing::debug!("received proposal {:?}", block); if carnot.highest_voted_view() >= block.header().view { tracing::debug!("already voted for view {}", block.header().view); @@ -500,9 +522,9 @@ where carnot: Carnot, timeout_qc: TimeoutQc, new_views: HashSet, - task_manager: &mut TaskManager>, + task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { let leader_committee = [carnot.id()].into_iter().collect(); let leader_tally_settings = CarnotTallySettings { threshold: carnot.leader_super_majority_threshold(), @@ -537,9 +559,9 @@ where async fn receive_timeout_qc( carnot: Carnot, timeout_qc: TimeoutQc, - task_manager: &mut TaskManager>, + task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone()); let self_committee = carnot.self_committee(); let tally_settings = CarnotTallySettings { @@ -564,7 +586,7 @@ where async fn process_root_timeout( carnot: Carnot, timeouts: HashSet, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { // we might have received a timeout_qc sent by some other node and advanced the view // already, in which case we should ignore the timeout if carnot.current_view() @@ -609,7 +631,7 @@ where tx_selector: TxS, blob_selector: BS, mempool_relay: OutboundRelay>, - ) -> Option> { + ) -> Option> { let (reply_channel, rx) = tokio::sync::oneshot::channel(); let mut output = None; mempool_relay @@ -629,7 +651,7 @@ where .with_proposer(id) .with_beacon_state(beacon) .with_transactions(txs) - .with_blobs([].into_iter()) + .with_blobs_certificates([].into_iter()) .build() else { panic!("Proposal block should always succeed to be built") @@ -644,7 +666,7 @@ where async fn process_view_change( carnot: Carnot, prev_view: View, - task_manager: &mut TaskManager>, + task_manager: &mut TaskManager>, adapter: A, timeout: Duration, ) { @@ -681,7 +703,7 @@ where } } - async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event { + async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event { if let Some(timeout_qc) = adapter .timeout_qc_stream(view) .await @@ -701,7 +723,7 @@ where committee: Committee, block: consensus_engine::Block, tally: CarnotTallySettings, - ) -> Event { + ) -> Event { let tally = CarnotTally::new(tally); let votes_stream = adapter.votes_stream(&committee, block.view, block.id).await; match tally.tally(block.clone(), votes_stream).await { @@ -718,7 +740,7 @@ where committee: Committee, timeout_qc: TimeoutQc, tally: CarnotTallySettings, - ) -> Event { + ) -> Event { let tally = NewViewTally::new(tally); let stream = adapter .new_view_stream(&committee, timeout_qc.view().next()) @@ -740,7 +762,7 @@ where committee: Committee, view: consensus_engine::View, tally: CarnotTallySettings, - ) -> Event { + ) -> Event { let tally = TimeoutTally::new(tally); let stream = adapter.timeout_stream(&committee, view).await; match tally.tally(view, stream).await { @@ -752,7 +774,7 @@ where } #[instrument(level = "debug", skip(adapter))] - async fn gather_block(adapter: A, view: consensus_engine::View) -> Event { + async fn gather_block(adapter: A, view: consensus_engine::View) -> Event { let stream = adapter .proposal_chunks_stream(view) .await @@ -814,11 +836,11 @@ where } } -async fn handle_output(adapter: &A, node_id: NodeId, output: Output) +async fn handle_output(adapter: &A, node_id: NodeId, output: Output) where A: NetworkAdapter, Tx: Hash + Eq + Clone + Serialize + DeserializeOwned + Debug, - B: Clone + Eq + Hash + Serialize + DeserializeOwned, + C: Clone + Eq + Hash + Serialize + DeserializeOwned, { match output { Output::Send(consensus_engine::Send { to, payload }) => match payload { @@ -878,10 +900,10 @@ where } #[allow(clippy::large_enum_variant)] -enum Event { +enum Event { Proposal { - block: Block, - stream: Pin> + Send>>, + block: Block, + stream: Pin> + Send>>, }, #[allow(dead_code)] Approve {