From e50561839dbfe8c429b364c7265a36f407edc6dd Mon Sep 17 00:00:00 2001 From: Giacomo Pasini <21265557+zeegomo@users.noreply.github.com> Date: Mon, 30 Oct 2023 12:38:04 +0100 Subject: [PATCH] Remove block contents from mempool (#485) * Add Hash type param to Attestation and Certificate * remove block contents from mempool --- nomos-core/src/da/attestation/mod.rs | 4 +- nomos-core/src/da/certificate/mod.rs | 4 +- nomos-da/full-replication/src/lib.rs | 3 ++ nomos-services/consensus/src/lib.rs | 57 +++++++++++++++++++++++++--- 4 files changed, 60 insertions(+), 8 deletions(-) diff --git a/nomos-core/src/da/attestation/mod.rs b/nomos-core/src/da/attestation/mod.rs index 713596b9..a3dfd8dd 100644 --- a/nomos-core/src/da/attestation/mod.rs +++ b/nomos-core/src/da/attestation/mod.rs @@ -1,9 +1,11 @@ use crate::da::blob::Blob; use bytes::Bytes; +use std::hash::Hash; pub trait Attestation { type Blob: Blob; + type Hash: Hash + Eq + Clone; fn blob(&self) -> ::Hash; - fn hash(&self) -> ::Hash; + fn hash(&self) -> Self::Hash; fn as_bytes(&self) -> Bytes; } diff --git a/nomos-core/src/da/certificate/mod.rs b/nomos-core/src/da/certificate/mod.rs index 2d055f3d..0885c020 100644 --- a/nomos-core/src/da/certificate/mod.rs +++ b/nomos-core/src/da/certificate/mod.rs @@ -2,11 +2,13 @@ pub mod select; use crate::da::blob::Blob; use bytes::Bytes; +use std::hash::Hash; pub trait Certificate { type Blob: Blob; + type Hash: Hash + Eq + Clone; fn blob(&self) -> ::Hash; - fn hash(&self) -> ::Hash; + fn hash(&self) -> Self::Hash; fn as_bytes(&self) -> Bytes; } diff --git a/nomos-da/full-replication/src/lib.rs b/nomos-da/full-replication/src/lib.rs index da0bfa9b..fd515231 100644 --- a/nomos-da/full-replication/src/lib.rs +++ b/nomos-da/full-replication/src/lib.rs @@ -117,6 +117,8 @@ pub struct Attestation { impl attestation::Attestation for Attestation { type Blob = Blob; + type Hash = [u8; 32]; + fn blob(&self) -> [u8; 32] { self.blob } @@ -145,6 +147,7 @@ impl Hash for Certificate { impl certificate::Certificate for Certificate { type Blob = Blob; + type Hash = [u8; 32]; fn blob(&self) -> ::Hash { self.attestations[0].blob diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 8c84bc47..60c0c13d 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -166,7 +166,7 @@ where ClPool::Settings: Send + Sync + 'static, DaPool: MemPool + Send + Sync + 'static, DaPool::Settings: Send + Sync + 'static, - ClPool::Item: Transaction + ClPool::Item: Transaction + Debug + Clone + Eq @@ -176,7 +176,7 @@ where + Send + Sync + 'static, - DaPool::Item: Certificate + DaPool::Item: Certificate + Debug + Clone + Eq @@ -365,7 +365,7 @@ where ClPool::Settings: Send + Sync + 'static, DaPool: MemPool + Send + Sync + 'static, DaPool::Settings: Send + Sync + 'static, - ClPool::Item: Transaction + ClPool::Item: Transaction + Debug + Clone + Eq @@ -375,7 +375,7 @@ where + Send + Sync + 'static, - DaPool::Item: Certificate + DaPool::Item: Certificate + Debug + Clone + Eq @@ -455,6 +455,8 @@ where task_manager, adapter.clone(), storage_relay, + cl_mempool_relay, + da_mempool_relay, ) .await; } @@ -530,8 +532,19 @@ where carnot } - #[allow(clippy::type_complexity)] - #[instrument(level = "debug", skip(adapter, task_manager, stream, storage_relay))] + #[allow(clippy::type_complexity, clippy::too_many_arguments)] + #[instrument( + level = "debug", + skip( + carnot, + adapter, + task_manager, + stream, + storage_relay, + cl_mempool_relay, + da_mempool_relay + ) + )] async fn process_block( mut carnot: Carnot, block: Block, @@ -539,6 +552,8 @@ where task_manager: &mut TaskManager>, adapter: A, storage_relay: OutboundRelay>, + cl_mempool_relay: OutboundRelay>, + da_mempool_relay: OutboundRelay>, ) -> (Carnot, Option>) { tracing::debug!("received proposal {:?}", block); if carnot.highest_voted_view() >= block.header().view { @@ -569,6 +584,22 @@ where if let Err((e, _msg)) = storage_relay.send(msg).await { tracing::error!("Could not send block to storage: {e}"); } + + // remove included content from mempool + mark_in_block( + cl_mempool_relay, + original_block.transactions().map(Transaction::hash), + block.id, + ) + .await; + + mark_in_block( + da_mempool_relay, + original_block.blobs().map(Certificate::hash), + block.id, + ) + .await; + if new_view != carnot.current_view() { task_manager.push( block.view, @@ -1076,6 +1107,20 @@ async fn get_mempool_contents( rx.await } +async fn mark_in_block( + mempool: OutboundRelay>, + ids: impl Iterator, + block: BlockId, +) { + mempool + .send(MempoolMsg::MarkInBlock { + ids: ids.collect(), + block, + }) + .await + .unwrap_or_else(|(e, _)| tracing::error!("Could not mark items in block: {e}")) +} + #[cfg(test)] mod tests { use consensus_engine::Block;