From 021de18e066bdbc250137ef5a19f3a293d53dce2 Mon Sep 17 00:00:00 2001 From: henridf Date: Thu, 13 Apr 2023 21:11:40 +0200 Subject: [PATCH] Quarantine and reassembly of gossiped blobs and blocks (#4808) --- .../blob_quarantine.nim | 70 ++++++++++++ .../block_quarantine.nim | 105 ++++++++++++++---- .../gossip_processing/block_processor.nim | 24 +++- .../gossip_processing/eth2_processor.nim | 73 +++++++++--- beacon_chain/nimbus_beacon_node.nim | 10 +- tests/test_block_processor.nim | 7 +- tests/test_block_quarantine.nim | 20 ++++ 7 files changed, 267 insertions(+), 42 deletions(-) create mode 100644 beacon_chain/consensus_object_pools/blob_quarantine.nim diff --git a/beacon_chain/consensus_object_pools/blob_quarantine.nim b/beacon_chain/consensus_object_pools/blob_quarantine.nim new file mode 100644 index 000000000..f288e873d --- /dev/null +++ b/beacon_chain/consensus_object_pools/blob_quarantine.nim @@ -0,0 +1,70 @@ +# beacon_chain +# Copyright (c) 2018-2023 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + std/tables, + ../spec/datatypes/deneb + + +const + MaxBlobs = SLOTS_PER_EPOCH * MAX_BLOBS_PER_BLOCK + + +type + BlobQuarantine* = object + blobs*: Table[(Eth2Digest, BlobIndex), ref BlobSidecar] + + +func put*(quarantine: var BlobQuarantine, blobSidecar: ref BlobSidecar) = + if quarantine.blobs.lenu64 > MaxBlobs: + return + discard quarantine.blobs.hasKeyOrPut((blobSidecar.block_root, + blobSidecar.index), blobSidecar) + +func blobIndices*(quarantine: BlobQuarantine, digest: Eth2Digest): + seq[BlobIndex] = + var r: seq[BlobIndex] = @[] + for i in 0..MAX_BLOBS_PER_BLOCK-1: + if quarantine.blobs.hasKey((digest, i)): + r.add(i) + r + +func hasBlob*(quarantine: BlobQuarantine, blobSidecar: BlobSidecar) : bool = + quarantine.blobs.hasKey((blobSidecar.block_root, blobSidecar.index)) + +func popBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest): + seq[ref BlobSidecar] = + var r: seq[ref BlobSidecar] = @[] + for i in 0..MAX_BLOBS_PER_BLOCK-1: + var b: ref BlobSidecar + if quarantine.blobs.pop((digest, i), b): + r.add(b) + r + +func peekBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest): + seq[ref BlobSidecar] = + var r: seq[ref BlobSidecar] = @[] + for i in 0..MAX_BLOBS_PER_BLOCK-1: + quarantine.blobs.withValue((digest, i), value): + r.add(value[]) + r + +func removeBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest) = + for i in 0..MAX_BLOBS_PER_BLOCK-1: + quarantine.blobs.del((digest, i)) + +func hasBlobs*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock): + bool = + let idxs = quarantine.blobIndices(blck.root) + if len(blck.message.body.blob_kzg_commitments) != len(idxs): + return false + for i in 0..len(idxs): + if idxs[i] != uint64(i): + return false + true diff --git a/beacon_chain/consensus_object_pools/block_quarantine.nim b/beacon_chain/consensus_object_pools/block_quarantine.nim index 3b4197994..723dabe54 100644 --- a/beacon_chain/consensus_object_pools/block_quarantine.nim +++ b/beacon_chain/consensus_object_pools/block_quarantine.nim @@ -19,6 +19,8 @@ const ## Arbitrary MaxOrphans = SLOTS_PER_EPOCH * 3 ## Enough for finalization in an alternative fork + MaxBlobless = SLOTS_PER_EPOCH + ## Arbitrary MaxUnviables = 16 * 1024 ## About a day of blocks - most likely not needed but it's quite cheap.. @@ -39,10 +41,19 @@ type ## Trivially invalid blocks may be dropped before reaching this stage. orphans*: Table[(Eth2Digest, ValidatorSig), ForkedSignedBeaconBlock] - ## Blocks that we don't have a parent for - when we resolve the parent, we - ## can proceed to resolving the block as well - we index this by root and - ## signature such that a block with invalid signature won't cause a block - ## with a valid signature to be dropped + ## Blocks that we don't have a parent for - when we resolve the + ## parent, we can proceed to resolving the block as well - we + ## index this by root and signature such that a block with + ## invalid signature won't cause a block with a valid signature + ## to be dropped. An orphan block may also be "blobless" (see + ## below) - if so, upon resolving the parent, it should be + ## stored in the blobless table. + + blobless*: Table[(Eth2Digest, ValidatorSig), deneb.SignedBeaconBlock] + ## Blocks that we don't have blobs for. When we have received + ## all blobs for this block, we can proceed to resolving the + ## block as well. A blobless block inserted into this table must + ## have a resolved parent (i.e., it is not an orphan). unviable*: OrderedTable[Eth2Digest, tuple[]] ## Unviable blocks are those that come from a history that does not @@ -115,12 +126,14 @@ func removeOrphan*( quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) = quarantine.orphans.del((signedBlock.root, signedBlock.signature)) -func isViableOrphan( - finalizedSlot: Slot, signedBlock: ForkedSignedBeaconBlock): bool = +func removeBlobless*( + quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) = + quarantine.blobless.del((signedBlock.root, signedBlock.signature)) + +func isViable( + finalizedSlot: Slot, slot: Slot): bool = # The orphan must be newer than the finalization point so that its parent # either is the finalized block or more recent - let - slot = getForkedBlockField(signedBlock, slot) slot > finalizedSlot func cleanupUnviable(quarantine: var Quarantine) = @@ -131,47 +144,76 @@ func cleanupUnviable(quarantine: var Quarantine) = break # Cannot modify while for-looping quarantine.unviable.del(toDel) -func addUnviable*(quarantine: var Quarantine, root: Eth2Digest) = - if root in quarantine.unviable: - return - - quarantine.cleanupUnviable() - +func removeUnviableTree[T](quarantine: var Quarantine, + toCheck: var seq[Eth2Digest], + tbl: var Table[(Eth2Digest, ValidatorSig), + T]): seq[Eth2Digest] = # Remove the tree of orphans whose ancestor is unviable - they are now also # unviable! This helps avoiding junk in the quarantine, because we don't keep # unviable parents in the DAG and there's no way to tell an orphan from an # unviable block without the parent. var toRemove: seq[(Eth2Digest, ValidatorSig)] # Can't modify while iterating - toCheck = @[root] + checked: seq[Eth2Digest] while toCheck.len > 0: let root = toCheck.pop() - for k, v in quarantine.orphans.mpairs(): - if getForkedBlockField(v, parent_root) == root: + if root notin checked: + checked.add(root) + for k, v in tbl.mpairs(): + when T is ForkedSignedBeaconBlock: + let blockRoot = getForkedBlockField(v, parent_root) + elif T is ForkySignedBeaconBlock: + let blockRoot = v.message.parent_root + else: + static: doAssert false + + if blockRoot == root: toCheck.add(k[0]) toRemove.add(k) elif k[0] == root: toRemove.add(k) for k in toRemove: - quarantine.orphans.del k + tbl.del k quarantine.unviable[k[0]] = () toRemove.setLen(0) + checked + +func addUnviable*(quarantine: var Quarantine, root: Eth2Digest) = + if root in quarantine.unviable: + return + + quarantine.cleanupUnviable() + var toCheck = @[root] + var checked = quarantine.removeUnviableTree(toCheck, quarantine.orphans) + discard quarantine.removeUnviableTree(checked, quarantine.blobless) + quarantine.unviable[root] = () func cleanupOrphans(quarantine: var Quarantine, finalizedSlot: Slot) = var toDel: seq[(Eth2Digest, ValidatorSig)] for k, v in quarantine.orphans: - if not isViableOrphan(finalizedSlot, v): + if not isViable(finalizedSlot, getForkedBlockField(v, slot)): toDel.add k for k in toDel: quarantine.addUnviable k[0] quarantine.orphans.del k +func cleanupBlobless(quarantine: var Quarantine, finalizedSlot: Slot) = + var toDel: seq[(Eth2Digest, ValidatorSig)] + + for k, v in quarantine.blobless: + if not isViable(finalizedSlot, v.message.slot): + toDel.add k + + for k in toDel: + quarantine.addUnviable k[0] + quarantine.blobless.del k + func clearAfterReorg*(quarantine: var Quarantine) = ## Clear missing and orphans to start with a fresh slate in case of a reorg ## Unviables remain unviable and are not cleared. @@ -193,7 +235,7 @@ func addOrphan*( quarantine: var Quarantine, finalizedSlot: Slot, signedBlock: ForkedSignedBeaconBlock): bool = ## Adds block to quarantine's `orphans` and `missing` lists. - if not isViableOrphan(finalizedSlot, signedBlock): + if not isViable(finalizedSlot, getForkedBlockField(signedBlock, slot)): quarantine.addUnviable(signedBlock.root) return false @@ -230,3 +272,26 @@ iterator pop*(quarantine: var Quarantine, root: Eth2Digest): if getForkedBlockField(v, parent_root) == root: toRemove.add(k) yield v + +proc addBlobless*( + quarantine: var Quarantine, finalizedSlot: Slot, + signedBlock: deneb.SignedBeaconBlock): bool = + + if not isViable(finalizedSlot, signedBlock.message.slot): + quarantine.addUnviable(signedBlock.root) + return false + + quarantine.cleanupBlobless(finalizedSlot) + + if quarantine.blobless.lenu64 >= MaxBlobless: + return false + + quarantine.blobless[(signedBlock.root, signedBlock.signature)] = signedBlock + quarantine.missing.del(signedBlock.root) + true + +iterator peekBlobless*(quarantine: var Quarantine, root: Eth2Digest): + deneb.SignedBeaconBlock = + for k, v in quarantine.blobless.mpairs(): + if k[0] == root: + yield v diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index e5c684afa..d5260e847 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -22,7 +22,9 @@ from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot from ../consensus_object_pools/block_pools_types import EpochRef, VerifierError from ../consensus_object_pools/block_quarantine import - addOrphan, addUnviable, pop, removeOrphan + addBlobless, addOrphan, addUnviable, pop, removeOrphan +from ../consensus_object_pools/blob_quarantine import + BlobQuarantine, hasBlobs, popBlobs from ../validators/validator_monitor import MsgSource, ValidatorMonitor, registerAttestationInBlock, registerBeaconBlock, registerSyncAggregateInBlock @@ -91,6 +93,7 @@ type validatorMonitor: ref ValidatorMonitor getBeaconTime: GetBeaconTimeFn + blobQuarantine: ref BlobQuarantine verifier: BatchVerifier lastPayload: Slot @@ -123,6 +126,7 @@ proc new*(T: type BlockProcessor, rng: ref HmacDrbgContext, taskpool: TaskPoolPtr, consensusManager: ref ConsensusManager, validatorMonitor: ref ValidatorMonitor, + blobQuarantine: ref BlobQuarantine, getBeaconTime: GetBeaconTimeFn): ref BlockProcessor = (ref BlockProcessor)( dumpEnabled: dumpEnabled, @@ -131,6 +135,7 @@ proc new*(T: type BlockProcessor, blockQueue: newAsyncQueue[BlockEntry](), consensusManager: consensusManager, validatorMonitor: validatorMonitor, + blobQuarantine: blobQuarantine, getBeaconTime: getBeaconTime, verifier: BatchVerifier(rng: rng, taskpool: taskpool) ) @@ -564,7 +569,22 @@ proc storeBlock*( for quarantined in self.consensusManager.quarantine[].pop(blck.get().root): # Process the blocks that had the newly accepted block as parent - self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[]) + withBlck(quarantined): + when typeof(blck).toFork() < ConsensusFork.Deneb: + self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[]) + else: + if len(blck.message.body.blob_kzg_commitments) == 0: + self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[]) + else: + if self.blobQuarantine[].hasBlobs(blck): + let blobs = self.blobQuarantine[].popBlobs(blck.root) + self[].addBlock(MsgSource.gossip, quarantined, blobs) + else: + if not self.consensusManager.quarantine[].addBlobless( + dag.finalizedHead.slot, blck): + notice "Block quarantine full (blobless)", + blockRoot = shortLog(quarantined.root), + signature = shortLog(quarantined.signature) return Result[BlockRef, (VerifierError, ProcessingStatus)].ok blck.get diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 19f756551..32f400317 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -14,8 +14,8 @@ import ../spec/[helpers, forks], ../spec/datatypes/[altair, phase0, deneb], ../consensus_object_pools/[ - block_clearance, block_quarantine, blockchain_dag, exit_pool, attestation_pool, - light_client_pool, sync_committee_msg_pool], + blob_quarantine, block_clearance, block_quarantine, blockchain_dag, + exit_pool, attestation_pool, light_client_pool, sync_committee_msg_pool], ../validators/validator_pool, ../beacon_clock, "."/[gossip_validation, block_processor, batch_validation], @@ -145,6 +145,8 @@ type # ---------------------------------------------------------------- quarantine*: ref Quarantine + blobQuarantine*: ref BlobQuarantine + # Application-provided current time provider (to facilitate testing) getCurrentBeaconTime*: GetBeaconTimeFn @@ -167,6 +169,7 @@ proc new*(T: type Eth2Processor, syncCommitteeMsgPool: ref SyncCommitteeMsgPool, lightClientPool: ref LightClientPool, quarantine: ref Quarantine, + blobQuarantine: ref BlobQuarantine, rng: ref HmacDrbgContext, getBeaconTime: GetBeaconTimeFn, taskpool: TaskPoolPtr @@ -184,6 +187,7 @@ proc new*(T: type Eth2Processor, syncCommitteeMsgPool: syncCommitteeMsgPool, lightClientPool: lightClientPool, quarantine: quarantine, + blobQuarantine: blobQuarantine, getCurrentBeaconTime: getBeaconTime, batchCrypto: BatchCrypto.new( rng = rng, @@ -234,9 +238,21 @@ proc processSignedBeaconBlock*( # propagation of seemingly good blocks trace "Block validated" + var blobs: BlobSidecars + when typeof(signedBlock).toFork() >= ConsensusFork.Deneb: + if self.blobQuarantine[].hasBlobs(signedBlock): + blobs = self.blobQuarantine[].popBlobs(signedBlock.root) + else: + if not self.quarantine[].addBlobless(self.dag.finalizedHead.slot, + signedBlock): + notice "Block quarantine full (blobless)", + blockRoot = shortLog(signedBlock.root), + blck = shortLog(signedBlock.message), + signature = shortLog(signedBlock.signature) + self.blockProcessor[].addBlock( src, ForkedSignedBeaconBlock.init(signedBlock), - BlobSidecars @[], + blobs, maybeFinalized = maybeFinalized, validationDur = nanoseconds( (self.getCurrentBeaconTime() - wallTime).nanoseconds)) @@ -268,23 +284,50 @@ proc processSignedBlobSidecar*( # Potential under/overflows are fine; would just create odd metrics and logs let delay = wallTime - signedBlobSidecar.message.slot.start_beacon_time - debug "Blob received", delay + if self.blobQuarantine[].hasBlob(signedBlobSidecar.message): + debug "Blob received, already in quarantine", delay + return ValidationRes.ok + else: + debug "Blob received", delay let v = - self.dag.validateBlobSidecar(self.quarantine, signedBlobSidecar, wallTime, idx) + self.dag.validateBlobSidecar(self.quarantine, + signedBlobSidecar, wallTime, idx) - if v.isOk(): - trace "Blob validated" - - # TODO - # hand blob off to blob quarantine - - blob_sidecars_received.inc() - blob_sidecar_delay.observe(delay.toFloatSeconds()) - else: + if v.isErr(): debug "Dropping blob", error = v.error() - blob_sidecars_dropped.inc(1, [$v.error[0]]) + return v + + debug "Blob validated, putting in blob quarantine" + self.blobQuarantine[].put(newClone(signedBlobSidecar.message)) + var toAdd: seq[deneb.SignedBeaconBlock] + + var skippedBlocks = false + + for blobless in self.quarantine[].peekBlobless( + signedBlobSidecar.message.block_root): + if self.blobQuarantine[].hasBlobs(blobless): + toAdd.add(blobless) + else: + skippedBlocks = true + + if len(toAdd) > 0: + let blobs = self.blobQuarantine[].peekBlobs( + signedBlobSidecar.message.block_root) + for blobless in toAdd: + self.blockProcessor[].addBlock( + MsgSource.gossip, + ForkedSignedBeaconBlock.init(blobless), blobs) + self.quarantine[].removeBlobless(blobless) + + if not skippedBlocks: + # no blobless blocks remain in quarantine that need these blobs, + # so we can remove them. + self.blobQuarantine[].removeBlobs(toAdd[0].root) + + blob_sidecars_received.inc() + blob_sidecar_delay.observe(delay.toFloatSeconds()) v diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 88d9cdb84..8ad37a949 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -14,6 +14,7 @@ import stew/[byteutils, io2], eth/p2p/discoveryv5/[enr, random2], eth/keys, + ./consensus_object_pools/blob_quarantine, ./consensus_object_pools/vanity_logs/vanity_logs, ./networking/topic_params, ./rpc/[rest_api, state_ttl_cache], @@ -319,6 +320,7 @@ proc initFullNode( LightClientPool()) validatorChangePool = newClone( ValidatorChangePool.init(dag, attestationPool, onVoluntaryExitAdded)) + blobQuarantine = newClone(BlobQuarantine()) consensusManager = ConsensusManager.new( dag, attestationPool, quarantine, node.elManager, ActionTracker.init(rng, config.subscribeAllSubnets), @@ -326,7 +328,8 @@ proc initFullNode( config.defaultFeeRecipient, config.suggestedGasLimit) blockProcessor = BlockProcessor.new( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, - rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime) + rng, taskpool, consensusManager, node.validatorMonitor, + blobQuarantine, getBeaconTime) blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): Future[Result[void, VerifierError]] = @@ -356,7 +359,7 @@ proc initFullNode( config.doppelgangerDetection, blockProcessor, node.validatorMonitor, dag, attestationPool, validatorChangePool, node.attachedValidators, syncCommitteeMsgPool, - lightClientPool, quarantine, rng, getBeaconTime, taskpool) + lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool) syncManager = newSyncManager[Peer, PeerId]( node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, @@ -1348,6 +1351,9 @@ proc onSecond(node: BeaconNode, time: Moment) = notice "Shutting down after having reached the target synced epoch" bnStatus = BeaconNodeStatus.Stopping +# TODO +# onSecond timer to handle missing blobs, similar to above for blocks + proc runOnSecondLoop(node: BeaconNode) {.async.} = const sleepTime = chronos.seconds(1) diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index 7f7cb645f..39064e327 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -17,8 +17,8 @@ import ../beacon_chain/spec/datatypes/deneb, ../beacon_chain/gossip_processing/block_processor, ../beacon_chain/consensus_object_pools/[ - attestation_pool, blockchain_dag, block_quarantine, block_clearance, - consensus_manager], + attestation_pool, blockchain_dag, blob_quarantine, block_quarantine, + block_clearance, consensus_manager], ../beacon_chain/eth1/eth1_monitor, ./testutil, ./testdbutil, ./testblockutil @@ -41,6 +41,7 @@ suite "Block processor" & preset(): taskpool = Taskpool.new() verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool) quarantine = newClone(Quarantine.init()) + blobQuarantine = newClone(BlobQuarantine()) attestationPool = newClone(AttestationPool.init(dag, quarantine)) elManager = new ELManager # TODO: initialise this properly actionTracker: ActionTracker @@ -56,7 +57,7 @@ suite "Block processor" & preset(): getTimeFn = proc(): BeaconTime = b2.message.slot.start_beacon_time() processor = BlockProcessor.new( false, "", "", keys.newRng(), taskpool, consensusManager, - validatorMonitor, getTimeFn) + validatorMonitor, blobQuarantine, getTimeFn) asyncTest "Reverse order block add & get" & preset(): let missing = await processor.storeBlock( diff --git a/tests/test_block_quarantine.nim b/tests/test_block_quarantine.nim index c4dad3285..948e0225c 100644 --- a/tests/test_block_quarantine.nim +++ b/tests/test_block_quarantine.nim @@ -20,6 +20,13 @@ func makeBlock(slot: Slot, parent: Eth2Digest): ForkedSignedBeaconBlock = b.root = hash_tree_root(b.message) ForkedSignedBeaconBlock.init(b) +func makeBlobbyBlock(slot: Slot, parent: Eth2Digest): deneb.SignedBeaconBlock = + var + b = deneb.SignedBeaconBlock( + message: deneb.BeaconBlock(slot: slot, parent_root: parent)) + b.root = hash_tree_root(b.message) + b + suite "Block quarantine": test "Unviable smoke test": let @@ -28,6 +35,8 @@ suite "Block quarantine": b2 = makeBlock(Slot 2, b1.root) b3 = makeBlock(Slot 3, b2.root) b4 = makeBlock(Slot 4, b2.root) + b5 = makeBlobbyBlock(Slot 4, b3.root) + b6 = makeBlobbyBlock(Slot 4, b4.root) var quarantine: Quarantine @@ -43,16 +52,27 @@ suite "Block quarantine": quarantine.addOrphan(Slot 0, b3) quarantine.addOrphan(Slot 0, b4) + quarantine.addBlobless(Slot 0, b5) + quarantine.addBlobless(Slot 0, b6) + (b4.root, ValidatorSig()) in quarantine.orphans + (b5.root, ValidatorSig()) in quarantine.blobless + (b6.root, ValidatorSig()) in quarantine.blobless quarantine.addUnviable(b4.root) check: (b4.root, ValidatorSig()) notin quarantine.orphans + (b5.root, ValidatorSig()) in quarantine.blobless + (b6.root, ValidatorSig()) notin quarantine.blobless + quarantine.addUnviable(b1.root) check: (b1.root, ValidatorSig()) notin quarantine.orphans (b2.root, ValidatorSig()) notin quarantine.orphans (b3.root, ValidatorSig()) notin quarantine.orphans + + (b5.root, ValidatorSig()) notin quarantine.blobless + (b6.root, ValidatorSig()) notin quarantine.blobless