From 8b3ffec0d530f3bfaa83198de098cb199f81b921 Mon Sep 17 00:00:00 2001 From: tersec Date: Mon, 17 Apr 2023 19:36:15 +0000 Subject: [PATCH] include small dedup in block processor to handle blockByRoot blocks (#4814) --- .../gossip_processing/block_processor.nim | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index d0572a5bb..8434cc31d 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -13,6 +13,7 @@ import ../spec/[signatures, signatures_batch], ../sszdump +from std/deques import Deque, addLast, contains, initDeque, items, len, shrink from ../consensus_object_pools/consensus_manager import ConsensusManager, checkNextProposer, optimisticExecutionPayloadHash, runProposalForkchoiceUpdated, shouldSyncOptimistically, updateHead, @@ -46,6 +47,10 @@ const ## syncing the finalized part of the chain PAYLOAD_PRE_WALL_SLOTS = SLOTS_PER_EPOCH * 2 ## Number of slots from wall time that we start processing every payload + MAX_DEDUP_QUEUE_LEN = 16 + ## Number of blocks, with FIFO discipline, against which to check queued + ## blocks before being processed to avoid spamming ELs. This should stay + ## small enough that even O(n) algorithms are reasonable. type BlobSidecars* = seq[ref BlobSidecar] @@ -102,6 +107,9 @@ type ## The slot at which we sent a payload to the execution client the last ## time + dupBlckBuf: Deque[(Eth2Digest, ValidatorSig)] + # Small buffer to allow for filtering of duplicate blocks in block queue + NewPayloadStatus {.pure.} = enum valid notValid @@ -139,7 +147,9 @@ proc new*(T: type BlockProcessor, validatorMonitor: validatorMonitor, blobQuarantine: blobQuarantine, getBeaconTime: getBeaconTime, - verifier: BatchVerifier(rng: rng, taskpool: taskpool) + verifier: BatchVerifier(rng: rng, taskpool: taskpool), + dupBlckBuf: initDeque[(Eth2Digest, ValidatorSig)]( + initialSize = MAX_DEDUP_QUEUE_LEN) ) # Sync callbacks @@ -679,6 +689,19 @@ proc addBlock*( except AsyncQueueFullError: raiseAssert "unbounded queue" +# Dedup +# ------------------------------------------------------------------------------ + +func checkDuplicateBlocks(self: ref BlockProcessor, entry: BlockEntry): bool = + let key = (entry.blck.root, entry.blck.signature) + if self.dupBlckBuf.contains key: + return true + doAssert self.dupBlckBuf.len <= MAX_DEDUP_QUEUE_LEN + if self.dupBlckBuf.len >= MAX_DEDUP_QUEUE_LEN: + self.dupBlckBuf.shrink(fromFirst = 1) + self.dupBlckBuf.addLast key + false + # Event Loop # ------------------------------------------------------------------------------ @@ -695,6 +718,9 @@ proc processBlock( error "Processing block before genesis, clock turned back?" quit 1 + if self.checkDuplicateBlocks(entry): + return + let res = withBlck(entry.blck): await self.storeBlock( entry.src, wallTime, blck, entry.blobs, entry.maybeFinalized,