Remove block contents from mempool (#485)

* Add Hash type param to Attestation and Certificate

* remove block contents from mempool
This commit is contained in:
Giacomo Pasini 2023-10-30 12:38:04 +01:00 committed by GitHub
parent 7fcfe890be
commit e50561839d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 8 deletions

View File

@ -1,9 +1,11 @@
use crate::da::blob::Blob; use crate::da::blob::Blob;
use bytes::Bytes; use bytes::Bytes;
use std::hash::Hash;
pub trait Attestation { pub trait Attestation {
type Blob: Blob; type Blob: Blob;
type Hash: Hash + Eq + Clone;
fn blob(&self) -> <Self::Blob as Blob>::Hash; fn blob(&self) -> <Self::Blob as Blob>::Hash;
fn hash(&self) -> <Self::Blob as Blob>::Hash; fn hash(&self) -> Self::Hash;
fn as_bytes(&self) -> Bytes; fn as_bytes(&self) -> Bytes;
} }

View File

@ -2,11 +2,13 @@ pub mod select;
use crate::da::blob::Blob; use crate::da::blob::Blob;
use bytes::Bytes; use bytes::Bytes;
use std::hash::Hash;
pub trait Certificate { pub trait Certificate {
type Blob: Blob; type Blob: Blob;
type Hash: Hash + Eq + Clone;
fn blob(&self) -> <Self::Blob as Blob>::Hash; fn blob(&self) -> <Self::Blob as Blob>::Hash;
fn hash(&self) -> <Self::Blob as Blob>::Hash; fn hash(&self) -> Self::Hash;
fn as_bytes(&self) -> Bytes; fn as_bytes(&self) -> Bytes;
} }

View File

@ -117,6 +117,8 @@ pub struct Attestation {
impl attestation::Attestation for Attestation { impl attestation::Attestation for Attestation {
type Blob = Blob; type Blob = Blob;
type Hash = [u8; 32];
fn blob(&self) -> [u8; 32] { fn blob(&self) -> [u8; 32] {
self.blob self.blob
} }
@ -145,6 +147,7 @@ impl Hash for Certificate {
impl certificate::Certificate for Certificate { impl certificate::Certificate for Certificate {
type Blob = Blob; type Blob = Blob;
type Hash = [u8; 32];
fn blob(&self) -> <Self::Blob as blob::Blob>::Hash { fn blob(&self) -> <Self::Blob as blob::Blob>::Hash {
self.attestations[0].blob self.attestations[0].blob

View File

@ -166,7 +166,7 @@ where
ClPool::Settings: Send + Sync + 'static, ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool + Send + Sync + 'static, DaPool: MemPool + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static, DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction ClPool::Item: Transaction<Hash = ClPool::Key>
+ Debug + Debug
+ Clone + Clone
+ Eq + Eq
@ -176,7 +176,7 @@ where
+ Send + Send
+ Sync + Sync
+ 'static, + 'static,
DaPool::Item: Certificate DaPool::Item: Certificate<Hash = DaPool::Key>
+ Debug + Debug
+ Clone + Clone
+ Eq + Eq
@ -365,7 +365,7 @@ where
ClPool::Settings: Send + Sync + 'static, ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool + Send + Sync + 'static, DaPool: MemPool + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static, DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction ClPool::Item: Transaction<Hash = ClPool::Key>
+ Debug + Debug
+ Clone + Clone
+ Eq + Eq
@ -375,7 +375,7 @@ where
+ Send + Send
+ Sync + Sync
+ 'static, + 'static,
DaPool::Item: Certificate DaPool::Item: Certificate<Hash = DaPool::Key>
+ Debug + Debug
+ Clone + Clone
+ Eq + Eq
@ -455,6 +455,8 @@ where
task_manager, task_manager,
adapter.clone(), adapter.clone(),
storage_relay, storage_relay,
cl_mempool_relay,
da_mempool_relay,
) )
.await; .await;
} }
@ -530,8 +532,19 @@ where
carnot carnot
} }
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity, clippy::too_many_arguments)]
#[instrument(level = "debug", skip(adapter, task_manager, stream, storage_relay))] #[instrument(
level = "debug",
skip(
carnot,
adapter,
task_manager,
stream,
storage_relay,
cl_mempool_relay,
da_mempool_relay
)
)]
async fn process_block( async fn process_block(
mut carnot: Carnot<O>, mut carnot: Carnot<O>,
block: Block<ClPool::Item, DaPool::Item>, block: Block<ClPool::Item, DaPool::Item>,
@ -539,6 +552,8 @@ where
task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>, task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A, adapter: A,
storage_relay: OutboundRelay<StorageMsg<Storage>>, storage_relay: OutboundRelay<StorageMsg<Storage>>,
cl_mempool_relay: OutboundRelay<MempoolMsg<ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<DaPool::Item, DaPool::Key>>,
) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) { ) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
tracing::debug!("received proposal {:?}", block); tracing::debug!("received proposal {:?}", block);
if carnot.highest_voted_view() >= block.header().view { if carnot.highest_voted_view() >= block.header().view {
@ -569,6 +584,22 @@ where
if let Err((e, _msg)) = storage_relay.send(msg).await { if let Err((e, _msg)) = storage_relay.send(msg).await {
tracing::error!("Could not send block to storage: {e}"); tracing::error!("Could not send block to storage: {e}");
} }
// remove included content from mempool
mark_in_block(
cl_mempool_relay,
original_block.transactions().map(Transaction::hash),
block.id,
)
.await;
mark_in_block(
da_mempool_relay,
original_block.blobs().map(Certificate::hash),
block.id,
)
.await;
if new_view != carnot.current_view() { if new_view != carnot.current_view() {
task_manager.push( task_manager.push(
block.view, block.view,
@ -1076,6 +1107,20 @@ async fn get_mempool_contents<Item, Key>(
rx.await rx.await
} }
async fn mark_in_block<Item, Key>(
mempool: OutboundRelay<MempoolMsg<Item, Key>>,
ids: impl Iterator<Item = Key>,
block: BlockId,
) {
mempool
.send(MempoolMsg::MarkInBlock {
ids: ids.collect(),
block,
})
.await
.unwrap_or_else(|(e, _)| tracing::error!("Could not mark items in block: {e}"))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use consensus_engine::Block; use consensus_engine::Block;