Pipe propose block (#422)

* Add new method to tx selector

* Add new method to blob selector

* Docs typo

* Added missing txs and blobs methods to block builder

* Extend selector generics

* Pipe proposing selector types

* Remove leftover

* Added missing types on waku feature
This commit is contained in:
Daniel Sanchez 2023-09-20 11:01:04 +02:00 committed by GitHub
parent e0cd35dcdc
commit 500683b0da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 110 additions and 17 deletions

View File

@ -36,6 +36,9 @@ use overwatch_derive::*;
use overwatch_rs::services::handle::ServiceHandle;
pub use config::{Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, OverlayArgs};
use nomos_core::{
da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx,
};
pub use tx::Tx;
#[cfg(all(feature = "waku", feature = "libp2p"))]
@ -48,8 +51,12 @@ pub type Carnot = CarnotConsensus<
MempoolWakuAdapter<Tx>,
FlatOverlay<RoundRobin, RandomBeaconState>,
Blob,
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobs<MB16, Blob>,
>;
const MB16: usize = 1024 * 1024 * 16;
#[cfg(feature = "libp2p")]
pub type Carnot = CarnotConsensus<
ConsensusLibp2pAdapter,
@ -57,6 +64,8 @@ pub type Carnot = CarnotConsensus<
MempoolLibp2pAdapter<Tx>,
FlatOverlay<RoundRobin, RandomBeaconState>,
Blob,
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobs<MB16, Blob>,
>;
#[cfg(feature = "libp2p")]

View File

@ -83,6 +83,18 @@ where
self
}
#[must_use]
pub fn with_transactions(mut self, txs: impl Iterator<Item = Tx> + 'static) -> Self {
self.txs = Some(Box::new(txs));
self
}
#[must_use]
pub fn with_blobs(mut self, blobs: impl Iterator<Item = B> + 'static) -> Self {
self.blobs = Some(Box::new(blobs));
self
}
#[allow(clippy::result_large_err)]
pub fn build(self) -> Result<Block<Tx, B>, Self> {
if let Self {

View File

@ -16,6 +16,9 @@ pub trait Blob {
pub trait BlobSelect {
type Blob: Blob;
type Settings: Clone;
fn new(settings: Self::Settings) -> Self;
fn select_blob_from<'i, I: Iterator<Item = Self::Blob> + 'i>(
&self,
blobs: I,

View File

@ -6,7 +6,7 @@ use std::marker::PhantomData;
use crate::da::blob::{Blob, BlobSelect};
use crate::utils;
#[derive(Default)]
#[derive(Default, Clone, Copy)]
pub struct FillSize<const SIZE: usize, B> {
_blob: PhantomData<B>,
}
@ -21,6 +21,11 @@ impl<const SIZE: usize, B> FillSize<SIZE, B> {
impl<const SIZE: usize, B: Blob> BlobSelect for FillSize<SIZE, B> {
type Blob = B;
type Settings = ();
fn new(_settings: Self::Settings) -> Self {
FillSize::new()
}
fn select_blob_from<'i, I: Iterator<Item = Self::Blob> + 'i>(
&self,

View File

@ -23,7 +23,7 @@ pub trait DaProtocol {
/// Depending on the protocol, it may be necessary to feed multiple blobs to
/// recover the initial data.
fn recv_blob(&mut self, blob: Self::Blob);
/// Attempt to recover the initial data from feeded blobs.
/// Attempt to recover the initial data from fed blobs.
/// If the protocol is not yet ready to return the data, return None.
fn extract(&mut self) -> Option<Bytes>;
/// Attest that we have received and stored a blob.

View File

@ -22,6 +22,9 @@ pub trait Transaction {
pub trait TxSelect {
type Tx: Transaction;
type Settings: Clone;
fn new(settings: Self::Settings) -> Self;
fn select_tx_from<'i, I: Iterator<Item = Self::Tx> + 'i>(
&self,
txs: I,

View File

@ -6,7 +6,7 @@ use std::marker::PhantomData;
use crate::tx::{Transaction, TxSelect};
use crate::utils;
#[derive(Default)]
#[derive(Default, Clone, Copy)]
pub struct FillSize<const SIZE: usize, Tx> {
_tx: PhantomData<Tx>,
}
@ -21,6 +21,11 @@ impl<const SIZE: usize, Tx> FillSize<SIZE, Tx> {
impl<const SIZE: usize, Tx: Transaction> TxSelect for FillSize<SIZE, Tx> {
type Tx = Tx;
type Settings = ();
fn new(_: Self::Settings) -> Self {
FillSize::new()
}
fn select_tx_from<'i, I: Iterator<Item = Self::Tx> + 'i>(
&self,

View File

@ -35,8 +35,10 @@ use consensus_engine::{
use task_manager::TaskManager;
use crate::committee_membership::UpdateableCommitteeMembership;
use nomos_core::block::builder::BlockBuilder;
use nomos_core::block::Block;
use nomos_core::tx::Transaction;
use nomos_core::da::blob::{Blob, BlobSelect};
use nomos_core::tx::{Transaction, TxSelect};
use nomos_core::vote::Tally;
use nomos_mempool::{
backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolMsg, MempoolService,
@ -59,39 +61,49 @@ fn default_timeout() -> Duration {
pub type Seed = [u8; 32];
#[derive(Debug, Deserialize, Serialize)]
pub struct CarnotSettings<O: Overlay> {
pub struct CarnotSettings<O: Overlay, Ts, Bs> {
pub private_key: [u8; 32],
pub overlay_settings: O::Settings,
#[serde(default = "default_timeout")]
pub timeout: Duration,
#[serde(default)]
pub transaction_selector_settings: Ts,
#[serde(default)]
pub blob_selector_settings: Bs,
}
impl<O: Overlay> Clone for CarnotSettings<O> {
impl<O: Overlay, Ts: Clone, Bs: Clone> Clone for CarnotSettings<O, Ts, Bs> {
fn clone(&self) -> Self {
Self {
private_key: self.private_key,
overlay_settings: self.overlay_settings.clone(),
timeout: self.timeout,
transaction_selector_settings: self.transaction_selector_settings.clone(),
blob_selector_settings: self.blob_selector_settings.clone(),
}
}
}
impl<O: Overlay> CarnotSettings<O> {
impl<O: Overlay, Ts, Bs> CarnotSettings<O, Ts, Bs> {
#[inline]
pub const fn new(
private_key: [u8; 32],
overlay_settings: O::Settings,
transaction_selector_settings: Ts,
blob_selector_settings: Bs,
timeout: Duration,
) -> Self {
Self {
private_key,
overlay_settings,
timeout,
transaction_selector_settings,
blob_selector_settings,
}
}
}
pub struct CarnotConsensus<A, P, M, O, B>
pub struct CarnotConsensus<A, P, M, O, B, TxS, BS>
where
A: NetworkAdapter,
M: MempoolAdapter<Tx = P::Tx>,
@ -100,6 +112,8 @@ where
P::Tx: Transaction + Debug + 'static,
<P::Tx as Transaction>::Hash: Debug,
A::Backend: 'static,
TxS: TxSelect<Tx = P::Tx>,
BS: BlobSelect<Blob = B>,
{
service_state: ServiceStateHandle<Self>,
// underlying networking backend. We need this so we can relay and check the types properly
@ -111,7 +125,7 @@ where
_blob: std::marker::PhantomData<B>,
}
impl<A, P, M, O, B> ServiceData for CarnotConsensus<A, P, M, O, B>
impl<A, P, M, O, B, TxS, BS> ServiceData for CarnotConsensus<A, P, M, O, B, TxS, BS>
where
A: NetworkAdapter,
P: MemPool,
@ -119,16 +133,18 @@ where
<P::Tx as Transaction>::Hash: Debug,
M: MempoolAdapter<Tx = P::Tx>,
O: Overlay + Debug,
TxS: TxSelect<Tx = P::Tx>,
BS: BlobSelect<Blob = B>,
{
const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings<O>;
type Settings = CarnotSettings<O, TxS::Settings, BS::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = ConsensusMsg;
}
#[async_trait::async_trait]
impl<A, P, M, O, B> ServiceCore for CarnotConsensus<A, P, M, O, B>
impl<A, P, M, O, B, TxS, BS> ServiceCore for CarnotConsensus<A, P, M, O, B, TxS, BS>
where
A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
@ -136,11 +152,15 @@ where
P::Tx:
Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync,
B: Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
B: Blob + Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection,
O::CommitteeMembership: UpdateableCommitteeMembership,
TxS: TxSelect<Tx = P::Tx> + Clone + Send + Sync + 'static,
TxS::Settings: Send + Sync + 'static,
BS: BlobSelect<Blob = B> + Clone + Send + Sync + 'static,
BS::Settings: Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
@ -171,6 +191,8 @@ where
private_key,
overlay_settings,
timeout,
transaction_selector_settings,
blob_selector_settings,
} = self.service_state.settings_reader.get_updated_settings();
let overlay = O::new(overlay_settings);
@ -196,6 +218,9 @@ where
participating_nodes: carnot.root_committee(),
};
let tx_selector = TxS::new(transaction_selector_settings);
let blob_selector = BS::new(blob_selector_settings);
let mut task_manager = TaskManager::new();
let genesis_block = carnot.genesis_block();
@ -245,6 +270,8 @@ where
adapter.clone(),
private_key,
mempool_relay.clone(),
tx_selector.clone(),
blob_selector.clone(),
timeout,
)
.await
@ -265,7 +292,7 @@ enum Output<Tx: Clone + Eq + Hash, Blob: Clone + Eq + Hash> {
BroadcastProposal { proposal: Block<Tx, Blob> },
}
impl<A, P, M, O, B> CarnotConsensus<A, P, M, O, B>
impl<A, P, M, O, B, TxS, BS> CarnotConsensus<A, P, M, O, B, TxS, BS>
where
A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
@ -273,11 +300,13 @@ where
P::Tx:
Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync,
B: Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
B: Blob + Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection,
O::CommitteeMembership: UpdateableCommitteeMembership,
TxS: TxSelect<Tx = P::Tx> + Clone + Send + Sync + 'static,
BS: BlobSelect<Blob = B> + Clone + Send + Sync + 'static,
{
fn process_message(carnot: &Carnot<O>, msg: ConsensusMsg) {
match msg {
@ -306,6 +335,8 @@ where
adapter: A,
private_key: PrivateKey,
mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>,
tx_selector: TxS,
blobl_selector: BS,
timeout: Duration,
) -> Carnot<O> {
let mut output = None;
@ -354,7 +385,15 @@ where
(carnot, output) = Self::process_root_timeout(carnot, timeouts).await;
}
Event::ProposeBlock { qc } => {
output = Self::propose_block(carnot.id(), private_key, qc, mempool_relay).await;
output = Self::propose_block(
carnot.id(),
private_key,
qc,
tx_selector.clone(),
blobl_selector.clone(),
mempool_relay,
)
.await;
}
_ => {}
}
@ -559,11 +598,16 @@ where
(carnot, output)
}
#[instrument(level = "debug", skip(mempool_relay, private_key))]
#[instrument(
level = "debug",
skip(mempool_relay, private_key, tx_selector, blob_selector)
)]
async fn propose_block(
id: NodeId,
private_key: PrivateKey,
qc: Qc,
tx_selector: TxS,
blob_selector: BS,
mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>,
) -> Option<Output<P::Tx, B>> {
let (reply_channel, rx) = tokio::sync::oneshot::channel();
@ -579,7 +623,17 @@ where
match rx.await {
Ok(txs) => {
let beacon = RandomBeaconState::generate_happy(qc.view(), &private_key);
let proposal = Block::new(qc.view().next(), qc, txs, [].into_iter(), id, beacon);
let Ok(proposal) = BlockBuilder::new(tx_selector, blob_selector)
.with_view(qc.view().next())
.with_parent_qc(qc)
.with_proposer(id)
.with_beacon_state(beacon)
.with_transactions(txs)
.with_blobs([].into_iter())
.build()
else {
panic!("Proposal block should always succeed to be built")
};
output = Some(Output::BroadcastProposal { proposal });
}
Err(e) => tracing::error!("Could not fetch txs {e}"),

View File

@ -272,6 +272,8 @@ fn create_node_config(
leader_super_majority_threshold: Some(threshold),
},
timeout,
transaction_selector_settings: (),
blob_selector_settings: (),
},
log: Default::default(),
http: nomos_http::http::HttpServiceSettings {