From 6f466894abf0e0ef44fe2195e247ff27e1479357 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Thu, 21 Mar 2024 18:37:31 +0100 Subject: [PATCH] answer `RequestManager` queries from disk if possible (#6109) When restarting beacon node, orphaned blocks remain in the database but on startup, only the canonical chain as selected by fork choice loads. When a new block is discovered that builds on top of an orphaned block, the orphaned block is re-downloaded using sync/request manager, despite it already being present on disk. Such queries can be answered locally to improve discovery speed of alternate forks. --- .../block_quarantine.nim | 9 +- .../gossip_processing/eth2_processor.nim | 20 +-- beacon_chain/nimbus_beacon_node.nim | 13 +- beacon_chain/sync/request_manager.nim | 144 ++++++++++++++---- 4 files changed, 141 insertions(+), 45 deletions(-) diff --git a/beacon_chain/consensus_object_pools/block_quarantine.nim b/beacon_chain/consensus_object_pools/block_quarantine.nim index d6da6d24b..cf7efb87e 100644 --- a/beacon_chain/consensus_object_pools/block_quarantine.nim +++ b/beacon_chain/consensus_object_pools/block_quarantine.nim @@ -310,13 +310,14 @@ proc addBlobless*( quarantine.missing.del(signedBlock.root) true -func popBlobless*(quarantine: var Quarantine, root: Eth2Digest): - Opt[deneb.SignedBeaconBlock] = +func popBlobless*( + quarantine: var Quarantine, + root: Eth2Digest): Opt[ForkedSignedBeaconBlock] = var blck: deneb.SignedBeaconBlock if quarantine.blobless.pop(root, blck): - Opt.some(blck) + Opt.some(ForkedSignedBeaconBlock.init(blck)) else: - Opt.none(deneb.SignedBeaconBlock) + Opt.none(ForkedSignedBeaconBlock) iterator peekBlobless*(quarantine: var Quarantine): deneb.SignedBeaconBlock = for k, v in quarantine.blobless.mpairs(): diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index e83932113..c2bfafb05 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -303,15 +303,17 @@ proc processBlobSidecar*( let block_root = hash_tree_root(block_header) if (let o = self.quarantine[].popBlobless(block_root); o.isSome): let blobless = o.unsafeGet() - - if self.blobQuarantine[].hasBlobs(blobless): - self.blockProcessor[].enqueueBlock( - MsgSource.gossip, - ForkedSignedBeaconBlock.init(blobless), - Opt.some(self.blobQuarantine[].popBlobs(block_root, blobless))) - else: - discard self.quarantine[].addBlobless(self.dag.finalizedHead.slot, - blobless) + withBlck(blobless): + when consensusFork >= ConsensusFork.Deneb: + if self.blobQuarantine[].hasBlobs(forkyBlck): + self.blockProcessor[].enqueueBlock( + MsgSource.gossip, blobless, + Opt.some(self.blobQuarantine[].popBlobs(block_root, forkyBlck))) + else: + discard self.quarantine[].addBlobless( + self.dag.finalizedHead.slot, forkyBlck) + else: + raiseAssert "Could not have been added as blobless" blob_sidecars_received.inc() blob_sidecar_delay.observe(delay.toFloatSeconds()) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 478bbda3c..75aaea536 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -419,6 +419,16 @@ proc initFullNode( await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.none(BlobSidecars), maybeFinalized = maybeFinalized) + rmanBlockLoader = proc( + blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] = + dag.getForkedBlock(blockRoot) + rmanBlobLoader = proc( + blobId: BlobIdentifier): Opt[ref BlobSidecar] = + var blob_sidecar = BlobSidecar.new() + if dag.db.getBlobSidecar(blobId.block_root, blobId.index, blob_sidecar[]): + Opt.some blob_sidecar + else: + Opt.none(ref BlobSidecar) processor = Eth2Processor.new( config.doppelgangerDetection, @@ -444,7 +454,8 @@ proc initFullNode( requestManager = RequestManager.init( node.network, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime, (proc(): bool = syncManager.inProgress), - quarantine, blobQuarantine, rmanBlockVerifier) + quarantine, blobQuarantine, rmanBlockVerifier, + rmanBlockLoader, rmanBlobLoader) if node.config.lightClientDataServe: proc scheduleSendingLightClientUpdates(slot: Slot) = diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index 420d195c4..1a4cb83eb 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -37,10 +37,19 @@ const POLL_INTERVAL = 1.seconds type - BlockVerifierFn* = - proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): - Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} - InhibitFn* = proc: bool {.gcsafe, raises:[].} + BlockVerifierFn* = proc( + signedBlock: ForkedSignedBeaconBlock, + maybeFinalized: bool + ): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} + + BlockLoaderFn* = proc( + blockRoot: Eth2Digest + ): Opt[ForkedTrustedSignedBeaconBlock] {.gcsafe, raises: [].} + + BlobLoaderFn* = proc( + blobId: BlobIdentifier): Opt[ref BlobSidecar] {.gcsafe, raises: [].} + + InhibitFn* = proc: bool {.gcsafe, raises: [].} RequestManager* = object network*: Eth2Node @@ -49,6 +58,8 @@ type quarantine: ref Quarantine blobQuarantine: ref BlobQuarantine blockVerifier: BlockVerifierFn + blockLoader: BlockLoaderFn + blobLoader: BlobLoaderFn blockLoopFuture: Future[void].Raising([CancelledError]) blobLoopFuture: Future[void].Raising([CancelledError]) @@ -64,7 +75,9 @@ proc init*(T: type RequestManager, network: Eth2Node, inhibit: InhibitFn, quarantine: ref Quarantine, blobQuarantine: ref BlobQuarantine, - blockVerifier: BlockVerifierFn): RequestManager = + blockVerifier: BlockVerifierFn, + blockLoader: BlockLoaderFn = nil, + blobLoader: BlobLoaderFn = nil): RequestManager = RequestManager( network: network, getBeaconTime: getBeaconTime, @@ -72,7 +85,8 @@ proc init*(T: type RequestManager, network: Eth2Node, quarantine: quarantine, blobQuarantine: blobQuarantine, blockVerifier: blockVerifier, - ) + blockLoader: blockLoader, + blobLoader: blobLoader) proc checkResponse(roots: openArray[Eth2Digest], blocks: openArray[ref ForkedSignedBeaconBlock]): bool = @@ -96,12 +110,13 @@ proc checkResponse(idList: seq[BlobIdentifier], let block_root = hash_tree_root(blob.signed_block_header.message) var found = false for id in idList: - if id.block_root == block_root and - id.index == blob.index: - found = true - break + if id.block_root == block_root and id.index == blob.index: + found = true + break if not found: - return false + return false + blob[].verify_blob_sidecar_inclusion_proof().isOkOr: + return false true proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} = @@ -204,10 +219,10 @@ proc fetchBlobsFromNetwork(self: RequestManager, curRoot = block_root if (let o = self.quarantine[].popBlobless(curRoot); o.isSome): let b = o.unsafeGet() - discard await self.blockVerifier(ForkedSignedBeaconBlock.init(b), false) + discard await self.blockVerifier(b, false) # TODO: - # If appropriate, return a VerifierError.InvalidBlob from verification, - # check for it here, and penalize the peer accordingly. + # If appropriate, return a VerifierError.InvalidBlob from + # verification, check for it here, and penalize the peer accordingly else: debug "Blobs by root request failed", peer = peer, blobs = shortLog(idList), err = blobs.error() @@ -217,7 +232,8 @@ proc fetchBlobsFromNetwork(self: RequestManager, if not(isNil(peer)): self.network.peerPool.release(peer) -proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} = +proc requestManagerBlockLoop( + rman: RequestManager) {.async: (raises: [CancelledError]).} = while true: # TODO This polling could be replaced with an AsyncEvent that is fired # from the quarantine when there's work to do @@ -226,24 +242,50 @@ proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledE if rman.inhibit(): continue - let blocks = mapIt(rman.quarantine[].checkMissing( - SYNC_MAX_REQUESTED_BLOCKS), it.root) - if blocks.len == 0: + let missingBlockRoots = + rman.quarantine[].checkMissing(SYNC_MAX_REQUESTED_BLOCKS).mapIt(it.root) + if missingBlockRoots.len == 0: continue - debug "Requesting detected missing blocks", blocks = shortLog(blocks) + var blockRoots: seq[Eth2Digest] + if rman.blockLoader == nil: + blockRoots = missingBlockRoots + else: + var verifiers: + seq[Future[Result[void, VerifierError]].Raising([CancelledError])] + for blockRoot in missingBlockRoots: + let blck = rman.blockLoader(blockRoot).valueOr: + blockRoots.add blockRoot + continue + debug "Loaded orphaned block from storage", blockRoot + verifiers.add rman.blockVerifier( + blck.asSigned(), maybeFinalized = false) + try: + await allFutures(verifiers) + except CancelledError as exc: + var futs = newSeqOfCap[Future[void].Raising([])](verifiers.len) + for verifier in verifiers: + futs.add verifier.cancelAndWait() + await noCancel allFutures(futs) + raise exc + + if blockRoots.len == 0: + continue + + debug "Requesting detected missing blocks", blocks = shortLog(blockRoots) let start = SyncMoment.now(0) - var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])] + var workers: + array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])] for i in 0 ..< PARALLEL_REQUESTS: - workers[i] = rman.requestBlocksByRoot(blocks) + workers[i] = rman.requestBlocksByRoot(blockRoots) await allFutures(workers) - let finish = SyncMoment.now(uint64(len(blocks))) + let finish = SyncMoment.now(uint64(len(blockRoots))) - debug "Request manager block tick", blocks = shortLog(blocks), + debug "Request manager block tick", blocks = shortLog(blockRoots), sync_speed = speed(start, finish) proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] = @@ -280,7 +322,8 @@ proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] = rman.quarantine[].removeBlobless(blobless) fetches -proc requestManagerBlobLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} = +proc requestManagerBlobLoop( + rman: RequestManager) {.async: (raises: [CancelledError]).} = while true: # TODO This polling could be replaced with an AsyncEvent that is fired # from the quarantine when there's work to do @@ -288,19 +331,58 @@ proc requestManagerBlobLoop(rman: RequestManager) {.async: (raises: [CancelledEr if rman.inhibit(): continue - let fetches = rman.getMissingBlobs() - if fetches.len > 0: - debug "Requesting detected missing blobs", blobs = shortLog(fetches) + let missingBlobIds = rman.getMissingBlobs() + if missingBlobIds.len == 0: + continue + + var blobIds: seq[BlobIdentifier] + if rman.blobLoader == nil: + blobIds = missingBlobIds + else: + var + blockRoots: seq[Eth2Digest] + curRoot: Eth2Digest + for blobId in missingBlobIds: + if blobId.block_root != curRoot: + curRoot = blobId.block_root + blockRoots.add curRoot + let blob_sidecar = rman.blobLoader(blobId).valueOr: + blobIds.add blobId + if blockRoots.len > 0 and blockRoots[^1] == curRoot: + # A blob is missing, remove from list of fully available blocks + discard blockRoots.pop() + continue + debug "Loaded orphaned blob from storage", blobId + rman.blobQuarantine[].put(blob_sidecar) + var verifiers = newSeqOfCap[ + Future[Result[void, VerifierError]] + .Raising([CancelledError])](blockRoots.len) + for blockRoot in blockRoots: + let blck = rman.quarantine[].popBlobless(blockRoot).valueOr: + continue + verifiers.add rman.blockVerifier(blck, maybeFinalized = false) + try: + await allFutures(verifiers) + except CancelledError as exc: + var futs = newSeqOfCap[Future[void].Raising([])](verifiers.len) + for verifier in verifiers: + futs.add verifier.cancelAndWait() + await noCancel allFutures(futs) + raise exc + + if blobIds.len > 0: + debug "Requesting detected missing blobs", blobs = shortLog(blobIds) let start = SyncMoment.now(0) - var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])] + var workers: + array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])] for i in 0 ..< PARALLEL_REQUESTS: - workers[i] = rman.fetchBlobsFromNetwork(fetches) + workers[i] = rman.fetchBlobsFromNetwork(blobIds) await allFutures(workers) - let finish = SyncMoment.now(uint64(len(fetches))) + let finish = SyncMoment.now(uint64(len(blobIds))) debug "Request manager blob tick", - blobs_count = len(fetches), + blobs_count = len(blobIds), sync_speed = speed(start, finish) proc start*(rman: var RequestManager) =