diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md
index 92edeac87..566ca1420 100644
--- a/AllTests-mainnet.md
+++ b/AllTests-mainnet.md
@@ -74,6 +74,11 @@ OK: 6/6 Fail: 0/6 Skip: 0/6
 + Reverse order block add & get [Preset: mainnet] OK
 ```
 OK: 1/1 Fail: 0/1 Skip: 0/1
+## Block quarantine
+```diff
++ Unviable smoke test OK
+```
+OK: 1/1 Fail: 0/1 Skip: 0/1
 ## BlockId and helpers
 ```diff
 + atSlot sanity OK
@@ -361,6 +366,7 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
 OK: 4/4 Fail: 0/4 Skip: 0/4
 ## SyncManager test suite
 ```diff
++ Process all unviable blocks OK
 + [SyncQueue#Backward] Async unordered push test OK
 + [SyncQueue#Backward] Async unordered push with rewind test OK
 + [SyncQueue#Backward] Pass through established limits test OK
@@ -380,7 +386,7 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
 + [SyncQueue] getLastNonEmptySlot() test OK
 + [SyncQueue] hasEndGap() test OK
 ```
-OK: 18/18 Fail: 0/18 Skip: 0/18
+OK: 19/19 Fail: 0/19 Skip: 0/19
 ## Zero signature sanity checks
 ```diff
 + SSZ serialization roundtrip of SignedBeaconBlockHeader OK
 ```
@@ -451,4 +457,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
 OK: 1/1 Fail: 0/1 Skip: 0/1
 
 ---TOTAL---
-OK: 249/253 Fail: 0/253 Skip: 4/253
+OK: 251/255 Fail: 0/255 Skip: 4/255
diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim
index 20db74a32..eb15d04de 100644
--- a/beacon_chain/consensus_object_pools/block_pools_types.nim
+++ b/beacon_chain/consensus_object_pools/block_pools_types.nim
@@ -38,8 +38,8 @@ type
       ## appears or be discarded if finality obsoletes it
 
     UnviableFork
-      ## Block is from a different history / fork than the one we're interested
-      ## in (based on our finalized checkpoint)
+      ## Block is from a history / fork that does not include our most current
+      ## finalized checkpoint
 
     Duplicate
       ## We've seen this block already, can't add again
@@ -53,9 +53,6 @@ type
   OnFinalizedCallback* =
     proc(data: FinalizationInfoObject) {.gcsafe, raises: [Defect].}
 
-  FetchRecord* = object
-    root*: Eth2Digest
-
   KeyedBlockRef* = object
     # Special wrapper for BlockRef used in ChainDAG.blocks that allows lookup
     # by root without keeping a Table that keeps a separate copy of the digest
diff --git a/beacon_chain/consensus_object_pools/block_quarantine.nim b/beacon_chain/consensus_object_pools/block_quarantine.nim
index 7e725fb99..30903aa47 100644
--- a/beacon_chain/consensus_object_pools/block_quarantine.nim
+++ b/beacon_chain/consensus_object_pools/block_quarantine.nim
@@ -1,5 +1,5 @@
 # beacon_chain
-# Copyright (c) 2018-2021 Status Research & Development GmbH
+# Copyright (c) 2018-2022 Status Research & Development GmbH
 # Licensed and distributed under either of
 #   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
 #   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -9,20 +9,26 @@
 
 import
   std/[tables],
-  chronicles, stew/bitops2,
-  ../spec/forks,
-  ./block_pools_types
+  ../spec/forks
 
-export tables, forks, block_pools_types
+export tables, forks
 
 const
   MaxMissingItems = 1024
+    ## Arbitrary
+  MaxOrphans = SLOTS_PER_EPOCH * 3
+    ## Enough for finalization in an alternative fork
+  MaxUnviables = 16 * 1024
+    ## About a day of blocks - most likely not needed but it's quite cheap..
 
 type
   MissingBlock* = object
     tries*: int
 
+  FetchRecord* = object
+    root*: Eth2Digest
+
   Quarantine* = object
     ## Keeps track of unvalidated blocks coming from the network
     ## and that cannot yet be added to the chain
     ##
     ## Trivially invalid blocks may be dropped before reaching this stage.
 
@@ -32,18 +38,29 @@ type
-    orphans*: Table[(Eth2Digest, ValidatorSig), ForkedSignedBeaconBlock] ##\
-    ## Blocks that we don't have a parent for - when we resolve the parent, we
-    ## can proceed to resolving the block as well - we index this by root and
-    ## signature such that a block with invalid signature won't cause a block
-    ## with a valid signature to be dropped
+    orphans*: Table[(Eth2Digest, ValidatorSig), ForkedSignedBeaconBlock]
+      ## Blocks that we don't have a parent for - when we resolve the parent, we
+      ## can proceed to resolving the block as well - we index this by root and
+      ## signature such that a block with invalid signature won't cause a block
+      ## with a valid signature to be dropped
 
-    missing*: Table[Eth2Digest, MissingBlock] ##\
-    ## Roots of blocks that we would like to have (either parent_root of
-    ## unresolved blocks or block roots of attestations)
+    unviable*: OrderedTable[Eth2Digest, tuple[]]
+      ## Unviable blocks are those that come from a history that does not
+      ## include the finalized checkpoint we're currently following, and can
+      ## therefore never be included in our canonical chain - we keep their hash
+      ## around so that we can avoid cluttering the orphans table with their
+      ## descendants - the ChainDAG only keeps track of blocks that make up the
+      ## valid and canonical history.
+      ##
+      ## Entries are evicted in FIFO order - recent entries are more likely to
+      ## appear again in attestations and blocks - however, the unviable block
+      ## table is not a complete directory of all unviable blocks circulating -
+      ## only those we have observed, been able to verify as unviable and fit
+      ## in this cache.
 
-logScope:
-  topics = "quarant"
+    missing*: Table[Eth2Digest, MissingBlock]
+      ## Roots of blocks that we would like to have (either parent_root of
+      ## unresolved blocks or block roots of attestations)
 
 func init*(T: type Quarantine): T =
   T()
 
@@ -83,70 +100,112 @@ func addMissing*(quarantine: var Quarantine, root: Eth2Digest) =
   if quarantine.missing.len >= MaxMissingItems:
     return
 
+  if root in quarantine.unviable:
+    # Won't get anywhere with this block
+    return
+
   # It's not really missing if we're keeping it in the quarantine
-  if (not anyIt(quarantine.orphans.keys, it[0] == root)):
-    # If the block is in orphans, we no longer need it
-    discard quarantine.missing.hasKeyOrPut(root, MissingBlock())
+  if anyIt(quarantine.orphans.keys, it[0] == root):
+    return
+
+  # Add if it's not there, but don't update missing counter
+  discard quarantine.missing.hasKeyOrPut(root, MissingBlock())
 
 func removeOrphan*(
     quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
   quarantine.orphans.del((signedBlock.root, signedBlock.signature))
 
 func isViableOrphan(
-    dag: ChainDAGRef, signedBlock: ForkedSignedBeaconBlock): bool =
+    finalizedSlot: Slot, signedBlock: ForkedSignedBeaconBlock): bool =
   # The orphan must be newer than the finalization point so that its parent
   # either is the finalized block or more recent
-  let slot = withBlck(signedBlock): blck.message.slot
-  slot > dag.finalizedHead.slot
+  let
+    slot = getForkedBlockField(signedBlock, slot)
+  slot > finalizedSlot
 
-func removeOldBlocks(quarantine: var Quarantine, dag: ChainDAGRef) =
-  var oldBlocks: seq[(Eth2Digest, ValidatorSig)]
+func cleanupUnviable(quarantine: var Quarantine) =
+  while quarantine.unviable.len() >= MaxUnviables:
+    var toDel: Eth2Digest
+    for k in quarantine.unviable.keys():
+      toDel = k
+      break # Cannot modify while for-looping
+    quarantine.unviable.del(toDel)
 
-  template removeNonviableOrphans(orphans: untyped) =
-    for k, v in orphans.pairs():
-      if not isViableOrphan(dag, v):
-        oldBlocks.add k
+func addUnviable*(quarantine: var Quarantine, root: Eth2Digest) =
+  if root in quarantine.unviable:
+    return
 
-    for k in oldBlocks:
-      orphans.del k
+  quarantine.cleanupUnviable()
 
-  removeNonviableOrphans(quarantine.orphans)
+  # Remove the tree of orphans whose ancestor is unviable - they are now also
+  # unviable! This helps avoid junk in the quarantine, because we don't keep
+  # unviable parents in the DAG and there's no way to tell an orphan from an
+  # unviable block without the parent.
+  var
+    toRemove: seq[(Eth2Digest, ValidatorSig)] # Can't modify while iterating
+    toCheck = @[root]
+  while toCheck.len > 0:
+    let root = toCheck.pop()
+    for k, v in quarantine.orphans.mpairs():
+      if getForkedBlockField(v, parent_root) == root:
+        toCheck.add(k[0])
+        toRemove.add(k)
+      elif k[0] == root:
+        toRemove.add(k)
+
+    for k in toRemove:
+      quarantine.orphans.del k
+      quarantine.unviable.add(k[0], ())
+
+    toRemove.setLen(0)
+
+  quarantine.unviable.add(root, ())
+
+func cleanupOrphans(quarantine: var Quarantine, finalizedSlot: Slot) =
+  var toDel: seq[(Eth2Digest, ValidatorSig)]
+
+  for k, v in quarantine.orphans.pairs():
+    if not isViableOrphan(finalizedSlot, v):
+      toDel.add k
+
+  for k in toDel:
+    quarantine.addUnviable k[0]
 
 func clearQuarantine*(quarantine: var Quarantine) =
-  quarantine.orphans.clear()
-  quarantine.missing.clear()
+  quarantine = Quarantine()
 
 # Typically, blocks will arrive in mostly topological order, with some
 # out-of-order block pairs. Therefore, it is unhelpful to use either a
 # FIFO or LIFO discipline, and since by definition each block gets used
 # either 0 or 1 times it's not a cache either. Instead, stop accepting
-# new blocks, and rely on syncing to cache up again if necessary. When
-# using forward sync, blocks only arrive in an order not requiring the
-# quarantine.
+# new blocks, and rely on syncing to cache up again if necessary.
 #
 # For typical use cases, this need not be large, as they're two or three
 # blocks arriving out of order due to variable network delays. As blocks
 # for future slots are rejected before reaching quarantine, this usually
 # will be a block for the last couple of slots for which the parent is a
 # likely imminent arrival.
-
-# Since we start forward sync when about one epoch is missing, that's as
-# good a number as any.
-const MAX_QUARANTINE_ORPHANS = SLOTS_PER_EPOCH
-
-func add*(quarantine: var Quarantine, dag: ChainDAGRef,
-          signedBlock: ForkedSignedBeaconBlock): bool =
+func addOrphan*(
+    quarantine: var Quarantine, finalizedSlot: Slot,
+    signedBlock: ForkedSignedBeaconBlock): bool =
   ## Adds block to quarantine's `orphans` and `missing` lists.
-  if not isViableOrphan(dag, signedBlock):
+  if not isViableOrphan(finalizedSlot, signedBlock):
+    quarantine.addUnviable(signedBlock.root)
     return false
 
-  quarantine.removeOldBlocks(dag)
+  quarantine.cleanupOrphans(finalizedSlot)
+
+  let parent_root = getForkedBlockField(signedBlock, parent_root)
+
+  if parent_root in quarantine.unviable:
+    quarantine.unviable.add(signedBlock.root, ())
+    return true
 
   # Even if the quarantine is full, we need to schedule its parent for
   # downloading or we'll never get to the bottom of things
-  withBlck(signedBlock): quarantine.addMissing(blck.message.parent_root)
+  quarantine.addMissing(parent_root)
 
-  if quarantine.orphans.lenu64 >= MAX_QUARANTINE_ORPHANS:
+  if quarantine.orphans.lenu64 >= MaxOrphans:
     return false
 
   quarantine.orphans[(signedBlock.root, signedBlock.signature)] =
@@ -164,7 +223,7 @@ iterator pop*(quarantine: var Quarantine, root: Eth2Digest):
     for k in toRemove:
       quarantine.orphans.del k
 
-  for k, v in quarantine.orphans:
+  for k, v in quarantine.orphans.mpairs():
     if getForkedBlockField(v, parent_root) == root:
       toRemove.add(k)
       yield v
diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim
index 35eeb5420..fd3181b63 100644
--- a/beacon_chain/gossip_processing/block_processor.nim
+++ b/beacon_chain/gossip_processing/block_processor.nim
@@ -63,13 +63,13 @@ type
     # Producers
     # ----------------------------------------------------------------
-    blockQueue*: AsyncQueue[BlockEntry] # Exported for "test_sync_manager"
+    blockQueue: AsyncQueue[BlockEntry]
 
     # Consumer
     # ----------------------------------------------------------------
     consensusManager: ref ConsensusManager
-    validatorMonitor: ref ValidatorMonitor
       ## Blockchain DAG, AttestationPool and Quarantine
+    validatorMonitor: ref ValidatorMonitor
     getBeaconTime: GetBeaconTimeFn
 
     verifier: BatchVerifier
@@ -101,40 +101,6 @@ proc new*(T: type BlockProcessor,
 proc hasBlocks*(self: BlockProcessor): bool =
   self.blockQueue.len() > 0
 
-# Enqueue
-# ------------------------------------------------------------------------------
-
-proc addBlock*(
-    self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
-    resfut: Future[Result[void, BlockError]] = nil,
-    validationDur = Duration()) =
-  ## Enqueue a Gossip-validated block for consensus verification
-  # Backpressure:
-  #   There is no backpressure here - producers must wait for `resfut` to
-  #   constrain their own processing
-  # Producers:
-  # - Gossip (when synced)
-  # - SyncManager (during sync)
-  # - RequestManager (missing ancestor blocks)
-
-  withBlck(blck):
-    if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
-      # let backfill blocks skip the queue - these are always "fast" to process
-      # because there are no state rewinds to deal with
-      let res = self.consensusManager.dag.addBackfillBlock(blck)
-      if resFut != nil:
-        resFut.complete(res)
-      return
-
-  try:
-    self.blockQueue.addLastNoWait(BlockEntry(
-      blck: blck,
-      resfut: resfut, queueTick: Moment.now(),
-      validationDur: validationDur,
-      src: src))
-  except AsyncQueueFullError:
-    raiseAssert "unbounded queue"
-
 # Storage
 # ------------------------------------------------------------------------------
 
@@ -143,7 +109,7 @@ proc dumpInvalidBlock*(
   if self.dumpEnabled:
     dump(self.dumpDirInvalid, signedBlock)
 
-proc dumpBlock*[T](
+proc dumpBlock[T](
     self: BlockProcessor, signedBlock: ForkySignedBeaconBlock,
     res: Result[T, BlockError]) =
@@ -156,6 +122,32 @@ proc dumpBlock*[T](
   else:
     discard
 
+proc storeBackfillBlock(
+    self: var BlockProcessor,
+    signedBlock: ForkySignedBeaconBlock): Result[void, BlockError] =
+
+  # The block is certainly not missing any more
+  self.consensusManager.quarantine[].missing.del(signedBlock.root)
+
+  let res = self.consensusManager.dag.addBackfillBlock(signedBlock)
+
+  if res.isErr():
+    case res.error
+    of BlockError.MissingParent:
+      if signedBlock.message.parent_root in
+          self.consensusManager.quarantine[].unviable:
+        # DAG doesn't know about unviable ancestor blocks - we do! Translate
+        # this to the appropriate error so that sync etc doesn't retry the block
+        self.consensusManager.quarantine[].addUnviable(signedBlock.root)
+
+        return err(BlockError.UnviableFork)
+    of BlockError.UnviableFork:
+      # Track unviables so that descendants can be discarded properly
+      self.consensusManager.quarantine[].addUnviable(signedBlock.root)
+    else: discard
+
+  res
+
 proc storeBlock*(
     self: var BlockProcessor, src: MsgSource, wallTime: BeaconTime,
@@ -213,13 +205,26 @@ proc storeBlock*(
   # However this block was before the last finalized epoch and so its parent
   # was pruned from the ForkChoice.
   if blck.isErr():
-    if blck.error() == BlockError.MissingParent:
-      if not self.consensusManager.quarantine[].add(
-          dag, ForkedSignedBeaconBlock.init(signedBlock)):
+    case blck.error()
+    of BlockError.MissingParent:
+      if signedBlock.message.parent_root in
+          self.consensusManager.quarantine[].unviable:
+        # DAG doesn't know about unviable ancestor blocks - we do! Translate
+        # this to the appropriate error so that sync etc doesn't retry the block
+        self.consensusManager.quarantine[].addUnviable(signedBlock.root)
+
+        return err(BlockError.UnviableFork)
+
+      if not self.consensusManager.quarantine[].addOrphan(
+          dag.finalizedHead.slot, ForkedSignedBeaconBlock.init(signedBlock)):
         debug "Block quarantine full",
           blockRoot = shortLog(signedBlock.root),
           blck = shortLog(signedBlock.message),
           signature = shortLog(signedBlock.signature)
+    of BlockError.UnviableFork:
+      # Track unviables so that descendants can be discarded properly
+      self.consensusManager.quarantine[].addUnviable(signedBlock.root)
+    else: discard
 
     return blck
 
@@ -247,6 +252,41 @@ proc storeBlock*(
 
   blck
 
+# Enqueue
+# ------------------------------------------------------------------------------
+
+proc addBlock*(
+    self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
+    resfut: Future[Result[void, BlockError]] = nil,
+    validationDur = Duration()) =
+  ## Enqueue a Gossip-validated block for consensus verification
+  # Backpressure:
+  #   There is no backpressure here - producers must wait for `resfut` to
+  #   constrain their own processing
+  # Producers:
+  # - Gossip (when synced)
+  # - SyncManager (during sync)
+  # - RequestManager (missing ancestor blocks)
+
+  withBlck(blck):
+    if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
+      # let backfill blocks skip the queue - these are always "fast" to process
+      # because there are no state rewinds to deal with
+      let res = self.storeBackfillBlock(blck)
+
+      if resFut != nil:
+        resFut.complete(res)
+      return
+
+  try:
+    self.blockQueue.addLastNoWait(BlockEntry(
+      blck: blck,
+      resfut: resfut, queueTick: Moment.now(),
+      validationDur: validationDur,
+      src: src))
+  except AsyncQueueFullError:
+    raiseAssert "unbounded queue"
+
 # Event Loop
 # ------------------------------------------------------------------------------
diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim
index c34866bce..ac91a36b1 100644
--- a/beacon_chain/gossip_processing/gossip_validation.nim
+++ b/beacon_chain/gossip_processing/gossip_validation.nim
@@ -228,6 +228,7 @@ template validateBeaconBlockBellatrix(
       compute_timestamp_at_slot(state.data, signed_beacon_block.message.slot)
   if not (signed_beacon_block.message.body.execution_payload.timestamp ==
       timestampAtSlot):
+    quarantine[].addUnviable(signed_beacon_block.root)
     return errReject("BeaconBlock: Mismatched execution payload timestamp")
 
 # https://github.com/ethereum/consensus-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_block
@@ -306,9 +307,15 @@ proc validateBeaconBlock*(
   # And implicitly:
   # [REJECT] The block's parent (defined by block.parent_root) passes validation.
   let parent = dag.getBlockRef(signed_beacon_block.message.parent_root).valueOr:
+    if signed_beacon_block.message.parent_root in quarantine[].unviable:
+      quarantine[].addUnviable(signed_beacon_block.root)
+      return errReject("BeaconBlock: parent from unviable fork")
+
     # When the parent is missing, we can't validate the block - we'll queue it
     # in the quarantine for later processing
-    if not quarantine[].add(dag, ForkedSignedBeaconBlock.init(signed_beacon_block)):
+    if not quarantine[].addOrphan(
+        dag.finalizedHead.slot,
+        ForkedSignedBeaconBlock.init(signed_beacon_block)):
       debug "Block quarantine full"
 
     return errIgnore("BeaconBlock: Parent not found")
@@ -328,6 +335,8 @@ proc validateBeaconBlock*(
     return errIgnore("BeaconBlock: Can't find ancestor")
 
   if not (finalized_checkpoint.root in [ancestor.root, Eth2Digest()]):
+    quarantine[].addUnviable(signed_beacon_block.root)
+
     return errReject("BeaconBlock: Finalized checkpoint not an ancestor")
 
   # [REJECT] The block is proposed by the expected proposer_index for the
@@ -344,6 +353,8 @@ proc validateBeaconBlock*(
     return errIgnore("BeaconBlock: Cannot compute proposer") # internal issue
 
   if uint64(proposer.get()) != signed_beacon_block.message.proposer_index:
+    quarantine[].addUnviable(signed_beacon_block.root)
+
     return errReject("BeaconBlock: Unexpected proposer proposer")
 
   # [REJECT] The proposer signature, signed_beacon_block.signature, is valid
@@ -355,6 +366,8 @@ proc validateBeaconBlock*(
       signed_beacon_block.root,
       dag.validatorKey(proposer.get()).get(),
       signed_beacon_block.signature):
+    quarantine[].addUnviable(signed_beacon_block.root)
+
     return errReject("BeaconBlock: Invalid proposer signature")
 
   validateBeaconBlockBellatrix(signed_beacon_block, parent)
diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim
index 24986ea69..d728580f8 100644
--- a/beacon_chain/networking/eth2_network.nim
+++ b/beacon_chain/networking/eth2_network.nim
@@ -30,14 +30,15 @@ import
   ../spec/datatypes/[phase0, altair, bellatrix],
   ../spec/[eth2_ssz_serialization, network, helpers, forks],
   ../validators/keystore_management,
-  ./eth2_discovery, ./peer_pool, ./libp2p_json_serialization
+  "."/[eth2_discovery, libp2p_json_serialization, peer_pool, peer_scores]
 
 when chronicles.enabledLogLevel == LogLevel.TRACE:
   import std/sequtils
 
 export
-  tables, version, multiaddress, peer_pool, peerinfo, p2pProtocol, connection,
-  libp2p_json_serialization, eth2_ssz_serialization, results, eth2_discovery
+  tables, version, multiaddress, peerinfo, p2pProtocol, connection,
+  libp2p_json_serialization, eth2_ssz_serialization, results, eth2_discovery,
+  peer_pool, peer_scores
 
 logScope:
   topics = "networking"
@@ -216,15 +217,6 @@ const
   clientId* = "Nimbus beacon node " & fullVersionStr
   nodeMetadataFilename = "node-metadata.json"
 
-  NewPeerScore = 200
-    ## Score which will be assigned to new connected Peer
-  PeerScoreLowLimit = 0
-    ## Score after which peer will be kicked
-  PeerScoreHighLimit = 1000
-    ## Max value of peer's score
-  PeerScoreInvalidRequest = -500
-    ## This peer is sending malformed or nonsensical data
-
   ConcurrentConnections = 20
     ## Maximum number of active concurrent connection requests.
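Editor's note (not part of the patch): the gossip-validation and quarantine changes above combine so that a root marked unviable is never scheduled for download again. The minimal Nim sketch below illustrates that behaviour using only the API introduced in this diff; the import paths assume the `tests/` directory layout used elsewhere in this patch, and the digest value is made up.

```nim
# Illustrative sketch only - exercises addUnviable/addMissing/checkMissing
# from block_quarantine.nim as introduced in this diff.
import
  ../beacon_chain/spec/forks,
  ../beacon_chain/consensus_object_pools/block_quarantine

var
  quarantine = Quarantine.init()
  root: Eth2Digest
root.data[0] = 1'u8 # arbitrary, made-up root for the example

quarantine.addUnviable(root)
quarantine.addMissing(root) # returns early: the root is known to be unviable

doAssert FetchRecord(root: root) notin quarantine.checkMissing()
```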
diff --git a/beacon_chain/sync/peer_scores.nim b/beacon_chain/networking/peer_scores.nim
similarity index 62%
rename from beacon_chain/sync/peer_scores.nim
rename to beacon_chain/networking/peer_scores.nim
index 57bd8f380..a356bb74d 100644
--- a/beacon_chain/sync/peer_scores.nim
+++ b/beacon_chain/networking/peer_scores.nim
@@ -8,6 +8,15 @@
 {.push raises: [Defect].}
 
 const
+  NewPeerScore* = 300
+    ## Score which will be assigned to a newly connected peer
+  PeerScoreLowLimit* = 0
+    ## Score after which peer will be kicked
+  PeerScoreHighLimit* = 1000
+    ## Max value of peer's score
+  PeerScoreInvalidRequest* = -500
+    ## This peer is sending malformed or nonsensical data
+
   PeerScoreHeadTooNew* = -100
     ## The peer reports a head newer than our wall clock slot
   PeerScoreNoStatus* = -100
@@ -26,5 +35,11 @@ const
     ## Peer's response contains incorrect blocks.
   PeerScoreBadResponse* = -1000
     ## Peer's response is not in requested range.
-  PeerScoreMissingBlocks* = -200
-    ## Peer response contains too many empty blocks.
+  PeerScoreMissingBlocks* = -25
+    ## Peer response contains too many empty blocks - this can happen either
+    ## because a long reorg happened or the peer is falsely trying to convince
+    ## us that a long reorg happened.
+    ## Peer's `blocksByRange` answer is fine.
+  PeerScoreUnviableFork* = -200
+    ## Peer responded with blocks from an unviable fork - are they on a
+    ## different chain?
diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim
index f600d5e23..176b29f57 100644
--- a/beacon_chain/sync/request_manager.nim
+++ b/beacon_chain/sync/request_manager.nim
@@ -7,12 +7,13 @@
 
 {.push raises: [Defect].}
 
-import options, sequtils, strutils
+import std/[sequtils, strutils]
 import chronos, chronicles
 import
-  ../spec/datatypes/[phase0, altair],
+  ../spec/datatypes/[phase0],
   ../spec/forks,
   ../networking/eth2_network,
+  ../consensus_object_pools/block_quarantine,
   "."/sync_protocol, "."/sync_manager
 
 export sync_manager
@@ -82,37 +83,49 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
   if blocks.isOk:
     let ublocks = blocks.get()
     if checkResponse(items, ublocks):
-      var res: Result[void, BlockError]
-      if len(ublocks) > 0:
-        for b in ublocks:
-          res = await rman.blockVerifier(b)
-          if res.isErr():
-            case res.error()
-            of BlockError.MissingParent:
-              # Ignoring because the order of the blocks that
-              # we requested may be different from the order in which we need
-              # these blocks to apply.
-              discard
-            of BlockError.Duplicate, BlockError.UnviableFork:
-              # Ignoring because these errors could occur due to the
-              # concurrent/parallel requests we made.
-              discard
-            of BlockError.Invalid:
-              # We stop processing blocks further to avoid DoS attack with big
-              # chunk of incorrect blocks.
-              break
-      else:
-        res = Result[void, BlockError].ok()
+      var
+        gotGoodBlock = false
+        gotUnviableBlock = false
+
+      for b in ublocks:
+        let ver = await rman.blockVerifier(b)
+        if ver.isErr():
+          case ver.error()
+          of BlockError.MissingParent:
+            # Ignoring because the order of the blocks that
+            # we requested may be different from the order in which we need
+            # these blocks to apply.
+            discard
+          of BlockError.Duplicate:
+            # Ignoring because these errors could occur due to the
+            # concurrent/parallel requests we made.
+            discard
+          of BlockError.UnviableFork:
+            # If they're working a different fork, we'll want to descore them
+            # but also process the other blocks (in case we can register the
+            # other blocks as unviable)
+            gotUnviableBlock = true
+          of BlockError.Invalid:
+            # We stop processing blocks because peer is either sending us
+            # junk or working a different fork
+            warn "Received invalid block",
+              peer = peer, blocks = shortLog(items),
+              peer_score = peer.getScore()
+            peer.updateScore(PeerScoreBadBlocks)
+
+            return # Stop processing this junk...
+        else:
+          gotGoodBlock = true
+
+      if gotUnviableBlock:
+        notice "Received blocks from an unviable fork",
+          peer = peer, blocks = shortLog(items),
+          peer_score = peer.getScore()
+        peer.updateScore(PeerScoreUnviableFork)
+      elif gotGoodBlock:
+        # We reward peer only if it returns something.
+        peer.updateScore(PeerScoreGoodBlocks)
 
-      if res.isOk():
-        if len(ublocks) > 0:
-          # We reward peer only if it returns something.
-          peer.updateScore(PeerScoreGoodBlocks)
-      else:
-        # We are not penalizing other errors because of the reasons described
-        # above.
-        if res.error == BlockError.Invalid:
-          peer.updateScore(PeerScoreBadBlocks)
     else:
       peer.updateScore(PeerScoreBadResponse)
   else:
diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim
index c9b2da848..59719a9fd 100644
--- a/beacon_chain/sync/sync_manager.nim
+++ b/beacon_chain/sync/sync_manager.nim
@@ -13,9 +13,9 @@ import
   ../spec/datatypes/[phase0, altair],
   ../spec/eth2_apis/rpc_types,
   ../spec/[helpers, forks],
-  ../networking/[peer_pool, eth2_network],
+  ../networking/[peer_pool, peer_scores, eth2_network],
   ../beacon_clock,
-  ./peer_scores, ./sync_queue
+  ./sync_queue
 
 export phase0, altair, merge, chronos, chronicles, results, helpers,
   peer_scores, sync_queue, forks
diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim
index 7bf1233ce..4efca24bd 100644
--- a/beacon_chain/sync/sync_queue.nim
+++ b/beacon_chain/sync/sync_queue.nim
@@ -15,11 +15,10 @@ import
   ../spec/[helpers, forks],
   ../networking/[peer_pool, eth2_network],
   ../gossip_processing/block_processor,
-  ../consensus_object_pools/block_pools_types,
-  ./peer_scores
+  ../consensus_object_pools/block_pools_types
 
 export base, phase0, altair, merge, chronos, chronicles, results,
-  block_pools_types, helpers, peer_scores
+  block_pools_types, helpers
 
 logScope:
   topics = "syncqueue"
@@ -63,7 +62,6 @@ type
     chunkSize*: uint64
     queueSize*: int
     counter*: uint64
-    opcounter*: uint64
     pending*: Table[uint64, SyncRequest[T]]
     waiters: seq[SyncWaiter]
     getSafeSlot*: GetSlotCallback
@@ -570,33 +568,59 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
   if processingCb != nil:
     processingCb()
 
-  template isOkResponse(res: auto): bool =
-    res.isOk() or res.error in {BlockError.Duplicate, BlockError.UnviableFork}
-
   # Validating received blocks one by one
-  var res: Result[void, BlockError]
-  var failSlot: Option[Slot]
-  if len(item.data) > 0:
-    for blk in sq.blocks(item):
-      trace "Pushing block", block_root = blk.root,
-                             block_slot = blk.slot
-      res = await sq.blockVerifier(blk)
-      if not res.isOkResponse():
-        failSlot = some(blk.slot)
+  var
+    hasOkBlock = false
+    hasInvalidBlock = false
+    unviableBlock: Option[(Eth2Digest, Slot)]
+    missingParentSlot: Option[Slot]
+
+    # compiler segfault if this is moved into the for loop, at time of writing
+    res: Result[void, BlockError]
+
+  for blk in sq.blocks(item):
+    res = await sq.blockVerifier(blk)
+    if res.isOk():
+      hasOkBlock = true
+    else:
+      case res.error()
+      of BlockError.MissingParent:
+        missingParentSlot = some(blk.slot)
         break
-  else:
-    res = Result[void, BlockError].ok()
+      of BlockError.Duplicate:
+        # Keep going, happens naturally
+        discard
+      of BlockError.UnviableFork:
+        # Keep going so as to register other unviable blocks with the
+        # quarantine
+        if unviableBlock.isNone:
+          # Remember the first unviable block, so we can log it
+          unviableBlock = some((blk.root, blk.slot))
 
-  # Increase progress counter, so watch task will be able to know that we are
-  # not stuck.
-  inc(sq.opcounter)
+      of BlockError.Invalid:
+        hasInvalidBlock = true
 
-  if res.isOkResponse():
+        let req = item.request
+        warn "Received invalid sequence of blocks", peer = req.item,
+              request_slot = req.slot, request_count = req.count,
+              request_step = req.step, blocks_count = len(item.data),
+              blocks_map = getShortMap(req, item.data),
+              direction = req.kind, topics = "syncman"
+        req.item.updateScore(PeerScoreBadBlocks)
+
+        break
+
+  # When errors happen while processing blocks, we retry the same request
+  # with, hopefully, a different peer
+  let retryRequest =
+    hasInvalidBlock or unviableBlock.isSome() or missingParentSlot.isSome()
+  if not retryRequest:
     sq.advanceOutput(item.request.count)
-    if len(item.data) > 0:
+
+    if hasOkBlock:
       # If there is no error and response was not empty we should reward peer
-      # with some bonus score.
+      # with some bonus score - not for duplicate blocks though.
       item.request.item.updateScore(PeerScoreGoodBlocks)
+
+    sq.wakeupWaiters()
   else:
     debug "Block pool rejected peer's response", peer = item.request.item,
@@ -604,13 +628,31 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
           request_count = item.request.count,
           request_step = item.request.step,
           blocks_map = getShortMap(item.request, item.data),
-          blocks_count = len(item.data), errCode = res.error,
+          blocks_count = len(item.data),
+          ok = hasOkBlock,
+          unviable = unviableBlock.isSome(),
+          missing_parent = missingParentSlot.isSome(),
           direction = item.request.kind, topics = "syncman"
 
-    var resetSlot: Option[Slot]
+    # We need to move failed response to the debts queue.
+    sq.toDebtsQueue(item.request)
+
+    if unviableBlock.isSome:
+      let req = item.request
+      notice "Received blocks from an unviable fork",
+        blockRoot = unviableBlock.get()[0],
+        blockSlot = unviableBlock.get()[1], peer = req.item,
+        request_slot = req.slot, request_count = req.count,
+        request_step = req.step, blocks_count = len(item.data),
+        blocks_map = getShortMap(req, item.data),
+        direction = req.kind, topics = "syncman"
+      req.item.updateScore(PeerScoreUnviableFork)
+
+    if missingParentSlot.isSome:
+      var
+        resetSlot: Option[Slot]
+        failSlot = missingParentSlot.get()
 
-    case res.error
-    of BlockError.MissingParent:
       # If we got `BlockError.MissingParent` it means that peer returns chain
       # of blocks with holes or `block_pool` is in incomplete state. We're going
       # to rewind to the first slot at latest finalized epoch.
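Editor's note (not part of the patch): as a rough sense-check of the retry and scoring policy wired up above, the arithmetic below uses the constants from `peer_scores.nim` in this diff and assumes plain additive scoring with no other adjustments from the peer pool.

```nim
# Back-of-the-envelope sketch only - constants copied from peer_scores.nim;
# real scoring goes through updateScore and the peer pool.
const
  NewPeerScore = 300
  PeerScoreLowLimit = 0
  PeerScoreUnviableFork = -200
  PeerScoreMissingBlocks = -25

# Two unviable-fork batches push a fresh peer below the kick threshold...
doAssert NewPeerScore + 2 * PeerScoreUnviableFork < PeerScoreLowLimit
# ...while a dozen missing-blocks penalties alone do not.
doAssert NewPeerScore + 12 * PeerScoreMissingBlocks >= PeerScoreLowLimit
```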
@@ -620,11 +662,11 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
       case sq.kind
       of SyncQueueKind.Forward:
         if safeSlot < req.slot:
-          let rewindSlot = sq.getRewindPoint(failSlot.get(), safeSlot)
+          let rewindSlot = sq.getRewindPoint(failSlot, safeSlot)
           warn "Unexpected missing parent, rewind happens",
                peer = req.item, rewind_to_slot = rewindSlot,
                rewind_epoch_count = sq.rewind.get().epochCount,
-               rewind_fail_slot = failSlot.get(),
+               rewind_fail_slot = failSlot,
                finalized_slot = safeSlot,
                request_slot = req.slot, request_count = req.count,
                request_step = req.step, blocks_count = len(item.data),
@@ -642,11 +684,11 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
           req.item.updateScore(PeerScoreBadBlocks)
       of SyncQueueKind.Backward:
         if safeSlot > req.slot:
-          let rewindSlot = sq.getRewindPoint(failSlot.get(), safeSlot)
+          let rewindSlot = sq.getRewindPoint(failSlot, safeSlot)
           # It's quite common peers give us fewer blocks than we ask for
           info "Gap in block range response, rewinding",
                peer = req.item, rewind_to_slot = rewindSlot,
-               rewind_fail_slot = failSlot.get(),
+               rewind_fail_slot = failSlot,
                finalized_slot = safeSlot,
                request_slot = req.slot, request_count = req.count,
                request_step = req.step, blocks_count = len(item.data),
@@ -662,32 +704,21 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
                 blocks_map = getShortMap(req, item.data),
                 direction = req.kind, topics = "syncman"
           req.item.updateScore(PeerScoreBadBlocks)
-    of BlockError.Invalid:
-      let req = item.request
-      warn "Received invalid sequence of blocks", peer = req.item,
-           request_slot = req.slot, request_count = req.count,
-           request_step = req.step, blocks_count = len(item.data),
-           blocks_map = getShortMap(req, item.data),
-           direction = req.kind, topics = "syncman"
-      req.item.updateScore(PeerScoreBadBlocks)
-    of BlockError.Duplicate, BlockError.UnviableFork:
-      raiseAssert "Handled above"
-
-    # We need to move failed response to the debts queue.
-    sq.toDebtsQueue(item.request)
-    if resetSlot.isSome():
-      await sq.resetWait(resetSlot)
-      case sq.kind
-      of SyncQueueKind.Forward:
-        debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
-              queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
-              rewind_epoch_count = sq.rewind.get().epochCount,
-              rewind_fail_slot = sq.rewind.get().failSlot,
-              reset_slot = resetSlot, direction = sq.kind, topics = "syncman"
-      of SyncQueueKind.Backward:
-        debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
-              queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
-              reset_slot = resetSlot, direction = sq.kind, topics = "syncman"
+      if resetSlot.isSome():
+        await sq.resetWait(resetSlot)
+        case sq.kind
+        of SyncQueueKind.Forward:
+          debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
+                queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
+                rewind_epoch_count = sq.rewind.get().epochCount,
+                rewind_fail_slot = sq.rewind.get().failSlot,
+                reset_slot = resetSlot, direction = sq.kind, topics = "syncman"
+        of SyncQueueKind.Backward:
+          debug "Rewind to slot was happened", reset_slot = reset_slot.get(),
+                queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
+                reset_slot = resetSlot, direction = sq.kind, topics = "syncman"
+      break
 
 proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
diff --git a/tests/all_tests.nim b/tests/all_tests.nim
index e4b0a1be7..1bb547be3 100644
--- a/tests/all_tests.nim
+++ b/tests/all_tests.nim
@@ -18,6 +18,7 @@ import # Unit test
   ./test_beacon_time,
   ./test_block_dag,
   ./test_block_processor,
+  ./test_block_quarantine,
   ./test_datatypes,
   ./test_discovery,
   ./test_eth1_monitor,
diff --git a/tests/test_block_quarantine.nim b/tests/test_block_quarantine.nim
new file mode 100644
index 000000000..5fa00614b
--- /dev/null
+++ b/tests/test_block_quarantine.nim
@@ -0,0 +1,59 @@
+# beacon_chain
+# Copyright (c) 2022 Status Research & Development GmbH
+# Licensed and distributed under either of
+#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
+#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed except according to those terms.
+
+{.used.}
+
+import
+  chronicles,
+  unittest2,
+  ../beacon_chain/spec/forks,
+  ../beacon_chain/spec/datatypes/phase0,
+  ../beacon_chain/consensus_object_pools/block_quarantine
+
+proc makeBlock(slot: Slot, parent: Eth2Digest): ForkedSignedBeaconBlock =
+  var
+    b = phase0.SignedBeaconBlock(
+      message: phase0.BeaconBlock(slot: slot, parent_root: parent))
+  b.root = hash_tree_root(b.message)
+  ForkedSignedBeaconBlock.init(b)
+
+suite "Block quarantine":
+  test "Unviable smoke test":
+    let
+      b0 = makeBlock(Slot 0, Eth2Digest())
+      b1 = makeBlock(Slot 1, b0.root)
+      b2 = makeBlock(Slot 2, b1.root)
+      b3 = makeBlock(Slot 3, b2.root)
+      b4 = makeBlock(Slot 4, b2.root)
+
+    var quarantine: Quarantine
+
+    quarantine.addMissing(b1.root)
+    check:
+      FetchRecord(root: b1.root) in quarantine.checkMissing()
+
+      quarantine.addOrphan(Slot 0, b1)
+
+      FetchRecord(root: b1.root) notin quarantine.checkMissing()
+
+      quarantine.addOrphan(Slot 0, b2)
+      quarantine.addOrphan(Slot 0, b3)
+      quarantine.addOrphan(Slot 0, b4)
+
+      (b4.root, ValidatorSig()) in quarantine.orphans
+
+    quarantine.addUnviable(b4.root)
+
+    check:
+      (b4.root, ValidatorSig()) notin quarantine.orphans
+
+    quarantine.addUnviable(b1.root)
+
+    check:
+      (b1.root, ValidatorSig()) notin quarantine.orphans
+      (b2.root, ValidatorSig()) notin quarantine.orphans
+      (b3.root, ValidatorSig()) notin quarantine.orphans
diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim
index f4537a81b..4357e6d31 100644
--- a/tests/test_sync_manager.nim
+++ b/tests/test_sync_manager.nim
@@ -552,6 +552,52 @@ suite "SyncManager test suite":
     check waitFor(runTest()) == true
 
+  test "Process all unviable blocks":
+    let
+      aq = newAsyncQueue[BlockEntry]()
+      startSlot = Slot(0)
+      chunkSize = SLOTS_PER_EPOCH
+      numberOfChunks = 1'u64
+      finishSlot = Slot(startSlot + numberOfChunks * chunkSize - 1'u64)
+      queueSize = 1
+
+    var counter = int(startSlot)
+
+    proc forwardValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
+      while true:
+        let sblock = await aq.popFirst()
+        withBlck(sblock.blck):
+          sblock.fail(BlockError.UnviableFork)
+          inc(counter)
+
+    var
+      chain = createChain(startSlot, finishSlot)
+      queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
+                             startSlot, finishSlot, chunkSize,
+                             getFirstSlotAtFinalizedEpoch, collector(aq),
+                             queueSize)
+      validatorFut = forwardValidator(aq)
+
+    let
+      p1 = SomeTPeer()
+
+    proc runTest(): Future[bool] {.async.} =
+      var r11 = queue.pop(finishSlot, p1)
+
+      # Push a single request that will fail with all blocks being unviable
+      var f11 = queue.push(r11, chain.getSlice(startSlot, r11))
+      discard await f11.withTimeout(100.milliseconds)
+
+      check:
+        f11.finished == true
+        counter == int(startSlot + chunkSize) # should process all unviable blocks
+        debtLen(queue) == chunkSize # The range must be retried
+
+      await validatorFut.cancelAndWait()
+      return true
+
+    check waitFor(runTest()) == true
+
   test "[SyncQueue#Backward] Async unordered push with rewind test":
     let
       aq = newAsyncQueue[BlockEntry]()
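Editor's note (not part of the patch): the tests above exercise the error-handling pattern that `block_processor.nim` and `request_manager.nim` now share. The sketch below condenses that pattern into a single helper; it is illustrative only - the `registerVerdict` name is made up and the import paths assume the `tests/` layout.

```nim
import
  ../beacon_chain/spec/forks,
  ../beacon_chain/consensus_object_pools/[block_pools_types, block_quarantine]

proc registerVerdict(
    quarantine: var Quarantine, finalizedSlot: Slot,
    blck: ForkedSignedBeaconBlock, error: BlockError) =
  ## Hypothetical helper mirroring how storeBlock/storeBackfillBlock react
  ## to verification errors in this diff.
  case error
  of BlockError.MissingParent:
    if getForkedBlockField(blck, parent_root) in quarantine.unviable:
      # The parent is already known to be unviable, so this block is too
      quarantine.addUnviable(blck.root)
    else:
      # May return false when the quarantine is full; sync re-requests later
      discard quarantine.addOrphan(finalizedSlot, blck)
  of BlockError.UnviableFork:
    # Track unviables so that descendants can be discarded properly
    quarantine.addUnviable(blck.root)
  else:
    discard
```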