Block refactor (#368)

* Add sidecars to block

* Use cl and bl prefixes
This commit is contained in:
Daniel Sanchez 2023-09-05 13:43:36 +02:00 committed by GitHub
parent b9ef37e89e
commit 57746f5e76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 96 additions and 62 deletions

View File

@ -0,0 +1 @@
pub type Blob = Box<[u8]>;

View File

@ -1,3 +1,4 @@
mod blob;
mod tx;
use color_eyre::eyre::Result;
@ -32,6 +33,7 @@ use serde::{Deserialize, Serialize};
#[cfg(feature = "waku")]
use waku_bindings::SecretKey;
use crate::blob::Blob;
pub use tx::Tx;
#[cfg(all(feature = "waku", feature = "libp2p"))]
@ -76,6 +78,7 @@ pub type Carnot = CarnotConsensus<
MempoolWakuAdapter<Tx>,
MockFountain,
FlatOverlay<RoundRobin, RandomBeaconState>,
Blob,
>;
#[cfg(feature = "libp2p")]
@ -85,6 +88,7 @@ pub type Carnot = CarnotConsensus<
MempoolLibp2pAdapter<Tx>,
MockFountain,
FlatOverlay<RoundRobin, RandomBeaconState>,
Blob,
>;
#[derive(Services)]

View File

@ -15,21 +15,27 @@ pub type TxHash = [u8; 32];
/// A block
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Block<TxId: Clone + Eq + Hash> {
pub struct Block<Tx: Clone + Eq + Hash, Blob: Clone + Eq + Hash> {
header: consensus_engine::Block,
transactions: IndexSet<TxId>,
beacon: RandomBeaconState,
cl_transactions: IndexSet<Tx>,
bl_blobs: IndexSet<Blob>,
}
impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> {
impl<
Tx: Clone + Eq + Hash + Serialize + DeserializeOwned,
Blob: Clone + Eq + Hash + Serialize + DeserializeOwned,
> Block<Tx, Blob>
{
pub fn new(
view: View,
parent_qc: Qc,
txs: impl Iterator<Item = TxId>,
txs: impl Iterator<Item = Tx>,
proposer: NodeId,
beacon: RandomBeaconState,
) -> Self {
let transactions = txs.collect();
let blobs = IndexSet::new();
let header = consensus_engine::Block {
id: BlockId::zeros(),
view,
@ -38,11 +44,11 @@ impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> {
leader_id: proposer,
},
};
let mut s = Self {
header,
transactions,
beacon,
cl_transactions: transactions,
bl_blobs: blobs,
};
let id = block_id_from_wire_content(&s);
s.header.id = id;
@ -50,13 +56,17 @@ impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> {
}
}
impl<TxId: Clone + Eq + Hash> Block<TxId> {
impl<Tx: Clone + Eq + Hash, Blob: Clone + Eq + Hash> Block<Tx, Blob> {
pub fn header(&self) -> &consensus_engine::Block {
&self.header
}
pub fn transactions(&self) -> impl Iterator<Item = &TxId> + '_ {
self.transactions.iter()
pub fn transactions(&self) -> impl Iterator<Item = &Tx> + '_ {
self.cl_transactions.iter()
}
pub fn blobs(&self) -> impl Iterator<Item = &Blob> + '_ {
self.bl_blobs.iter()
}
pub fn beacon(&self) -> &RandomBeaconState {
@ -64,8 +74,11 @@ impl<TxId: Clone + Eq + Hash> Block<TxId> {
}
}
pub fn block_id_from_wire_content<Tx: Clone + Eq + Hash + Serialize + DeserializeOwned>(
block: &Block<Tx>,
pub fn block_id_from_wire_content<
Tx: Clone + Eq + Hash + Serialize + DeserializeOwned,
Blob: Clone + Eq + Hash + Serialize + DeserializeOwned,
>(
block: &Block<Tx, Blob>,
) -> consensus_engine::BlockId {
use blake2::digest::{consts::U32, Digest};
use blake2::Blake2b;
@ -75,7 +88,11 @@ pub fn block_id_from_wire_content<Tx: Clone + Eq + Hash + Serialize + Deserializ
BlockId::new(hasher.finalize().into())
}
impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> {
impl<
Tx: Clone + Eq + Hash + Serialize + DeserializeOwned,
Blob: Clone + Eq + Hash + Serialize + DeserializeOwned,
> Block<Tx, Blob>
{
/// Encode block into bytes
pub fn as_bytes(&self) -> Bytes {
wire::serialize(self).unwrap().into()

View File

@ -15,9 +15,9 @@ use nomos_core::block::Block;
pub trait UpdateableCommitteeMembership: CommitteeMembership {
type Error: Error;
fn on_new_block_received<Tx: Hash + Clone + Eq>(
fn on_new_block_received<Tx: Hash + Clone + Eq, Blob: Clone + Eq + Hash>(
&self,
block: &Block<Tx>,
block: &Block<Tx, Blob>,
) -> Result<Self, Self::Error>;
fn on_timeout_qc_received(&self, qc: &TimeoutQc) -> Result<Self, Self::Error>;
}
@ -25,9 +25,9 @@ pub trait UpdateableCommitteeMembership: CommitteeMembership {
impl UpdateableCommitteeMembership for FreezeMembership {
type Error = Infallible;
fn on_new_block_received<Tx: Hash + Clone + Eq>(
fn on_new_block_received<Tx: Hash + Clone + Eq, Blob: Clone + Eq + Hash>(
&self,
_block: &Block<Tx>,
_block: &Block<Tx, Blob>,
) -> Result<Self, Self::Error> {
Ok(Self)
}
@ -40,9 +40,9 @@ impl UpdateableCommitteeMembership for FreezeMembership {
impl UpdateableCommitteeMembership for RandomBeaconState {
type Error = RandomBeaconError;
fn on_new_block_received<Tx: Hash + Clone + Eq>(
fn on_new_block_received<Tx: Hash + Clone + Eq, Blob: Clone + Eq + Hash>(
&self,
block: &Block<Tx>,
block: &Block<Tx, Blob>,
) -> Result<Self, Self::Error> {
self.check_advance_happy(block.beacon().clone(), block.header().parent_qc.view())
}

View File

@ -9,9 +9,9 @@ use std::{convert::Infallible, error::Error, hash::Hash};
pub trait UpdateableLeaderSelection: LeaderSelection {
type Error: Error;
fn on_new_block_received<Tx: Hash + Clone + Eq>(
fn on_new_block_received<Tx: Hash + Clone + Eq, Blob: Clone + Eq + Hash>(
&self,
block: &Block<Tx>,
block: &Block<Tx, Blob>,
) -> Result<Self, Self::Error>;
fn on_timeout_qc_received(&self, qc: &TimeoutQc) -> Result<Self, Self::Error>;
}
@ -19,9 +19,9 @@ pub trait UpdateableLeaderSelection: LeaderSelection {
impl UpdateableLeaderSelection for RoundRobin {
type Error = Infallible;
fn on_new_block_received<Tx: Hash + Clone + Eq>(
fn on_new_block_received<Tx: Hash + Clone + Eq, Blob: Clone + Eq + Hash>(
&self,
_block: &Block<Tx>,
_block: &Block<Tx, Blob>,
) -> Result<Self, Self::Error> {
Ok(self.advance())
}
@ -34,9 +34,9 @@ impl UpdateableLeaderSelection for RoundRobin {
impl UpdateableLeaderSelection for RandomBeaconState {
type Error = RandomBeaconError;
fn on_new_block_received<Tx: Hash + Clone + Eq>(
fn on_new_block_received<Tx: Hash + Clone + Eq, Blob: Clone + Eq + Hash>(
&self,
block: &Block<Tx>,
block: &Block<Tx, Blob>,
) -> Result<Self, Self::Error> {
self.check_advance_happy(block.beacon().clone(), block.header().parent_qc.view())
// TODO: check random beacon public keys is leader id

View File

@ -96,7 +96,7 @@ impl<Fountain: FountainCode, O: Overlay> CarnotSettings<Fountain, O> {
}
}
pub struct CarnotConsensus<A, P, M, F, O>
pub struct CarnotConsensus<A, P, M, F, O, B>
where
F: FountainCode,
A: NetworkAdapter,
@ -114,9 +114,11 @@ where
mempool_relay: Relay<MempoolService<M, P>>,
_fountain: std::marker::PhantomData<F>,
_overlay: std::marker::PhantomData<O>,
// this need to be substituted by some kind DA bo
_blob: std::marker::PhantomData<B>,
}
impl<A, P, M, F, O> ServiceData for CarnotConsensus<A, P, M, F, O>
impl<A, P, M, F, O, B> ServiceData for CarnotConsensus<A, P, M, F, O, B>
where
F: FountainCode,
A: NetworkAdapter,
@ -134,7 +136,7 @@ where
}
#[async_trait::async_trait]
impl<A, P, M, F, O> ServiceCore for CarnotConsensus<A, P, M, F, O>
impl<A, P, M, F, O, B> ServiceCore for CarnotConsensus<A, P, M, F, O, B>
where
F: FountainCode + Clone + Send + Sync + 'static,
A: NetworkAdapter + Clone + Send + Sync + 'static,
@ -143,6 +145,7 @@ where
P::Tx:
Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync,
B: Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection,
@ -156,6 +159,7 @@ where
network_relay,
_fountain: Default::default(),
_overlay: Default::default(),
_blob: Default::default(),
mempool_relay,
})
}
@ -268,13 +272,13 @@ where
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum Output<Tx: Clone + Eq + Hash> {
enum Output<Tx: Clone + Eq + Hash, Blob: Clone + Eq + Hash> {
Send(consensus_engine::Send),
BroadcastTimeoutQc { timeout_qc: TimeoutQc },
BroadcastProposal { proposal: Block<Tx> },
BroadcastProposal { proposal: Block<Tx, Blob> },
}
impl<A, P, M, F, O> CarnotConsensus<A, P, M, F, O>
impl<A, P, M, F, O, B> CarnotConsensus<A, P, M, F, O, B>
where
F: FountainCode + Clone + Send + Sync + 'static,
A: NetworkAdapter + Clone + Send + Sync + 'static,
@ -283,6 +287,7 @@ where
P::Tx:
Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync,
B: Debug + Clone + Eq + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection,
@ -310,8 +315,8 @@ where
#[allow(clippy::too_many_arguments)]
async fn process_carnot_event(
mut carnot: Carnot<O>,
event: Event<P::Tx>,
task_manager: &mut TaskManager<View, Event<P::Tx>>,
event: Event<P::Tx, B>,
task_manager: &mut TaskManager<View, Event<P::Tx, B>>,
adapter: A,
private_key: PrivateKey,
mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>,
@ -329,7 +334,7 @@ where
tracing::debug!("approving proposal {:?}", block);
let (new_carnot, out) = carnot.approve_block(block);
carnot = new_carnot;
output = Some(Output::Send::<P::Tx>(out));
output = Some(Output::Send::<P::Tx, B>(out));
}
Event::LocalTimeout { view } => {
tracing::debug!("local timeout");
@ -391,11 +396,11 @@ where
#[instrument(level = "debug", skip(adapter, task_manager, stream))]
async fn process_block(
mut carnot: Carnot<O>,
block: Block<P::Tx>,
mut stream: Pin<Box<dyn Stream<Item = Block<P::Tx>> + Send>>,
task_manager: &mut TaskManager<View, Event<P::Tx>>,
block: Block<P::Tx, B>,
mut stream: Pin<Box<dyn Stream<Item = Block<P::Tx, B>> + Send>>,
task_manager: &mut TaskManager<View, Event<P::Tx, B>>,
adapter: A,
) -> (Carnot<O>, Option<Output<P::Tx>>) {
) -> (Carnot<O>, Option<Output<P::Tx, B>>) {
tracing::debug!("received proposal {:?}", block);
if carnot.highest_voted_view() >= block.header().view {
tracing::debug!("already voted for view {}", block.header().view);
@ -471,9 +476,9 @@ where
carnot: Carnot<O>,
timeout_qc: TimeoutQc,
new_views: HashSet<NewView>,
task_manager: &mut TaskManager<View, Event<P::Tx>>,
task_manager: &mut TaskManager<View, Event<P::Tx, B>>,
adapter: A,
) -> (Carnot<O>, Option<Output<P::Tx>>) {
) -> (Carnot<O>, Option<Output<P::Tx, B>>) {
let leader_committee = [carnot.id()].into_iter().collect();
let leader_tally_settings = CarnotTallySettings {
threshold: carnot.leader_super_majority_threshold(),
@ -508,9 +513,9 @@ where
async fn receive_timeout_qc(
carnot: Carnot<O>,
timeout_qc: TimeoutQc,
task_manager: &mut TaskManager<View, Event<P::Tx>>,
task_manager: &mut TaskManager<View, Event<P::Tx, B>>,
adapter: A,
) -> (Carnot<O>, Option<Output<P::Tx>>) {
) -> (Carnot<O>, Option<Output<P::Tx, B>>) {
let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone());
let self_committee = carnot.self_committee();
let tally_settings = CarnotTallySettings {
@ -535,7 +540,7 @@ where
async fn process_root_timeout(
carnot: Carnot<O>,
timeouts: HashSet<Timeout>,
) -> (Carnot<O>, Option<Output<P::Tx>>) {
) -> (Carnot<O>, Option<Output<P::Tx, B>>) {
// we might have received a timeout_qc sent by some other node and advanced the view
// already, in which case we should ignore the timeout
if carnot.current_view()
@ -575,7 +580,7 @@ where
private_key: PrivateKey,
qc: Qc,
mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>,
) -> Option<Output<P::Tx>> {
) -> Option<Output<P::Tx, B>> {
let (reply_channel, rx) = tokio::sync::oneshot::channel();
let mut output = None;
mempool_relay
@ -600,7 +605,7 @@ where
async fn process_view_change(
carnot: Carnot<O>,
prev_view: View,
task_manager: &mut TaskManager<View, Event<P::Tx>>,
task_manager: &mut TaskManager<View, Event<P::Tx, B>>,
adapter: A,
timeout: Duration,
) {
@ -637,7 +642,7 @@ where
}
}
async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event<P::Tx> {
async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event<P::Tx, B> {
if let Some(timeout_qc) = adapter
.timeout_qc_stream(view)
.await
@ -657,7 +662,7 @@ where
committee: Committee,
block: consensus_engine::Block,
tally: CarnotTallySettings,
) -> Event<P::Tx> {
) -> Event<P::Tx, B> {
let tally = CarnotTally::new(tally);
let votes_stream = adapter.votes_stream(&committee, block.view, block.id).await;
match tally.tally(block.clone(), votes_stream).await {
@ -674,7 +679,7 @@ where
committee: Committee,
timeout_qc: TimeoutQc,
tally: CarnotTallySettings,
) -> Event<P::Tx> {
) -> Event<P::Tx, B> {
let tally = NewViewTally::new(tally);
let stream = adapter
.new_view_stream(&committee, timeout_qc.view().next())
@ -696,7 +701,7 @@ where
committee: Committee,
view: consensus_engine::View,
tally: CarnotTallySettings,
) -> Event<P::Tx> {
) -> Event<P::Tx, B> {
let tally = TimeoutTally::new(tally);
let stream = adapter.timeout_stream(&committee, view).await;
match tally.tally(view, stream).await {
@ -708,7 +713,7 @@ where
}
#[instrument(level = "debug", skip(adapter))]
async fn gather_block(adapter: A, view: consensus_engine::View) -> Event<P::Tx> {
async fn gather_block(adapter: A, view: consensus_engine::View) -> Event<P::Tx, B> {
let stream = adapter
.proposal_chunks_stream(view)
.await
@ -770,11 +775,16 @@ where
}
}
async fn handle_output<A, F, Tx>(adapter: &A, fountain: &F, node_id: NodeId, output: Output<Tx>)
where
async fn handle_output<A, F, Tx, B>(
adapter: &A,
fountain: &F,
node_id: NodeId,
output: Output<Tx, B>,
) where
A: NetworkAdapter,
F: FountainCode,
Tx: Hash + Eq + Clone + Serialize + DeserializeOwned + Debug,
B: Clone + Eq + Hash + Serialize + DeserializeOwned,
{
match output {
Output::Send(consensus_engine::Send { to, payload }) => match payload {
@ -836,10 +846,11 @@ where
}
}
enum Event<Tx: Clone + Hash + Eq> {
#[allow(clippy::large_enum_variant)]
enum Event<Tx: Clone + Hash + Eq, Blob: Clone + Eq + Hash> {
Proposal {
block: Block<Tx>,
stream: Pin<Box<dyn Stream<Item = Block<Tx>> + Send>>,
block: Block<Tx, Blob>,
stream: Pin<Box<dyn Stream<Item = Block<Tx, Blob>> + Send>>,
},
#[allow(dead_code)]
Approve {

View File

@ -16,7 +16,7 @@ pub fn to_overlay_node<R: Rng>(
nodes: Vec<NodeId>,
leader: NodeId,
network_interface: InMemoryNetworkInterface<CarnotMessage>,
genesis: nomos_core::block::Block<[u8; 32]>,
genesis: nomos_core::block::Block<[u8; 32], Box<[u8]>>,
mut rng: R,
settings: &SimulationSettings,
) -> BoxedNode<CarnotSettings, CarnotState> {

View File

@ -10,6 +10,7 @@ use std::hash::Hash;
use std::time::Duration;
pub type CarnotTx = [u8; 32];
pub type CarnotBlob = Box<[u8]>;
pub(crate) struct EventBuilder {
id: NodeId,
@ -230,7 +231,7 @@ impl EventBuilder {
pub enum Event<Tx: Clone + Hash + Eq> {
Proposal {
block: Block<Tx>,
block: Block<Tx, Box<[u8]>>,
},
#[allow(dead_code)]
Approve {

View File

@ -23,7 +23,7 @@ use serde::Deserialize;
use self::messages::CarnotMessage;
use super::{Node, NodeId};
use crate::network::{InMemoryNetworkInterface, NetworkInterface, NetworkMessage};
use crate::node::carnot::event_builder::{CarnotTx, Event};
use crate::node::carnot::event_builder::{CarnotBlob, CarnotTx, Event};
use crate::node::carnot::message_cache::MessageCache;
use crate::output_processors::{Record, RecordType, Runtime};
use crate::settings::SimulationSettings;
@ -90,7 +90,7 @@ impl<
id: consensus_engine::NodeId,
settings: CarnotSettings,
overlay_settings: O::Settings,
genesis: nomos_core::block::Block<CarnotTx>,
genesis: nomos_core::block::Block<CarnotTx, CarnotBlob>,
network_interface: InMemoryNetworkInterface<CarnotMessage>,
rng: &mut R,
) -> Self {
@ -120,7 +120,7 @@ impl<
this
}
fn handle_output(&self, output: Output<CarnotTx>) {
fn handle_output(&self, output: Output<CarnotTx, CarnotBlob>) {
match output {
Output::Send(consensus_engine::Send {
to,
@ -343,7 +343,7 @@ impl<
fn update_overlay_with_block<Tx: Clone + Eq + Hash>(
state: Carnot<O>,
block: &nomos_core::block::Block<Tx>,
block: &nomos_core::block::Block<Tx, CarnotBlob>,
) -> Carnot<O> {
state
.update_overlay(|overlay| {
@ -438,12 +438,12 @@ impl<
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum Output<Tx: Clone + Eq + Hash> {
enum Output<Tx: Clone + Eq + Hash, Blob: Clone + Eq + Hash> {
Send(consensus_engine::Send),
BroadcastTimeoutQc {
timeout_qc: TimeoutQc,
},
BroadcastProposal {
proposal: nomos_core::block::Block<Tx>,
proposal: nomos_core::block::Block<Tx, Blob>,
},
}