From 49729e1ef302437a8bfbdfd1552354f44109887e Mon Sep 17 00:00:00 2001
From: Jacek Sieka
Date: Thu, 17 Aug 2023 15:12:37 +0200
Subject: [PATCH] prevent concurrent `storeBlock` calls (fixes #5285) (#5295)

When a block is introduced to the system both via REST and gossip at the
same time, we will call `storeBlock` from two locations, leading to a
duplicate-check race condition as we wait for the EL. This issue may
manifest in particular when using an external block builder that itself
publishes the block onto the gossip network.

* refactor enqueue flow
* simplify calling `addBlock`
* complete request manager verifier future for blobless blocks
* re-verify parent conditions before adding block

Among other things, the parent might have gone stale or been finalized
between one call and the other.
---
 .../block_clearance.nim                       | 15 +++-
 .../gossip_processing/block_processor.nim     | 78 ++++++++++---------
 .../gossip_processing/eth2_processor.nim      | 30 +++----
 beacon_chain/nimbus_beacon_node.nim           | 38 ++++-----
 beacon_chain/validators/message_router.nim    | 20 +++--
 tests/test_block_processor.nim                | 16 ++--
 6 files changed, 111 insertions(+), 86 deletions(-)

diff --git a/beacon_chain/consensus_object_pools/block_clearance.nim b/beacon_chain/consensus_object_pools/block_clearance.nim
index 6d3a55f4b..e8b16d305 100644
--- a/beacon_chain/consensus_object_pools/block_clearance.nim
+++ b/beacon_chain/consensus_object_pools/block_clearance.nim
@@ -230,13 +230,24 @@ proc addHeadBlockWithParent*(
   ## Cryptographic checks can be skipped by adding skipBlsValidation to
   ## dag.updateFlags.
   ##
-  ## The parent must be obtained using `checkHeadBlock` to ensure complete
-  ## verification.
+  ## The parent should be obtained using `checkHeadBlock`.
   logScope:
     blockRoot = shortLog(signedBlock.root)
     blck = shortLog(signedBlock.message)
     signature = shortLog(signedBlock.signature)
 
+  block:
+    # We re-check parent pre-conditions here to avoid the case where the parent
+    # has become stale - it is possible that the dag has finalized the parent
+    # by the time we get here, which will cause us to return early.
+    let checkedParent = ? checkHeadBlock(dag, signedBlock)
+    if checkedParent != parent:
+      # This should never happen: it would mean that the caller supplied a
+      # different parent than the block points to!
+ error "checkHeadBlock parent mismatch - this is a bug", + parent = shortLog(parent), checkedParent = shortLog(checkedParent) + return err(VerifierError.MissingParent) + template blck(): untyped = signedBlock.message # shortcuts without copy template blockRoot(): untyped = signedBlock.root diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 387bb66da..a1178df0f 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -117,13 +117,6 @@ type completed notCompleted -proc addBlock*( - self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars], - resfut: Future[Result[void, VerifierError]] = nil, - maybeFinalized = false, - validationDur = Duration()) - # Initialization # ------------------------------------------------------------------------------ @@ -386,7 +379,32 @@ proc checkBloblessSignature(self: BlockProcessor, return err("checkBloblessSignature: Invalid proposer signature") ok() -proc storeBlock*( +proc enqueueBlock*( + self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, + blobs: Opt[BlobSidecars], + resfut: Future[Result[void, VerifierError]] = nil, + maybeFinalized = false, + validationDur = Duration()) = + withBlck(blck): + if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot: + # let backfill blocks skip the queue - these are always "fast" to process + # because there are no state rewinds to deal with + let res = self.storeBackfillBlock(blck, blobs) + resfut.complete(res) + return + + try: + self.blockQueue.addLastNoWait(BlockEntry( + blck: blck, + blobs: blobs, + maybeFinalized: maybeFinalized, + resfut: resfut, queueTick: Moment.now(), + validationDur: validationDur, + src: src)) + except AsyncQueueFullError: + raiseAssert "unbounded queue" + +proc storeBlock( self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime, signedBlock: ForkySignedBeaconBlock, blobsOpt: Opt[BlobSidecars], @@ -444,6 +462,10 @@ proc storeBlock*( err((error, ProcessingStatus.completed)) let + # We have to be careful that there exists only one in-flight entry point + # for adding blocks or the checks performed in `checkHeadBlock` might + # be invalidated (ie a block could be added while we wait for EL response + # here) parent = dag.checkHeadBlock(signedBlock) if parent.isErr(): @@ -698,10 +720,12 @@ proc storeBlock*( withBlck(quarantined): when typeof(blck).toFork() < ConsensusFork.Deneb: - self[].addBlock(MsgSource.gossip, quarantined, Opt.none(BlobSidecars)) + self[].enqueueBlock( + MsgSource.gossip, quarantined, Opt.none(BlobSidecars)) else: if len(blck.message.body.blob_kzg_commitments) == 0: - self[].addBlock(MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[])) + self[].enqueueBlock( + MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[])) else: if (let res = checkBloblessSignature(self[], blck); res.isErr): warn "Failed to verify signature of unorphaned blobless block", @@ -710,7 +734,7 @@ proc storeBlock*( continue if self.blobQuarantine[].hasBlobs(blck): let blobs = self.blobQuarantine[].popBlobs(blck.root) - self[].addBlock(MsgSource.gossip, quarantined, Opt.some(blobs)) + self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs)) else: if not self.consensusManager.quarantine[].addBlobless( dag.finalizedHead.slot, blck): @@ -725,10 +749,8 @@ proc storeBlock*( proc addBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, - blobs: 
Opt[BlobSidecars],
-    resfut: Future[Result[void, VerifierError]] = nil,
-    maybeFinalized = false,
-    validationDur = Duration()) =
+    blobs: Opt[BlobSidecars], maybeFinalized = false,
+    validationDur = Duration()): Future[Result[void, VerifierError]] =
   ## Enqueue a Gossip-validated block for consensus verification
   # Backpressure:
   #   There is no backpressure here - producers must wait for `resfut` to
   #   constrain their own processing
   # Producers:
   # - Gossip (when synced)
   # - SyncManager (during sync)
   # - RequestManager (missing ancestor blocks)
-
-  withBlck(blck):
-    if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
-      # let backfill blocks skip the queue - these are always "fast" to process
-      # because there are no state rewinds to deal with
-      let res = self.storeBackfillBlock(blck, blobs)
-      if resfut != nil:
-        resfut.complete(res)
-      return
-
-    try:
-      self.blockQueue.addLastNoWait(BlockEntry(
-        blck: blck,
-        blobs: blobs,
-        maybeFinalized: maybeFinalized,
-        resfut: resfut, queueTick: Moment.now(),
-        validationDur: validationDur,
-        src: src))
-    except AsyncQueueFullError:
-      raiseAssert "unbounded queue"
+  # - API
+  let resfut = newFuture[Result[void, VerifierError]]("BlockProcessor.addBlock")
+  enqueueBlock(self, src, blck, blobs, resfut, maybeFinalized, validationDur)
+  resfut
 
 # Event Loop
 # ------------------------------------------------------------------------------
@@ -787,7 +793,7 @@ proc processBlock(
   #   - MAY queue the block for later processing.
   # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/sync/optimistic.md#execution-engine-errors
   await sleepAsync(chronos.seconds(1))
-  self[].addBlock(
+  self[].enqueueBlock(
     entry.src, entry.blck, entry.blobs, entry.resfut,
     entry.maybeFinalized, entry.validationDur)
   # To ensure backpressure on the sync manager, do not complete these futures.
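
The core of the change above is the split between `addBlock`, which is now
the sole public entry point and allocates the result future, and the queue
loop, which stays the only caller of `storeBlock`. A minimal, self-contained
sketch of that pattern follows - it uses std/asyncdispatch instead of
chronos, and all names (`Entry`, `VerifyResult`, `runQueueLoop`) are
illustrative stand-ins, not the actual nimbus-eth2 API:

  # Sketch only: simplified stand-ins, not the real nimbus-eth2 code.
  import std/[asyncdispatch, deques, sets]

  type
    VerifyResult = enum
      verOk, verDuplicate
    Entry = object
      root: string                  # stand-in for ForkedSignedBeaconBlock
      resfut: Future[VerifyResult]

  var
    queue = initDeque[Entry]()      # drained by exactly one consumer
    stored = initHashSet[string]()  # stand-in for the dag's duplicate check

  proc storeBlock(root: string): Future[VerifyResult] {.async.} =
    # The duplicate check and the slow, EL-consulting store run back to
    # back; running this concurrently for the same block is the #5285 race.
    if root in stored:
      return verDuplicate
    await sleepAsync(10)            # simulate waiting for the EL
    stored.incl root
    return verOk

  proc addBlock(root: string): Future[VerifyResult] =
    # Sole entry point: allocate the result future and enqueue, mirroring
    # the new `addBlock`/`enqueueBlock` split.
    result = newFuture[VerifyResult]("addBlock")
    queue.addLast(Entry(root: root, resfut: result))

  proc runQueueLoop() {.async.} =
    # The only caller of `storeBlock`; serializing calls here removes the
    # race between REST and gossip arrivals.
    while true:
      while queue.len == 0:
        await sleepAsync(1)
      let entry = queue.popFirst()
      entry.resfut.complete(await storeBlock(entry.root))

Serializing every producer through the one consumer is what makes the
duplicate check inside `storeBlock` trustworthy again: by the time an entry
is popped, all earlier blocks have fully finished their EL round-trip.
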
diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 0ea2d6e25..039300b30 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -239,20 +239,22 @@ proc processSignedBeaconBlock*( # propagation of seemingly good blocks trace "Block validated" - var blobs = Opt.none(BlobSidecars) - when typeof(signedBlock).toFork() >= ConsensusFork.Deneb: - if self.blobQuarantine[].hasBlobs(signedBlock): - blobs = Opt.some(self.blobQuarantine[].popBlobs(signedBlock.root)) + let blobs = + when typeof(signedBlock).toFork() >= ConsensusFork.Deneb: + if self.blobQuarantine[].hasBlobs(signedBlock): + Opt.some(self.blobQuarantine[].popBlobs(signedBlock.root)) + else: + if not self.quarantine[].addBlobless(self.dag.finalizedHead.slot, + signedBlock): + notice "Block quarantine full (blobless)", + blockRoot = shortLog(signedBlock.root), + blck = shortLog(signedBlock.message), + signature = shortLog(signedBlock.signature) + return v else: - if not self.quarantine[].addBlobless(self.dag.finalizedHead.slot, - signedBlock): - notice "Block quarantine full (blobless)", - blockRoot = shortLog(signedBlock.root), - blck = shortLog(signedBlock.message), - signature = shortLog(signedBlock.signature) - return v + Opt.none(BlobSidecars) - self.blockProcessor[].addBlock( + self.blockProcessor[].enqueueBlock( src, ForkedSignedBeaconBlock.init(signedBlock), blobs, maybeFinalized = maybeFinalized, @@ -293,7 +295,7 @@ proc processSignedBlobSidecar*( debug "Blob received", delay let v = - self.dag.validateBlobSidecar(self.quarantine, self.blob_quarantine, + self.dag.validateBlobSidecar(self.quarantine, self.blobQuarantine, signedBlobSidecar, wallTime, idx) if v.isErr(): @@ -312,7 +314,7 @@ proc processSignedBlobSidecar*( let blobless = o.unsafeGet() if self.blobQuarantine[].hasBlobs(blobless): - self.blockProcessor[].addBlock( + self.blockProcessor[].enqueueBlock( MsgSource.gossip, ForkedSignedBeaconBlock.init(blobless), Opt.some(self.blobQuarantine[].popBlobs( diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index ec3532302..6d7336cc5 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -332,44 +332,40 @@ proc initFullNode( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, rng, taskpool, consensusManager, node.validatorMonitor, blobQuarantine, getBeaconTime) - blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars], maybeFinalized: bool): + blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, + blobs: Opt[BlobSidecars], maybeFinalized: bool): Future[Result[void, VerifierError]] = # The design with a callback for block verification is unusual compared # to the rest of the application, but fits with the general approach # taken in the sync/request managers - this is an architectural compromise # that should probably be reimagined more holistically in the future. 
- let resfut = newFuture[Result[void, VerifierError]]("blockVerifier") - blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - blobs, - resfut, - maybeFinalized = maybeFinalized) - resfut + blockProcessor[].addBlock( + MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, - maybeFinalized: bool): + maybeFinalized: bool): Future[Result[void, VerifierError]] = - let resfut = newFuture[Result[void, VerifierError]]("rmanBlockVerifier") withBlck(signedBlock): when typeof(blck).toFork() >= ConsensusFork.Deneb: if not blobQuarantine[].hasBlobs(blck): # We don't have all the blobs for this block, so we have # to put it in blobless quarantine. if not quarantine[].addBlobless(dag.finalizedHead.slot, blck): - let e = Result[void, VerifierError].err(VerifierError.UnviableFork) - resfut.complete(e) - return - let blobs = blobQuarantine[].popBlobs(blck.root) - blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - Opt.some(blobs), - resfut, - maybeFinalized = maybeFinalized) + Future.completed( + Result[void, VerifierError].err(VerifierError.UnviableFork), + "rmanBlockVerifier") + else: + Future.completed( + Result[void, VerifierError].err(VerifierError.MissingParent), + "rmanBlockVerifier") + else: + let blobs = blobQuarantine[].popBlobs(blck.root) + blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + Opt.some(blobs), + maybeFinalized = maybeFinalized) else: blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.none(BlobSidecars), - resfut, maybeFinalized = maybeFinalized) - resfut - processor = Eth2Processor.new( config.doppelgangerDetection, diff --git a/beacon_chain/validators/message_router.nim b/beacon_chain/validators/message_router.nim index 993561f09..c7d926d7a 100644 --- a/beacon_chain/validators/message_router.nim +++ b/beacon_chain/validators/message_router.nim @@ -164,16 +164,16 @@ proc routeSignedBeaconBlock*( notice "Blob sent", blob = shortLog(signedBlobs[i]), error = res.error[] blobs = Opt.some(blobsOpt.get().mapIt(newClone(it.message))) - let newBlockRef = await router[].blockProcessor.storeBlock( - MsgSource.api, sendTime, blck, blobs) + let added = await router[].blockProcessor[].addBlock( + MsgSource.api, ForkedSignedBeaconBlock.init(blck), blobs) # The boolean we return tells the caller whether the block was integrated # into the chain - if newBlockRef.isErr(): - return if newBlockRef.error()[0] != VerifierError.Duplicate: + if added.isErr(): + return if added.error() != VerifierError.Duplicate: warn "Unable to add routed block to block pool", blockRoot = shortLog(blck.root), blck = shortLog(blck.message), - signature = shortLog(blck.signature), err = newBlockRef.error() + signature = shortLog(blck.signature), err = added.error() ok(Opt.none(BlockRef)) else: # If it's duplicate, there's an existing BlockRef to return. 
The block @@ -183,10 +183,16 @@ proc routeSignedBeaconBlock*( if blockRef.isErr: warn "Unable to add routed duplicate block to block pool", blockRoot = shortLog(blck.root), blck = shortLog(blck.message), - signature = shortLog(blck.signature), err = newBlockRef.error() + signature = shortLog(blck.signature), err = added.error() ok(blockRef) - return ok(Opt.some(newBlockRef.get())) + + let blockRef = router[].dag.getBlockRef(blck.root) + if blockRef.isErr: + warn "Block finalised while waiting for block processor", + blockRoot = shortLog(blck.root), blck = shortLog(blck.message), + signature = shortLog(blck.signature) + ok(blockRef) proc routeAttestation*( router: ref MessageRouter, attestation: Attestation, diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index 18f07f406..24ae1a677 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -59,11 +59,15 @@ suite "Block processor" & preset(): processor = BlockProcessor.new( false, "", "", rng, taskpool, consensusManager, validatorMonitor, blobQuarantine, getTimeFn) + processorFut = processor.runQueueProcessingLoop() asyncTest "Reverse order block add & get" & preset(): - let missing = await processor.storeBlock( - MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, Opt.none(BlobSidecars)) - check: missing.error[0] == VerifierError.MissingParent + let + missing = await processor[].addBlock( + MsgSource.gossip, ForkedSignedBeaconBlock.init(b2), + Opt.none(BlobSidecars)) + + check: missing.error == VerifierError.MissingParent check: not dag.containsForkBlock(b2.root) # Unresolved, shouldn't show up @@ -71,8 +75,9 @@ suite "Block processor" & preset(): FetchRecord(root: b1.root) in quarantine[].checkMissing(32) let - status = await processor.storeBlock( - MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, Opt.none(BlobSidecars)) + status = await processor[].addBlock( + MsgSource.gossip, ForkedSignedBeaconBlock.init(b1), + Opt.none(BlobSidecars)) b1Get = dag.getBlockRef(b1.root) check: @@ -81,7 +86,6 @@ suite "Block processor" & preset(): dag.containsForkBlock(b1.root) not dag.containsForkBlock(b2.root) # Async pipeline must still run - discard processor.runQueueProcessingLoop() while processor[].hasBlocks(): poll()
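
Under the same illustrative sketch (see the `runQueueLoop` example after the
block_processor.nim diff above), the REST-plus-gossip scenario from the
commit message becomes deterministic: both producers await futures completed
by the single consumer, so the second arrival observes a duplicate rather
than racing the first inside `storeBlock`. Again a hedged sketch, not part
of the project's test suite:

  proc demo() {.async.} =
    asyncCheck runQueueLoop()        # lone consumer, as in processBlock
    let
      viaApi = addBlock("0xabc")     # block published via REST
      viaGossip = addBlock("0xabc")  # same block arriving via gossip
    doAssert (await viaApi) == verOk
    doAssert (await viaGossip) == verDuplicate

  waitFor demo()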