diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index a1cac91b..6ba2f6c6 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -36,6 +36,9 @@ use overwatch_derive::*; use overwatch_rs::services::handle::ServiceHandle; pub use config::{Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, OverlayArgs}; +use nomos_core::{ + da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx, +}; pub use tx::Tx; #[cfg(all(feature = "waku", feature = "libp2p"))] @@ -48,8 +51,12 @@ pub type Carnot = CarnotConsensus< MempoolWakuAdapter, FlatOverlay, Blob, + FillSizeWithTx, + FillSizeWithBlobs, >; +const MB16: usize = 1024 * 1024 * 16; + #[cfg(feature = "libp2p")] pub type Carnot = CarnotConsensus< ConsensusLibp2pAdapter, @@ -57,6 +64,8 @@ pub type Carnot = CarnotConsensus< MempoolLibp2pAdapter, FlatOverlay, Blob, + FillSizeWithTx, + FillSizeWithBlobs, >; #[cfg(feature = "libp2p")] diff --git a/nomos-core/src/block/builder.rs b/nomos-core/src/block/builder.rs index 2521e609..30eba0c9 100644 --- a/nomos-core/src/block/builder.rs +++ b/nomos-core/src/block/builder.rs @@ -83,6 +83,18 @@ where self } + #[must_use] + pub fn with_transactions(mut self, txs: impl Iterator + 'static) -> Self { + self.txs = Some(Box::new(txs)); + self + } + + #[must_use] + pub fn with_blobs(mut self, blobs: impl Iterator + 'static) -> Self { + self.blobs = Some(Box::new(blobs)); + self + } + #[allow(clippy::result_large_err)] pub fn build(self) -> Result, Self> { if let Self { diff --git a/nomos-core/src/da/blob/mod.rs b/nomos-core/src/da/blob/mod.rs index a8336314..9486dbb9 100644 --- a/nomos-core/src/da/blob/mod.rs +++ b/nomos-core/src/da/blob/mod.rs @@ -16,6 +16,9 @@ pub trait Blob { pub trait BlobSelect { type Blob: Blob; + type Settings: Clone; + + fn new(settings: Self::Settings) -> Self; fn select_blob_from<'i, I: Iterator + 'i>( &self, blobs: I, diff --git a/nomos-core/src/da/blob/select.rs b/nomos-core/src/da/blob/select.rs index 55ce76a7..d00723b4 100644 --- a/nomos-core/src/da/blob/select.rs +++ b/nomos-core/src/da/blob/select.rs @@ -6,7 +6,7 @@ use std::marker::PhantomData; use crate::da::blob::{Blob, BlobSelect}; use crate::utils; -#[derive(Default)] +#[derive(Default, Clone, Copy)] pub struct FillSize { _blob: PhantomData, } @@ -21,6 +21,11 @@ impl FillSize { impl BlobSelect for FillSize { type Blob = B; + type Settings = (); + + fn new(_settings: Self::Settings) -> Self { + FillSize::new() + } fn select_blob_from<'i, I: Iterator + 'i>( &self, diff --git a/nomos-core/src/da/mod.rs b/nomos-core/src/da/mod.rs index 66321b11..db301fba 100644 --- a/nomos-core/src/da/mod.rs +++ b/nomos-core/src/da/mod.rs @@ -23,7 +23,7 @@ pub trait DaProtocol { /// Depending on the protocol, it may be necessary to feed multiple blobs to /// recover the initial data. fn recv_blob(&mut self, blob: Self::Blob); - /// Attempt to recover the initial data from feeded blobs. + /// Attempt to recover the initial data from fed blobs. /// If the protocol is not yet ready to return the data, return None. fn extract(&mut self) -> Option; /// Attest that we have received and stored a blob. diff --git a/nomos-core/src/tx/mod.rs b/nomos-core/src/tx/mod.rs index 7f6430a6..1b329abd 100644 --- a/nomos-core/src/tx/mod.rs +++ b/nomos-core/src/tx/mod.rs @@ -22,6 +22,9 @@ pub trait Transaction { pub trait TxSelect { type Tx: Transaction; + type Settings: Clone; + fn new(settings: Self::Settings) -> Self; + fn select_tx_from<'i, I: Iterator + 'i>( &self, txs: I, diff --git a/nomos-core/src/tx/select.rs b/nomos-core/src/tx/select.rs index c788121d..3ddd7ffb 100644 --- a/nomos-core/src/tx/select.rs +++ b/nomos-core/src/tx/select.rs @@ -6,7 +6,7 @@ use std::marker::PhantomData; use crate::tx::{Transaction, TxSelect}; use crate::utils; -#[derive(Default)] +#[derive(Default, Clone, Copy)] pub struct FillSize { _tx: PhantomData, } @@ -21,6 +21,11 @@ impl FillSize { impl TxSelect for FillSize { type Tx = Tx; + type Settings = (); + + fn new(_: Self::Settings) -> Self { + FillSize::new() + } fn select_tx_from<'i, I: Iterator + 'i>( &self, diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 3391300e..8529ac94 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -35,8 +35,10 @@ use consensus_engine::{ use task_manager::TaskManager; use crate::committee_membership::UpdateableCommitteeMembership; +use nomos_core::block::builder::BlockBuilder; use nomos_core::block::Block; -use nomos_core::tx::Transaction; +use nomos_core::da::blob::{Blob, BlobSelect}; +use nomos_core::tx::{Transaction, TxSelect}; use nomos_core::vote::Tally; use nomos_mempool::{ backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolMsg, MempoolService, @@ -59,39 +61,49 @@ fn default_timeout() -> Duration { pub type Seed = [u8; 32]; #[derive(Debug, Deserialize, Serialize)] -pub struct CarnotSettings { +pub struct CarnotSettings { pub private_key: [u8; 32], pub overlay_settings: O::Settings, #[serde(default = "default_timeout")] pub timeout: Duration, + #[serde(default)] + pub transaction_selector_settings: Ts, + #[serde(default)] + pub blob_selector_settings: Bs, } -impl Clone for CarnotSettings { +impl Clone for CarnotSettings { fn clone(&self) -> Self { Self { private_key: self.private_key, overlay_settings: self.overlay_settings.clone(), timeout: self.timeout, + transaction_selector_settings: self.transaction_selector_settings.clone(), + blob_selector_settings: self.blob_selector_settings.clone(), } } } -impl CarnotSettings { +impl CarnotSettings { #[inline] pub const fn new( private_key: [u8; 32], overlay_settings: O::Settings, + transaction_selector_settings: Ts, + blob_selector_settings: Bs, timeout: Duration, ) -> Self { Self { private_key, overlay_settings, timeout, + transaction_selector_settings, + blob_selector_settings, } } } -pub struct CarnotConsensus +pub struct CarnotConsensus where A: NetworkAdapter, M: MempoolAdapter, @@ -100,6 +112,8 @@ where P::Tx: Transaction + Debug + 'static, ::Hash: Debug, A::Backend: 'static, + TxS: TxSelect, + BS: BlobSelect, { service_state: ServiceStateHandle, // underlying networking backend. We need this so we can relay and check the types properly @@ -111,7 +125,7 @@ where _blob: std::marker::PhantomData, } -impl ServiceData for CarnotConsensus +impl ServiceData for CarnotConsensus where A: NetworkAdapter, P: MemPool, @@ -119,16 +133,18 @@ where ::Hash: Debug, M: MempoolAdapter, O: Overlay + Debug, + TxS: TxSelect, + BS: BlobSelect, { const SERVICE_ID: ServiceId = "Carnot"; - type Settings = CarnotSettings; + type Settings = CarnotSettings; type State = NoState; type StateOperator = NoOperator; type Message = ConsensusMsg; } #[async_trait::async_trait] -impl ServiceCore for CarnotConsensus +impl ServiceCore for CarnotConsensus where A: NetworkAdapter + Clone + Send + Sync + 'static, P: MemPool + Send + Sync + 'static, @@ -136,11 +152,15 @@ where P::Tx: Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, ::Hash: Debug + Send + Sync, - B: Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, + B: Blob + Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, M: MempoolAdapter + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static, O::LeaderSelection: UpdateableLeaderSelection, O::CommitteeMembership: UpdateableCommitteeMembership, + TxS: TxSelect + Clone + Send + Sync + 'static, + TxS::Settings: Send + Sync + 'static, + BS: BlobSelect + Clone + Send + Sync + 'static, + BS::Settings: Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); @@ -171,6 +191,8 @@ where private_key, overlay_settings, timeout, + transaction_selector_settings, + blob_selector_settings, } = self.service_state.settings_reader.get_updated_settings(); let overlay = O::new(overlay_settings); @@ -196,6 +218,9 @@ where participating_nodes: carnot.root_committee(), }; + let tx_selector = TxS::new(transaction_selector_settings); + let blob_selector = BS::new(blob_selector_settings); + let mut task_manager = TaskManager::new(); let genesis_block = carnot.genesis_block(); @@ -245,6 +270,8 @@ where adapter.clone(), private_key, mempool_relay.clone(), + tx_selector.clone(), + blob_selector.clone(), timeout, ) .await @@ -265,7 +292,7 @@ enum Output { BroadcastProposal { proposal: Block }, } -impl CarnotConsensus +impl CarnotConsensus where A: NetworkAdapter + Clone + Send + Sync + 'static, P: MemPool + Send + Sync + 'static, @@ -273,11 +300,13 @@ where P::Tx: Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, ::Hash: Debug + Send + Sync, - B: Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, + B: Blob + Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, M: MempoolAdapter + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static, O::LeaderSelection: UpdateableLeaderSelection, O::CommitteeMembership: UpdateableCommitteeMembership, + TxS: TxSelect + Clone + Send + Sync + 'static, + BS: BlobSelect + Clone + Send + Sync + 'static, { fn process_message(carnot: &Carnot, msg: ConsensusMsg) { match msg { @@ -306,6 +335,8 @@ where adapter: A, private_key: PrivateKey, mempool_relay: OutboundRelay>, + tx_selector: TxS, + blobl_selector: BS, timeout: Duration, ) -> Carnot { let mut output = None; @@ -354,7 +385,15 @@ where (carnot, output) = Self::process_root_timeout(carnot, timeouts).await; } Event::ProposeBlock { qc } => { - output = Self::propose_block(carnot.id(), private_key, qc, mempool_relay).await; + output = Self::propose_block( + carnot.id(), + private_key, + qc, + tx_selector.clone(), + blobl_selector.clone(), + mempool_relay, + ) + .await; } _ => {} } @@ -559,11 +598,16 @@ where (carnot, output) } - #[instrument(level = "debug", skip(mempool_relay, private_key))] + #[instrument( + level = "debug", + skip(mempool_relay, private_key, tx_selector, blob_selector) + )] async fn propose_block( id: NodeId, private_key: PrivateKey, qc: Qc, + tx_selector: TxS, + blob_selector: BS, mempool_relay: OutboundRelay>, ) -> Option> { let (reply_channel, rx) = tokio::sync::oneshot::channel(); @@ -579,7 +623,17 @@ where match rx.await { Ok(txs) => { let beacon = RandomBeaconState::generate_happy(qc.view(), &private_key); - let proposal = Block::new(qc.view().next(), qc, txs, [].into_iter(), id, beacon); + let Ok(proposal) = BlockBuilder::new(tx_selector, blob_selector) + .with_view(qc.view().next()) + .with_parent_qc(qc) + .with_proposer(id) + .with_beacon_state(beacon) + .with_transactions(txs) + .with_blobs([].into_iter()) + .build() + else { + panic!("Proposal block should always succeed to be built") + }; output = Some(Output::BroadcastProposal { proposal }); } Err(e) => tracing::error!("Could not fetch txs {e}"), diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index c41ef75c..efbc4604 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -272,6 +272,8 @@ fn create_node_config( leader_super_majority_threshold: Some(threshold), }, timeout, + transaction_selector_settings: (), + blob_selector_settings: (), }, log: Default::default(), http: nomos_http::http::HttpServiceSettings {