From 08b87e2506d4f1e97caf3314dd0c20d92188cfcd Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Mon, 25 Mar 2024 21:48:06 +0100 Subject: [PATCH] add branch discovery module for use in split view scenarios When the network is partitioned for a long time, e.g., Goerli, branches start forming where different peers have distinct views about the chain state. The current syncing solution with sync manager doesn't handle the case well, as it is optimized for a healthy network where syncing can be parallelized across different peers. To support sync manager discovering additional branches, a new module is added that pulls in histories from peers on unknown branches in a backwards manner. --- beacon_chain/beacon_node.nim | 5 +- beacon_chain/conf.nim | 6 + .../block_pools_types.nim | 3 + .../consensus_object_pools/blockchain_dag.nim | 9 + beacon_chain/nimbus_beacon_node.nim | 29 +- beacon_chain/sync/branch_discovery.nim | 286 ++++++++++++++++++ beacon_chain/validators/beacon_validators.nim | 52 +++- 7 files changed, 385 insertions(+), 5 deletions(-) create mode 100644 beacon_chain/sync/branch_discovery.nim diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index e48bcfe7c..34f23eb00 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -25,7 +25,7 @@ import attestation_pool, sync_committee_msg_pool, validator_change_pool], ./spec/datatypes/[base, altair], ./spec/eth2_apis/dynamic_fee_recipients, - ./sync/[sync_manager, request_manager], + ./sync/[branch_discovery, sync_manager, request_manager], ./validators/[ action_tracker, message_router, validator_monitor, validator_pool, keystore_management], @@ -35,7 +35,7 @@ export osproc, chronos, presto, action_tracker, beacon_clock, beacon_chain_db, conf, light_client, attestation_pool, sync_committee_msg_pool, validator_change_pool, - eth2_network, el_manager, request_manager, sync_manager, + eth2_network, el_manager, branch_discovery, request_manager, sync_manager, eth2_processor, optimistic_processor, blockchain_dag, block_quarantine, base, message_router, validator_monitor, validator_pool, consensus_manager, dynamic_fee_recipients @@ -85,6 +85,7 @@ type requestManager*: RequestManager syncManager*: SyncManager[Peer, PeerId] backfiller*: SyncManager[Peer, PeerId] + branchDiscovery*: ref BranchDiscovery genesisSnapshotContent*: string processor*: ref Eth2Processor blockProcessor*: ref BlockProcessor diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 3aa3d0dff..70c238f8a 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -584,6 +584,12 @@ type defaultValue: true name: "doppelganger-detection" .}: bool + splitViewsMerge* {. + hidden + desc: "Whether or not to try and discover unknown branches in situations where the network has partitioned into split views" + defaultValue: false + name: "debug-split-views-merge" .}: bool + syncHorizon* {. hidden desc: "Number of empty slots to process before considering the client out of sync. 
Defaults to the number of slots in 10 minutes" diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index 11a3806a6..da429b65d 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -206,6 +206,9 @@ type cfg*: RuntimeConfig + lastChainProgress*: Moment + ## Indicates the last wall time at which meaningful progress was made + shufflingRefs*: LRUCache[16, ShufflingRef] epochRefs*: LRUCache[32, EpochRef] diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 56e49e81a..c8b3149da 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -1004,6 +1004,13 @@ proc applyBlock( ok() +proc resetChainProgressWatchdog*(dag: ChainDAGRef) = + dag.lastChainProgress = Moment.now() + +proc chainIsProgressing*(dag: ChainDAGRef): bool = + const watchdogDuration = chronos.minutes(60) + dag.lastChainProgress + watchdogDuration >= Moment.now() + proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags, eraPath = ".", @@ -1044,6 +1051,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, # allow skipping some validation. updateFlags: updateFlags * {strictVerification}, cfg: cfg, + lastChainProgress: Moment.now(), vanityLogs: vanityLogs, @@ -2387,6 +2395,7 @@ proc updateHead*( quit 1 dag.head = newHead + dag.resetChainProgressWatchdog() if dag.headState.is_merge_transition_complete() and not lastHeadMergeComplete and diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 4c9feaa95..04181ab83 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -367,6 +367,9 @@ proc initFullNode( func getFrontfillSlot(): Slot = max(dag.frontfill.get(BlockId()).slot, dag.horizon) + func isBlockKnown(blockRoot: Eth2Digest): bool = + dag.getBlockRef(blockRoot).isSome + let quarantine = newClone( Quarantine.init()) @@ -398,6 +401,13 @@ proc initFullNode( # that should probably be reimagined more holistically in the future. 
blockProcessor[].addBlock( MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) + branchDiscoveryBlockVerifier = proc( + signedBlock: ForkedSignedBeaconBlock, + blobs: Opt[BlobSidecars] + ): Future[Result[void, VerifierError]] {.async: (raises: [ + CancelledError], raw: true).} = + blockProcessor[].addBlock( + MsgSource.gossip, signedBlock, blobs, maybeFinalized = false) rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = @@ -448,6 +458,9 @@ proc initFullNode( getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, getFrontfillSlot, dag.backfill.slot, blockVerifier, maxHeadAge = 0) + branchDiscovery = BranchDiscovery.new( + node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown, + branchDiscoveryBlockVerifier) router = (ref MessageRouter)( processor: processor, network: node.network) @@ -490,6 +503,7 @@ proc initFullNode( node.requestManager = requestManager node.syncManager = syncManager node.backfiller = backfiller + node.branchDiscovery = branchDiscovery node.router = router await node.addValidators() @@ -1564,6 +1578,10 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = await node.updateGossipStatus(slot + 1) + # Branch discovery module is only used to support ongoing sync manager tasks + if not node.syncManager.inProgress: + await node.branchDiscovery.stop() + func formatNextConsensusFork( node: BeaconNode, withVanityArt = false): Opt[string] = let consensusFork = @@ -1583,6 +1601,14 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string = let optimisticHead = not node.dag.head.executionValid if node.syncManager.inProgress: let + degradedSuffix = + case node.branchDiscovery.state + of BranchDiscoveryState.Active: + "/discovering" + of BranchDiscoveryState.Suspended: + "/degraded" + of BranchDiscoveryState.Stopped: + "" optimisticSuffix = if optimisticHead: "/opt" @@ -1593,7 +1619,8 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string = " - lc: " & $shortLog(node.consensusManager[].optimisticHead) else: "" - node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix + node.syncManager.syncStatus & degradedSuffix & optimisticSuffix & + lightClientSuffix elif node.backfiller.inProgress: "backfill: " & node.backfiller.syncStatus elif optimistic_head: diff --git a/beacon_chain/sync/branch_discovery.nim b/beacon_chain/sync/branch_discovery.nim new file mode 100644 index 000000000..4a6545bf4 --- /dev/null +++ b/beacon_chain/sync/branch_discovery.nim @@ -0,0 +1,286 @@ +# beacon_chain +# Copyright (c) 2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +# This module is started when a chain stall is detected, i.e., +# a long period without any chain progress, while at the same time being +# connected to a healthy number of different peers. +# +# In such a scenario, the network may partition into multiple groups of peers +# that build on separate branches. It is vital to specifically target peers +# from different partitions and download branches which are not necessarily +# popularly served. 
Branches may be unpopular because they are expensive to +# apply, and may have a difficult time propagating despite a large weight in +# attestations backing them. This only exacerbates the longer the split view +# scenario is ongoing. +# +# While sync manager can sync popular chains well, it cannot reliably sync +# minority chains only served by a limited number of peers. This module +# augments sync manager in the split view scenario. +# Note that request manager is not running while sync manager is running. +# +# Note that the canonical chain may not be on the highest slot number, +# as some partitions of the network may have built on top of branches +# with lower validator support while the canonical chain was not visible. + +import + std/algorithm, + chronos, chronicles, metrics, results, + ../spec/[forks, network], + ../consensus_object_pools/block_pools_types, + ../networking/[eth2_network, peer_pool], + ./sync_protocol + +logScope: + topics = "branchdiscovery" + +declareGauge beacon_sync_branchdiscovery_state, + "Branch discovery module operating state" + +declareCounter beacon_sync_branchdiscovery_discovered_blocks, + "Number of beacon blocks discovered by the branch discovery module" + +type + BranchDiscoveryState* {.pure.} = enum + Stopped, + Suspended, + Active + + GetSlotCallback* = + proc(): Slot {.gcsafe, raises: [].} + + IsBlockKnownCallback* = + proc(blockRoot: Eth2Digest): bool {.gcsafe, raises: [].} + + BlockVerifierCallback* = proc( + signedBlock: ForkedSignedBeaconBlock, + blobs: Opt[BlobSidecars] + ): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} + + BranchDiscovery* = object + network: Eth2Node + getFinalizedSlot: GetSlotCallback + isBlockKnown: IsBlockKnownCallback + blockVerifier: BlockVerifierCallback + isActive: AsyncEvent + loopFuture: Future[void].Raising([]) + +proc new*( + T: type BranchDiscovery, + network: Eth2Node, + getFinalizedSlot: GetSlotCallback, + isBlockKnown: IsBlockKnownCallback, + blockVerifier: BlockVerifierCallback): ref BranchDiscovery = + let self = (ref BranchDiscovery)( + network: network, + getFinalizedSlot: getFinalizedSlot, + isBlockKnown: isBlockKnown, + blockVerifier: blockVerifier, + isActive: newAsyncEvent()) + self[].isActive.fire() + self + +proc discoverBranch( + self: BranchDiscovery, peer: Peer) {.async: (raises: [CancelledError]).} = + logScope: + peer + peer_score = peer.getScore() + + let + finalizedSlot = self.getFinalizedSlot() + peerHeadSlot = peer.getHeadSlot() + if peerHeadSlot <= finalizedSlot: + peer.updateScore(PeerScoreUseless) + debug "Peer's head slot is already finalized", peerHeadSlot, finalizedSlot + return + + var blockRoot = peer.getHeadRoot() + logScope: blockRoot + if self.isBlockKnown(blockRoot): + peer.updateScore(PeerScoreUseless) + debug "Peer's head block root is already known" + return + + const + maxRequestsPerBurst = 20 + burstDuration = chronos.seconds(40) + let bucket = TokenBucket.new(maxRequestsPerBurst, burstDuration) + + var parentSlot = peerHeadSlot + 1 + logScope: parentSlot + while true: + if self.isBlockKnown(blockRoot): + debug "Branch from peer no longer unknown" + return + if peer.getScore() < PeerScoreLowLimit: + debug "Failed to discover new branch from peer" + return + + debug "Discovering new branch from peer" + try: + await bucket.consume(1) + except CancelledError as exc: + raise exc + except CatchableError as exc: + raiseAssert "TokenBucket.consume should not fail: " & $exc.msg + let rsp = await peer.beaconBlocksByRoot_v2(BlockRootsList @[blockRoot]) + 
if rsp.isErr: + # `eth2_network` already descored according to the specific error + debug "Failed to receive block", err = rsp.error + await sleepAsync(RESP_TIMEOUT_DUR) + continue + template blocks: untyped = rsp.get + + # The peer was the one providing us with this block root, it should exist + if blocks.len == 0: + peer.updateScore(PeerScoreNoValues) + debug "Received no blocks", numBlocks = blocks.len + await sleepAsync(RESP_TIMEOUT_DUR) + continue + if blocks.len > 1: + peer.updateScore(PeerScoreBadResponse) + debug "Received too many blocks", numBlocks = blocks.len + return + template blck: untyped = blocks[0][] + if blck.slot >= parentSlot: + peer.updateScore(PeerScoreBadResponse) + debug "Received block older than parent", receivedSlot = blck.slot + return + if blck.root != blockRoot: + peer.updateScore(PeerScoreBadResponse) + debug "Received incorrect block", receivedRoot = blck.root + return + + var blobIds: seq[BlobIdentifier] + withBlck(blck): + when consensusFork >= ConsensusFork.Deneb: + for i in 0 ..< forkyBlck.message.body.blob_kzg_commitments.len: + blobIds.add BlobIdentifier(block_root: blockRoot, index: i.BlobIndex) + var blobs: Opt[BlobSidecars] + if blobIds.len > 0: + while true: + if self.isBlockKnown(blockRoot): + debug "Branch from peer no longer unknown" + return + if peer.getScore() < PeerScoreLowLimit: + debug "Failed to discover new branch from peer" + return + + try: + await bucket.consume(1) + except CancelledError as exc: + raise exc + except CatchableError as exc: + raiseAssert "TokenBucket.consume should not fail: " & $exc.msg + let r = await peer.blobSidecarsByRoot(BlobIdentifierList blobIds) + if r.isErr: + # `eth2_network` already descored according to the specific error + debug "Failed to receive blobs", err = r.error + await sleepAsync(RESP_TIMEOUT_DUR) + continue + template blobSidecars: untyped = r.unsafeGet + + if blobSidecars.len < blobIds.len: + peer.updateScore(PeerScoreMissingValues) + debug "Received not all blobs", + numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len + await sleepAsync(RESP_TIMEOUT_DUR) + continue + if blobSidecars.len > blobIds.len: + peer.updateScore(PeerScoreBadResponse) + debug "Received too many blobs", + numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len + return + for i, blobSidecar in blobSidecars: + let root = hash_tree_root(blobSidecar[].signed_block_header.message) + if root != blockRoot: + peer.updateScore(PeerScoreBadResponse) + debug "Received unexpected blob" + return + blobSidecar[].verify_blob_sidecar_inclusion_proof().isOkOr: + peer.updateScore(PeerScoreBadResponse) + debug "Received invalid blob" + return + blobs = Opt.some distinctBase(blobSidecars).sortedByIt(it.index) + for i, blobSidecar in blobs.get: + if blobSidecar[].index != i.BlobIndex: + peer.updateScore(PeerScoreBadResponse) + debug "Received duplicate blobs while others are missing" + return + break + + let err = (await self.blockVerifier(blck, blobs)).errorOr: + peer.updateScore(PeerScoreGoodBatchValue + PeerScoreGoodValues) + beacon_sync_branchdiscovery_discovered_blocks.inc() + info "Discovered new branch from peer" + break + case err + of VerifierError.Invalid: + peer.updateScore(PeerScoreBadResponse) + debug "Received invalid block" + return + of VerifierError.UnviableFork: + peer.updateScore(PeerScoreUnviableFork) + debug "Received unviable block" + return + of VerifierError.Duplicate: + peer.updateScore(PeerScoreGoodValues) + debug "Connected new branch from peer" + break + of VerifierError.MissingParent: + 
peer.updateScore(PeerScoreGoodBatchValue) + parentSlot = blck.slot + blockRoot = blck.getForkedBlockField(parent_root) + continue + +proc loop(self: ref BranchDiscovery) {.async: (raises: []).} = + try: + while true: + await self[].isActive.wait() + await sleepAsync(RESP_TIMEOUT_DUR) + + let peer = + try: + self[].network.peerPool.acquireNoWait() + except PeerPoolError as exc: + debug "Failed to acquire peer", exc = exc.msg + continue + defer: self[].network.peerPool.release(peer) + + await self[].discoverBranch(peer) + except CancelledError: + return + +func state*(self: ref BranchDiscovery): BranchDiscoveryState = + if self[].loopFuture == nil: + BranchDiscoveryState.Stopped + elif not self[].isActive.isSet: + BranchDiscoveryState.Suspended + else: + BranchDiscoveryState.Active + +proc start*(self: ref BranchDiscovery) = + doAssert self[].loopFuture == nil + info "Starting discovery of new branches" + self[].loopFuture = self.loop() + beacon_sync_branchdiscovery_state.set(self.state.ord().int64) + +proc stop*(self: ref BranchDiscovery) {.async: (raises: []).} = + if self[].loopFuture != nil: + info "Stopping discovery of new branches" + await self[].loopFuture.cancelAndWait() + self[].loopFuture = nil + beacon_sync_branchdiscovery_state.set(self.state.ord().int64) + +proc suspend*(self: ref BranchDiscovery) = + self[].isActive.clear() + beacon_sync_branchdiscovery_state.set(self.state.ord().int64) + +proc resume*(self: ref BranchDiscovery) = + self[].isActive.fire() + beacon_sync_branchdiscovery_state.set(self.state.ord().int64) diff --git a/beacon_chain/validators/beacon_validators.nim b/beacon_chain/validators/beacon_validators.nim index 076df7f27..7bce0f532 100644 --- a/beacon_chain/validators/beacon_validators.nim +++ b/beacon_chain/validators/beacon_validators.nim @@ -245,11 +245,59 @@ proc isSynced*(node: BeaconNode, head: BlockRef): bool = beaconTime = node.beaconClock.now() wallSlot = beaconTime.toSlot() + if not wallSlot.afterGenesis or + head.slot + node.config.syncHorizon >= wallSlot.slot: + node.dag.resetChainProgressWatchdog() + node.branchDiscovery.suspend() + return true + + if not node.config.splitViewsMerge: + # Continue syncing and wait for someone else to propose the next block + return false + + let + numPeers = len(node.network.peerPool) + minPeers = max(node.config.maxPeers div 4, SyncWorkersCount * 2) + if numPeers <= minPeers: + # We may have poor connectivity, wait until more peers are available. + # This could also be intermittent, as state replays while chain is degraded + # may take significant amounts of time, during which many peers are lost + node.branchDiscovery.suspend() + return false + + if node.dag.chainIsProgressing(): + # Chain is progressing, we are out of sync + node.branchDiscovery.resume() + return false + + # Network connectivity is good, but we have trouble making sync progress. + # Turn on branch discovery module until we have a recent canonical head. + # The branch discovery module specifically targets peers on alternate branches + # and supports sync manager in discovering branches that are not widely seen + # but that may still have weight from attestations. 
+ if node.config.splitViewsMerge and + node.branchDiscovery.state == BranchDiscoveryState.Stopped: + node.branchDiscovery.start() + node.branchDiscovery.resume() + + let + maxHeadSlot = node.dag.heads.foldl(max(a, b.slot), GENESIS_SLOT) + numPeersWithHigherProgress = node.network.peerPool.peers + .countIt(it != nil and it.getHeadSlot() > maxHeadSlot) + significantNumPeers = node.config.maxPeers div 8 + if numPeersWithHigherProgress > significantNumPeers: + # A peer indicates that they are on a later slot, wait for sync manager + # to progress, or for it to kick the peer if they are faking the status + warn "Chain appears to have stalled, but peers indicate higher progress", + numPeersWithHigherProgress, numPeers, maxPeers = node.config.maxPeers, + head, maxHeadSlot + node.dag.resetChainProgressWatchdog() + return false + # TODO if everyone follows this logic, the network will not recover from a # halt: nobody will be producing blocks because everone expects someone # else to do it - not wallSlot.afterGenesis or - head.slot + node.config.syncHorizon >= wallSlot.slot + false proc handleLightClientUpdates*(node: BeaconNode, slot: Slot) {.async: (raises: [CancelledError]).} =
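
A few self-contained sketches of the mechanisms introduced above follow, for readers who want the logic without the surrounding node plumbing. All type and proc names in these sketches are illustrative stand-ins, not the node's actual API.

The chain-progress watchdog added to `blockchain_dag.nim` is a plain timestamp check: `updateHead` stamps the last time a new head was adopted, and `chainIsProgressing` reports whether that stamp is younger than a fixed 60-minute window. A minimal sketch of the same idea, using std/monotimes in place of chronos' `Moment`:

```nim
import std/[monotimes, times]

type ChainWatchdog = object
  lastProgress: MonoTime   # last time the head advanced (cf. `lastChainProgress`)

proc resetProgress(w: var ChainWatchdog) =
  ## Called whenever a new head is adopted (cf. `resetChainProgressWatchdog`).
  w.lastProgress = getMonoTime()

proc isProgressing(w: ChainWatchdog, window = initDuration(minutes = 60)): bool =
  ## The chain counts as progressing while the last head update is within `window`.
  getMonoTime() - w.lastProgress <= window

when isMainModule:
  var w = ChainWatchdog(lastProgress: getMonoTime())
  w.resetProgress()
  doAssert w.isProgressing()
```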
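
The core of `branch_discovery.nim` is a backwards walk: take the peer's advertised head root, request that block by root, and as long as verification ends in `VerifierError.MissingParent`, repeat with the block's `parent_root` until a locally known block is reached (or the branch drops to the finalized slot and is abandoned). A toy sketch of that walk over an in-memory peer, with hypothetical `Root`/`ToyBlock` types standing in for the fork-aware beacon blocks:

```nim
import std/[tables, sets]

type
  Root = string
  ToyBlock = object
    root, parentRoot: Root
    slot: uint64

proc discoverBranch(
    peerChain: Table[Root, ToyBlock],  # blocks the peer can serve by root
    known: var HashSet[Root],          # roots we already have locally
    peerHeadRoot: Root,
    finalizedSlot: uint64): seq[ToyBlock] =
  ## Walk backwards from the peer's head until it connects to a known block.
  ## Returns the newly discovered segment, newest block first.
  var root = peerHeadRoot
  while root notin known:
    if root notin peerChain:
      return @[]                       # peer cannot serve the branch; give up
    let blck = peerChain[root]
    if blck.slot <= finalizedSlot:
      return @[]                       # branch conflicts with finality; useless
    result.add blck
    root = blck.parentRoot             # the `VerifierError.MissingParent` case
  for blck in result:
    known.incl blck.root               # the real module hands blocks to the verifier

when isMainModule:
  var known = ["genesis"].toHashSet
  let peerChain = {
    "b1": ToyBlock(root: "b1", parentRoot: "genesis", slot: 11),
    "b2": ToyBlock(root: "b2", parentRoot: "b1", slot: 12)}.toTable
  doAssert discoverBranch(peerChain, known, "b2", 0).len == 2
```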
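
Requests to a peer are rate-limited with a token bucket of 20 requests per 40-second burst, so the retry loop cannot hammer a slow or unresponsive peer. The real module awaits the chronos `TokenBucket.consume` before each ByRoot request; below is a tiny synchronous sketch of the same budget idea, which refills the whole burst once per window and therefore only approximates the chronos bucket's behaviour:

```nim
import std/[monotimes, times]

type RequestBudget = object
  capacity: int        # e.g. 20 requests ...
  window: Duration     # ... per 40-second burst
  tokens: int
  lastRefill: MonoTime

proc init(T: type RequestBudget, capacity: int, window: Duration): T =
  T(capacity: capacity, window: window, tokens: capacity, lastRefill: getMonoTime())

proc tryConsume(b: var RequestBudget): bool =
  ## Refill the full burst once `window` has elapsed, then spend one token if any remain.
  let now = getMonoTime()
  if now - b.lastRefill >= b.window:
    b.tokens = b.capacity
    b.lastRefill = now
  if b.tokens > 0:
    dec b.tokens
    true
  else:
    false

when isMainModule:
  var budget = RequestBudget.init(20, initDuration(seconds = 40))
  var sent = 0
  while budget.tryConsume():
    inc sent
  doAssert sent == 20   # further requests wait for the next burst window
```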
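
When a discovered block commits to blobs, the sidecars are fetched by root and the response is accepted only if every sidecar belongs to the requested block, carries a valid inclusion proof, and, after sorting by index, covers exactly the indices 0 through N-1 with no gaps or duplicates. A simplified sketch of the count/ownership/index checks (the KZG inclusion-proof verification is elided, and `BlobStub` is an illustrative type):

```nim
import std/algorithm

type BlobStub = object
  blockRoot: string
  index: int

proc blobsLookValid(blockRoot: string, expectedCount: int,
                    sidecars: seq[BlobStub]): bool =
  ## Mirrors the response checks in `discoverBranch`: right count, right block,
  ## and indices 0, 1, ..., expectedCount-1 once sorted.
  if sidecars.len != expectedCount:
    return false                       # too few -> retry later; too many -> bad response
  let ordered = sidecars.sortedByIt(it.index)
  for i, sc in ordered:
    if sc.blockRoot != blockRoot: return false   # blob belongs to a different block
    if sc.index != i: return false               # gap or duplicate index
  true

when isMainModule:
  let root = "0xabc"
  doAssert blobsLookValid(root, 2, @[
    BlobStub(blockRoot: root, index: 1), BlobStub(blockRoot: root, index: 0)])
  doAssert not blobsLookValid(root, 2, @[
    BlobStub(blockRoot: root, index: 0), BlobStub(blockRoot: root, index: 0)])
```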
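
`BranchDiscovery` has a three-state lifecycle: `Stopped` (no loop future), `Suspended` (the loop exists but its `AsyncEvent` gate is cleared, e.g. while peer connectivity is poor), and `Active` (gate fired, peers are being acquired and polled). A compact sketch of the state derivation and the `start`/`stop`/`suspend`/`resume` transitions, with plain booleans standing in for the future handle and the chronos event:

```nim
type
  BranchDiscoveryState = enum
    Stopped, Suspended, Active

  ToyBranchDiscovery = object
    loopRunning: bool   # stands in for `loopFuture != nil`
    gateSet: bool       # stands in for `isActive.isSet`

func state(d: ToyBranchDiscovery): BranchDiscoveryState =
  if not d.loopRunning: Stopped
  elif not d.gateSet: Suspended
  else: Active

proc start(d: var ToyBranchDiscovery) = d.loopRunning = true
proc stop(d: var ToyBranchDiscovery) = d.loopRunning = false
proc suspend(d: var ToyBranchDiscovery) = d.gateSet = false   # loop keeps waiting on the gate
proc resume(d: var ToyBranchDiscovery) = d.gateSet = true     # loop resumes polling peers

when isMainModule:
  var d = ToyBranchDiscovery(gateSet: true)   # as after `BranchDiscovery.new`
  doAssert d.state == Stopped
  d.start(); doAssert d.state == Active
  d.suspend(); doAssert d.state == Suspended
  d.resume(); doAssert d.state == Active
  d.stop(); doAssert d.state == Stopped
```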
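
The reworked `isSynced` layers several guards before branch discovery may run: the node reports synced while the head is within `syncHorizon` of the wall slot; otherwise discovery only kicks in when `--debug-split-views-merge` is set, connectivity is healthy, the chain watchdog reports a stall, and not too many peers advertise a head above every local branch. (In the actual code the module is started before the higher-progress check, and the node still reports itself as unsynced while discovery runs.) A condensed, purely functional sketch of that decision order, using a hypothetical `SplitViewAction` result in place of the mix of early returns and suspend/resume calls:

```nim
type SplitViewAction = enum
  Synced,            # head is recent; suspend branch discovery
  KeepSyncing,       # out of sync, but branch discovery should not run (yet)
  DiscoverBranches   # chain stalled with good connectivity; run branch discovery

func splitViewAction(
    headSlot, wallSlot, syncHorizon: uint64,
    splitViewsMerge: bool,
    numPeers, minPeers: int,
    chainIsProgressing: bool,
    peersWithHigherHead, significantPeers: int): SplitViewAction =
  if headSlot + syncHorizon >= wallSlot:
    Synced                            # recent head: nothing to do
  elif not splitViewsMerge:
    KeepSyncing                       # feature is opt-in via --debug-split-views-merge
  elif numPeers <= minPeers:
    KeepSyncing                       # poor connectivity; wait for more peers
  elif chainIsProgressing:
    KeepSyncing                       # sync manager is making progress on its own
  elif peersWithHigherHead > significantPeers:
    KeepSyncing                       # peers claim a later head; let sync manager chase it
  else:
    DiscoverBranches

when isMainModule:
  doAssert splitViewAction(100, 105, 10, false, 50, 16, false, 0, 8) == Synced
  doAssert splitViewAction(100, 200, 10, true, 50, 16, false, 0, 8) ==
    DiscoverBranches
```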