From 1cf777c64b7cb382d55c5745170fa2dff5c22d7d Mon Sep 17 00:00:00 2001 From: henridf Date: Fri, 19 May 2023 18:25:11 +0200 Subject: [PATCH] Fix sync for blocks older than MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS (#4977) When doing sync for blocks older than MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, we skip the blobs by range request, but we then pass en empty blob sequence to validation, which then fails. To fix this: Use an Option[Blobsidecars] to allow expressing the distinction between "empty blob sequence" and "blobs unavailable". Use the latter for "old" blocks, and don't attempt to run blob validation. --- .../gossip_processing/block_processor.nim | 79 ++++++++++--------- .../gossip_processing/eth2_processor.nim | 8 +- beacon_chain/nimbus_beacon_node.nim | 12 +-- beacon_chain/sync/sync_queue.nim | 10 ++- beacon_chain/validators/message_router.nim | 2 +- tests/test_block_processor.nim | 4 +- tests/test_sync_manager.nim | 2 +- 7 files changed, 62 insertions(+), 55 deletions(-) diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 96b533b38..fc94e7b3d 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -60,7 +60,7 @@ type BlobSidecars* = seq[ref BlobSidecar] BlockEntry = object blck*: ForkedSignedBeaconBlock - blobs*: BlobSidecars + blobs*: Opt[BlobSidecars] maybeFinalized*: bool ## The block source claims the block has been finalized already resfut*: Future[Result[void, VerifierError]] @@ -126,7 +126,7 @@ type proc addBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, - blobs: BlobSidecars, + blobs: Opt[BlobSidecars], resfut: Future[Result[void, VerifierError]] = nil, maybeFinalized = false, validationDur = Duration()) @@ -189,31 +189,30 @@ from ../consensus_object_pools/block_clearance import proc storeBackfillBlock( self: var BlockProcessor, signedBlock: ForkySignedBeaconBlock, - blobs: BlobSidecars): Result[void, VerifierError] = + blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] = # The block is certainly not missing any more self.consensusManager.quarantine[].missing.del(signedBlock.root) # Establish blob viability before calling addbackfillBlock to avoid # writing the block in case of blob error. - let blobsOk = - when typeof(signedBlock).toFork() >= ConsensusFork.Deneb: - let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq - if blobs.len > 0 or kzgCommits.len > 0: - let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob), - blobs.mapIt(it.kzg_proof)) - if r.isErr(): - debug "backfill blob validation failed", - blockRoot = shortLog(signedBlock.root), - blobs = shortLog(blobs), - blck = shortLog(signedBlock.message), - signature = shortLog(signedBlock.signature), - msg = r.error() - r.isOk() - else: - true - else: - true + var blobsOk = true + when typeof(signedBlock).toFork() >= ConsensusFork.Deneb: + if blobsOpt.isSome: + let blobs = blobsOpt.get() + let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq + if blobs.len > 0 or kzgCommits.len > 0: + let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob), + blobs.mapIt(it.kzg_proof)) + if r.isErr(): + debug "backfill blob validation failed", + blockRoot = shortLog(signedBlock.root), + blobs = shortLog(blobs), + blck = shortLog(signedBlock.message), + signature = shortLog(signedBlock.signature), + msg = r.error() + blobsOk = r.isOk() + if not blobsOk: return err(VerifierError.Invalid) @@ -236,6 +235,7 @@ proc storeBackfillBlock( return res # Only store blobs after successfully establishing block viability. + let blobs = blobsOpt.valueOr: BlobSidecars @[] for b in blobs: self.consensusManager.dag.db.putBlobSidecar(b[]) @@ -398,7 +398,7 @@ proc checkBloblessSignature(self: BlockProcessor, proc storeBlock*( self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime, signedBlock: ForkySignedBeaconBlock, - blobs: BlobSidecars, + blobsOpt: Opt[BlobSidecars], maybeFinalized = false, queueTick: Moment = Moment.now(), validationDur = Duration()): Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async.} = @@ -465,18 +465,20 @@ proc storeBlock*( # Establish blob viability before calling addHeadBlock to avoid # writing the block in case of blob error. when typeof(signedBlock).toFork() >= ConsensusFork.Deneb: - let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq - if blobs.len > 0 or kzgCommits.len > 0: - let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob), - blobs.mapIt(it.kzg_proof)) - if r.isErr(): - debug "blob validation failed", - blockRoot = shortLog(signedBlock.root), - blobs = shortLog(blobs), - blck = shortLog(signedBlock.message), - signature = shortLog(signedBlock.signature), - msg = r.error() - return err((VerifierError.Invalid, ProcessingStatus.completed)) + if blobsOpt.isSome: + let blobs = blobsOpt.get() + let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq + if blobs.len > 0 or kzgCommits.len > 0: + let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob), + blobs.mapIt(it.kzg_proof)) + if r.isErr(): + debug "blob validation failed", + blockRoot = shortLog(signedBlock.root), + blobs = shortLog(blobs), + blck = shortLog(signedBlock.message), + signature = shortLog(signedBlock.signature), + msg = r.error() + return err((VerifierError.Invalid, ProcessingStatus.completed)) type Trusted = typeof signedBlock.asTrusted() let blck = dag.addHeadBlock(self.verifier, signedBlock, payloadValid) do ( @@ -536,6 +538,7 @@ proc storeBlock*( self[].lastPayload = signedBlock.message.slot # write blobs now that block has been written. + let blobs = blobsOpt.valueOr: BlobSidecars @[] for b in blobs: self.consensusManager.dag.db.putBlobSidecar(b[]) @@ -669,10 +672,10 @@ proc storeBlock*( # Process the blocks that had the newly accepted block as parent withBlck(quarantined): when typeof(blck).toFork() < ConsensusFork.Deneb: - self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[]) + self[].addBlock(MsgSource.gossip, quarantined, Opt.none(BlobSidecars)) else: if len(blck.message.body.blob_kzg_commitments) == 0: - self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[]) + self[].addBlock(MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[])) else: if (let res = checkBloblessSignature(self[], blck); res.isErr): warn "Failed to verify signature of unorphaned blobless block", @@ -681,7 +684,7 @@ proc storeBlock*( continue if self.blobQuarantine[].hasBlobs(blck): let blobs = self.blobQuarantine[].popBlobs(blck.root) - self[].addBlock(MsgSource.gossip, quarantined, blobs) + self[].addBlock(MsgSource.gossip, quarantined, Opt.some(blobs)) else: if not self.consensusManager.quarantine[].addBlobless( dag.finalizedHead.slot, blck): @@ -696,7 +699,7 @@ proc storeBlock*( proc addBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, - blobs: BlobSidecars, + blobs: Opt[BlobSidecars], resfut: Future[Result[void, VerifierError]] = nil, maybeFinalized = false, validationDur = Duration()) = diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index abce67474..e3c7c0438 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -238,10 +238,10 @@ proc processSignedBeaconBlock*( # propagation of seemingly good blocks trace "Block validated" - var blobs: BlobSidecars + var blobs = Opt.none(BlobSidecars) when typeof(signedBlock).toFork() >= ConsensusFork.Deneb: if self.blobQuarantine[].hasBlobs(signedBlock): - blobs = self.blobQuarantine[].popBlobs(signedBlock.root) + blobs = Opt.some(self.blobQuarantine[].popBlobs(signedBlock.root)) else: if not self.quarantine[].addBlobless(self.dag.finalizedHead.slot, signedBlock): @@ -314,8 +314,8 @@ proc processSignedBlobSidecar*( self.blockProcessor[].addBlock( MsgSource.gossip, ForkedSignedBeaconBlock.init(blobless), - self.blobQuarantine[].popBlobs( - signedBlobSidecar.message.block_root) + Opt.some(self.blobQuarantine[].popBlobs( + signedBlobSidecar.message.block_root)) ) else: discard self.quarantine[].addBlobless(self.dag.finalizedHead.slot, diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 3d3c01d9f..03df39405 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -328,7 +328,7 @@ proc initFullNode( rng, taskpool, consensusManager, node.validatorMonitor, blobQuarantine, getBeaconTime) blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, - blobs: BlobSidecars, maybeFinalized: bool): + blobs: Opt[BlobSidecars], maybeFinalized: bool): Future[Result[void, VerifierError]] = # The design with a callback for block verification is unusual compared # to the rest of the application, but fits with the general approach @@ -340,7 +340,7 @@ proc initFullNode( resfut, maybeFinalized = maybeFinalized) resfut - rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, + rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): Future[Result[void, VerifierError]] = let resfut = newFuture[Result[void, VerifierError]]("rmanBlockVerifier") @@ -355,12 +355,12 @@ proc initFullNode( return let blobs = blobQuarantine[].popBlobs(blck.root) blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - blobs, + Opt.some(blobs), resfut, maybeFinalized = maybeFinalized) else: blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - BlobSidecars @[], + Opt.none(BlobSidecars), resfut, maybeFinalized = maybeFinalized) resfut @@ -1375,8 +1375,8 @@ proc handleMissingBlobs(node: BeaconNode) = node.blockProcessor[].addBlock( MsgSource.gossip, ForkedSignedBeaconBlock.init(blobless), - node.blobQuarantine[].popBlobs( - blobless.root) + Opt.some(node.blobQuarantine[].popBlobs( + blobless.root)) ) node.quarantine[].removeBlobless(blobless) if fetches.len > 0: diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim index 3db3066cd..553b11374 100644 --- a/beacon_chain/sync/sync_queue.nim +++ b/beacon_chain/sync/sync_queue.nim @@ -26,7 +26,7 @@ type GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].} ProcessingCallback* = proc() {.gcsafe, raises: [Defect].} BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock, - blobs: BlobSidecars, maybeFinalized: bool): + blobs: Opt[BlobSidecars], maybeFinalized: bool): Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].} SyncQueueKind* {.pure.} = enum @@ -680,9 +680,13 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], var i=0 for blk in sq.blocks(item): if reqres.get().blobs.isNone(): - res = await sq.blockVerifier(blk[], BlobSidecars @[], maybeFinalized) + res = await sq.blockVerifier(blk[], + Opt.none(BlobSidecars), + maybeFinalized) else: - res = await sq.blockVerifier(blk[], reqres.get().blobs.get()[i], maybeFinalized) + res = await sq.blockVerifier(blk[], + Opt.some(reqres.get().blobs.get()[i]), + maybeFinalized) inc(i) if res.isOk(): diff --git a/beacon_chain/validators/message_router.nim b/beacon_chain/validators/message_router.nim index 67222fb20..1e37dac33 100644 --- a/beacon_chain/validators/message_router.nim +++ b/beacon_chain/validators/message_router.nim @@ -121,7 +121,7 @@ proc routeSignedBeaconBlock*( signature = shortLog(blck.signature), error = res.error() let newBlockRef = await router[].blockProcessor.storeBlock( - MsgSource.api, sendTime, blck, BlobSidecars @[]) + MsgSource.api, sendTime, blck, Opt.none(BlobSidecars)) # The boolean we return tells the caller whether the block was integrated # into the chain diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index 17ecec601..17151dac0 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -61,7 +61,7 @@ suite "Block processor" & preset(): asyncTest "Reverse order block add & get" & preset(): let missing = await processor.storeBlock( - MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, BlobSidecars @[]) + MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, Opt.none(BlobSidecars)) check: missing.error[0] == VerifierError.MissingParent check: @@ -71,7 +71,7 @@ suite "Block processor" & preset(): let status = await processor.storeBlock( - MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, BlobSidecars @[]) + MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, Opt.none(BlobSidecars)) b1Get = dag.getBlockRef(b1.root) check: diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index 71d507660..2285488bc 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -49,7 +49,7 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier = # in the async queue, similar to how BlockProcessor does it - as far as # testing goes, this is risky because it might introduce differences between # the BlockProcessor and this test - proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: BlobSidecars, + proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): Future[Result[void, VerifierError]] = let fut = newFuture[Result[void, VerifierError]]()