From 57746f5e76c2faf2e427b72c5e5981d691c6653d Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Tue, 5 Sep 2023 13:43:36 +0200 Subject: [PATCH] Block refactor (#368) * Add sidecars to block * Use cl and bl prefixes --- nodes/nomos-node/src/blob.rs | 1 + nodes/nomos-node/src/lib.rs | 4 ++ nomos-core/src/block.rs | 41 +++++++---- .../consensus/src/committee_membership/mod.rs | 12 ++-- .../consensus/src/leader_selection/mod.rs | 12 ++-- nomos-services/consensus/src/lib.rs | 71 +++++++++++-------- simulations/src/bin/app/overlay_node.rs | 2 +- simulations/src/node/carnot/event_builder.rs | 3 +- simulations/src/node/carnot/mod.rs | 12 ++-- 9 files changed, 96 insertions(+), 62 deletions(-) create mode 100644 nodes/nomos-node/src/blob.rs diff --git a/nodes/nomos-node/src/blob.rs b/nodes/nomos-node/src/blob.rs new file mode 100644 index 00000000..5860973b --- /dev/null +++ b/nodes/nomos-node/src/blob.rs @@ -0,0 +1 @@ +pub type Blob = Box<[u8]>; diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 9fe56cfc..24fa9271 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -1,3 +1,4 @@ +mod blob; mod tx; use color_eyre::eyre::Result; @@ -32,6 +33,7 @@ use serde::{Deserialize, Serialize}; #[cfg(feature = "waku")] use waku_bindings::SecretKey; +use crate::blob::Blob; pub use tx::Tx; #[cfg(all(feature = "waku", feature = "libp2p"))] @@ -76,6 +78,7 @@ pub type Carnot = CarnotConsensus< MempoolWakuAdapter, MockFountain, FlatOverlay, + Blob, >; #[cfg(feature = "libp2p")] @@ -85,6 +88,7 @@ pub type Carnot = CarnotConsensus< MempoolLibp2pAdapter, MockFountain, FlatOverlay, + Blob, >; #[derive(Services)] diff --git a/nomos-core/src/block.rs b/nomos-core/src/block.rs index 2b86b049..93615221 100644 --- a/nomos-core/src/block.rs +++ b/nomos-core/src/block.rs @@ -15,21 +15,27 @@ pub type TxHash = [u8; 32]; /// A block #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Block { +pub struct Block { header: consensus_engine::Block, - transactions: IndexSet, beacon: RandomBeaconState, + cl_transactions: IndexSet, + bl_blobs: IndexSet, } -impl Block { +impl< + Tx: Clone + Eq + Hash + Serialize + DeserializeOwned, + Blob: Clone + Eq + Hash + Serialize + DeserializeOwned, + > Block +{ pub fn new( view: View, parent_qc: Qc, - txs: impl Iterator, + txs: impl Iterator, proposer: NodeId, beacon: RandomBeaconState, ) -> Self { let transactions = txs.collect(); + let blobs = IndexSet::new(); let header = consensus_engine::Block { id: BlockId::zeros(), view, @@ -38,11 +44,11 @@ impl Block { leader_id: proposer, }, }; - let mut s = Self { header, - transactions, beacon, + cl_transactions: transactions, + bl_blobs: blobs, }; let id = block_id_from_wire_content(&s); s.header.id = id; @@ -50,13 +56,17 @@ impl Block { } } -impl Block { +impl Block { pub fn header(&self) -> &consensus_engine::Block { &self.header } - pub fn transactions(&self) -> impl Iterator + '_ { - self.transactions.iter() + pub fn transactions(&self) -> impl Iterator + '_ { + self.cl_transactions.iter() + } + + pub fn blobs(&self) -> impl Iterator + '_ { + self.bl_blobs.iter() } pub fn beacon(&self) -> &RandomBeaconState { @@ -64,8 +74,11 @@ impl Block { } } -pub fn block_id_from_wire_content( - block: &Block, +pub fn block_id_from_wire_content< + Tx: Clone + Eq + Hash + Serialize + DeserializeOwned, + Blob: Clone + Eq + Hash + Serialize + DeserializeOwned, +>( + block: &Block, ) -> consensus_engine::BlockId { use blake2::digest::{consts::U32, Digest}; use blake2::Blake2b; @@ -75,7 +88,11 @@ pub fn block_id_from_wire_content Block { +impl< + Tx: Clone + Eq + Hash + Serialize + DeserializeOwned, + Blob: Clone + Eq + Hash + Serialize + DeserializeOwned, + > Block +{ /// Encode block into bytes pub fn as_bytes(&self) -> Bytes { wire::serialize(self).unwrap().into() diff --git a/nomos-services/consensus/src/committee_membership/mod.rs b/nomos-services/consensus/src/committee_membership/mod.rs index 8bef73f5..1bd6280c 100644 --- a/nomos-services/consensus/src/committee_membership/mod.rs +++ b/nomos-services/consensus/src/committee_membership/mod.rs @@ -15,9 +15,9 @@ use nomos_core::block::Block; pub trait UpdateableCommitteeMembership: CommitteeMembership { type Error: Error; - fn on_new_block_received( + fn on_new_block_received( &self, - block: &Block, + block: &Block, ) -> Result; fn on_timeout_qc_received(&self, qc: &TimeoutQc) -> Result; } @@ -25,9 +25,9 @@ pub trait UpdateableCommitteeMembership: CommitteeMembership { impl UpdateableCommitteeMembership for FreezeMembership { type Error = Infallible; - fn on_new_block_received( + fn on_new_block_received( &self, - _block: &Block, + _block: &Block, ) -> Result { Ok(Self) } @@ -40,9 +40,9 @@ impl UpdateableCommitteeMembership for FreezeMembership { impl UpdateableCommitteeMembership for RandomBeaconState { type Error = RandomBeaconError; - fn on_new_block_received( + fn on_new_block_received( &self, - block: &Block, + block: &Block, ) -> Result { self.check_advance_happy(block.beacon().clone(), block.header().parent_qc.view()) } diff --git a/nomos-services/consensus/src/leader_selection/mod.rs b/nomos-services/consensus/src/leader_selection/mod.rs index a7e74ce9..4a3c19ce 100644 --- a/nomos-services/consensus/src/leader_selection/mod.rs +++ b/nomos-services/consensus/src/leader_selection/mod.rs @@ -9,9 +9,9 @@ use std::{convert::Infallible, error::Error, hash::Hash}; pub trait UpdateableLeaderSelection: LeaderSelection { type Error: Error; - fn on_new_block_received( + fn on_new_block_received( &self, - block: &Block, + block: &Block, ) -> Result; fn on_timeout_qc_received(&self, qc: &TimeoutQc) -> Result; } @@ -19,9 +19,9 @@ pub trait UpdateableLeaderSelection: LeaderSelection { impl UpdateableLeaderSelection for RoundRobin { type Error = Infallible; - fn on_new_block_received( + fn on_new_block_received( &self, - _block: &Block, + _block: &Block, ) -> Result { Ok(self.advance()) } @@ -34,9 +34,9 @@ impl UpdateableLeaderSelection for RoundRobin { impl UpdateableLeaderSelection for RandomBeaconState { type Error = RandomBeaconError; - fn on_new_block_received( + fn on_new_block_received( &self, - block: &Block, + block: &Block, ) -> Result { self.check_advance_happy(block.beacon().clone(), block.header().parent_qc.view()) // TODO: check random beacon public keys is leader id diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index e286d1eb..115620a0 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -96,7 +96,7 @@ impl CarnotSettings { } } -pub struct CarnotConsensus +pub struct CarnotConsensus where F: FountainCode, A: NetworkAdapter, @@ -114,9 +114,11 @@ where mempool_relay: Relay>, _fountain: std::marker::PhantomData, _overlay: std::marker::PhantomData, + // this need to be substituted by some kind DA bo + _blob: std::marker::PhantomData, } -impl ServiceData for CarnotConsensus +impl ServiceData for CarnotConsensus where F: FountainCode, A: NetworkAdapter, @@ -134,7 +136,7 @@ where } #[async_trait::async_trait] -impl ServiceCore for CarnotConsensus +impl ServiceCore for CarnotConsensus where F: FountainCode + Clone + Send + Sync + 'static, A: NetworkAdapter + Clone + Send + Sync + 'static, @@ -143,6 +145,7 @@ where P::Tx: Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, ::Hash: Debug + Send + Sync, + B: Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, M: MempoolAdapter + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static, O::LeaderSelection: UpdateableLeaderSelection, @@ -156,6 +159,7 @@ where network_relay, _fountain: Default::default(), _overlay: Default::default(), + _blob: Default::default(), mempool_relay, }) } @@ -268,13 +272,13 @@ where #[derive(Debug)] #[allow(clippy::large_enum_variant)] -enum Output { +enum Output { Send(consensus_engine::Send), BroadcastTimeoutQc { timeout_qc: TimeoutQc }, - BroadcastProposal { proposal: Block }, + BroadcastProposal { proposal: Block }, } -impl CarnotConsensus +impl CarnotConsensus where F: FountainCode + Clone + Send + Sync + 'static, A: NetworkAdapter + Clone + Send + Sync + 'static, @@ -283,6 +287,7 @@ where P::Tx: Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, ::Hash: Debug + Send + Sync, + B: Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, M: MempoolAdapter + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static, O::LeaderSelection: UpdateableLeaderSelection, @@ -310,8 +315,8 @@ where #[allow(clippy::too_many_arguments)] async fn process_carnot_event( mut carnot: Carnot, - event: Event, - task_manager: &mut TaskManager>, + event: Event, + task_manager: &mut TaskManager>, adapter: A, private_key: PrivateKey, mempool_relay: OutboundRelay>, @@ -329,7 +334,7 @@ where tracing::debug!("approving proposal {:?}", block); let (new_carnot, out) = carnot.approve_block(block); carnot = new_carnot; - output = Some(Output::Send::(out)); + output = Some(Output::Send::(out)); } Event::LocalTimeout { view } => { tracing::debug!("local timeout"); @@ -391,11 +396,11 @@ where #[instrument(level = "debug", skip(adapter, task_manager, stream))] async fn process_block( mut carnot: Carnot, - block: Block, - mut stream: Pin> + Send>>, - task_manager: &mut TaskManager>, + block: Block, + mut stream: Pin> + Send>>, + task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { tracing::debug!("received proposal {:?}", block); if carnot.highest_voted_view() >= block.header().view { tracing::debug!("already voted for view {}", block.header().view); @@ -471,9 +476,9 @@ where carnot: Carnot, timeout_qc: TimeoutQc, new_views: HashSet, - task_manager: &mut TaskManager>, + task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { let leader_committee = [carnot.id()].into_iter().collect(); let leader_tally_settings = CarnotTallySettings { threshold: carnot.leader_super_majority_threshold(), @@ -508,9 +513,9 @@ where async fn receive_timeout_qc( carnot: Carnot, timeout_qc: TimeoutQc, - task_manager: &mut TaskManager>, + task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone()); let self_committee = carnot.self_committee(); let tally_settings = CarnotTallySettings { @@ -535,7 +540,7 @@ where async fn process_root_timeout( carnot: Carnot, timeouts: HashSet, - ) -> (Carnot, Option>) { + ) -> (Carnot, Option>) { // we might have received a timeout_qc sent by some other node and advanced the view // already, in which case we should ignore the timeout if carnot.current_view() @@ -575,7 +580,7 @@ where private_key: PrivateKey, qc: Qc, mempool_relay: OutboundRelay>, - ) -> Option> { + ) -> Option> { let (reply_channel, rx) = tokio::sync::oneshot::channel(); let mut output = None; mempool_relay @@ -600,7 +605,7 @@ where async fn process_view_change( carnot: Carnot, prev_view: View, - task_manager: &mut TaskManager>, + task_manager: &mut TaskManager>, adapter: A, timeout: Duration, ) { @@ -637,7 +642,7 @@ where } } - async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event { + async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event { if let Some(timeout_qc) = adapter .timeout_qc_stream(view) .await @@ -657,7 +662,7 @@ where committee: Committee, block: consensus_engine::Block, tally: CarnotTallySettings, - ) -> Event { + ) -> Event { let tally = CarnotTally::new(tally); let votes_stream = adapter.votes_stream(&committee, block.view, block.id).await; match tally.tally(block.clone(), votes_stream).await { @@ -674,7 +679,7 @@ where committee: Committee, timeout_qc: TimeoutQc, tally: CarnotTallySettings, - ) -> Event { + ) -> Event { let tally = NewViewTally::new(tally); let stream = adapter .new_view_stream(&committee, timeout_qc.view().next()) @@ -696,7 +701,7 @@ where committee: Committee, view: consensus_engine::View, tally: CarnotTallySettings, - ) -> Event { + ) -> Event { let tally = TimeoutTally::new(tally); let stream = adapter.timeout_stream(&committee, view).await; match tally.tally(view, stream).await { @@ -708,7 +713,7 @@ where } #[instrument(level = "debug", skip(adapter))] - async fn gather_block(adapter: A, view: consensus_engine::View) -> Event { + async fn gather_block(adapter: A, view: consensus_engine::View) -> Event { let stream = adapter .proposal_chunks_stream(view) .await @@ -770,11 +775,16 @@ where } } -async fn handle_output(adapter: &A, fountain: &F, node_id: NodeId, output: Output) -where +async fn handle_output( + adapter: &A, + fountain: &F, + node_id: NodeId, + output: Output, +) where A: NetworkAdapter, F: FountainCode, Tx: Hash + Eq + Clone + Serialize + DeserializeOwned + Debug, + B: Clone + Eq + Hash + Serialize + DeserializeOwned, { match output { Output::Send(consensus_engine::Send { to, payload }) => match payload { @@ -836,10 +846,11 @@ where } } -enum Event { +#[allow(clippy::large_enum_variant)] +enum Event { Proposal { - block: Block, - stream: Pin> + Send>>, + block: Block, + stream: Pin> + Send>>, }, #[allow(dead_code)] Approve { diff --git a/simulations/src/bin/app/overlay_node.rs b/simulations/src/bin/app/overlay_node.rs index 37901847..7389d671 100644 --- a/simulations/src/bin/app/overlay_node.rs +++ b/simulations/src/bin/app/overlay_node.rs @@ -16,7 +16,7 @@ pub fn to_overlay_node( nodes: Vec, leader: NodeId, network_interface: InMemoryNetworkInterface, - genesis: nomos_core::block::Block<[u8; 32]>, + genesis: nomos_core::block::Block<[u8; 32], Box<[u8]>>, mut rng: R, settings: &SimulationSettings, ) -> BoxedNode { diff --git a/simulations/src/node/carnot/event_builder.rs b/simulations/src/node/carnot/event_builder.rs index 3f717a5e..e428a3c9 100644 --- a/simulations/src/node/carnot/event_builder.rs +++ b/simulations/src/node/carnot/event_builder.rs @@ -10,6 +10,7 @@ use std::hash::Hash; use std::time::Duration; pub type CarnotTx = [u8; 32]; +pub type CarnotBlob = Box<[u8]>; pub(crate) struct EventBuilder { id: NodeId, @@ -230,7 +231,7 @@ impl EventBuilder { pub enum Event { Proposal { - block: Block, + block: Block>, }, #[allow(dead_code)] Approve { diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index e2df06fd..65480a08 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -23,7 +23,7 @@ use serde::Deserialize; use self::messages::CarnotMessage; use super::{Node, NodeId}; use crate::network::{InMemoryNetworkInterface, NetworkInterface, NetworkMessage}; -use crate::node::carnot::event_builder::{CarnotTx, Event}; +use crate::node::carnot::event_builder::{CarnotBlob, CarnotTx, Event}; use crate::node::carnot::message_cache::MessageCache; use crate::output_processors::{Record, RecordType, Runtime}; use crate::settings::SimulationSettings; @@ -90,7 +90,7 @@ impl< id: consensus_engine::NodeId, settings: CarnotSettings, overlay_settings: O::Settings, - genesis: nomos_core::block::Block, + genesis: nomos_core::block::Block, network_interface: InMemoryNetworkInterface, rng: &mut R, ) -> Self { @@ -120,7 +120,7 @@ impl< this } - fn handle_output(&self, output: Output) { + fn handle_output(&self, output: Output) { match output { Output::Send(consensus_engine::Send { to, @@ -343,7 +343,7 @@ impl< fn update_overlay_with_block( state: Carnot, - block: &nomos_core::block::Block, + block: &nomos_core::block::Block, ) -> Carnot { state .update_overlay(|overlay| { @@ -438,12 +438,12 @@ impl< #[derive(Debug)] #[allow(clippy::large_enum_variant)] -enum Output { +enum Output { Send(consensus_engine::Send), BroadcastTimeoutQc { timeout_qc: TimeoutQc, }, BroadcastProposal { - proposal: nomos_core::block::Block, + proposal: nomos_core::block::Block, }, }