diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim
index e48bcfe7c..34f23eb00 100644
--- a/beacon_chain/beacon_node.nim
+++ b/beacon_chain/beacon_node.nim
@@ -25,7 +25,7 @@ import
     attestation_pool, sync_committee_msg_pool, validator_change_pool],
   ./spec/datatypes/[base, altair],
   ./spec/eth2_apis/dynamic_fee_recipients,
-  ./sync/[sync_manager, request_manager],
+  ./sync/[branch_discovery, sync_manager, request_manager],
   ./validators/[
     action_tracker, message_router, validator_monitor, validator_pool,
     keystore_management],
@@ -35,7 +35,7 @@ export
   osproc, chronos, presto, action_tracker, beacon_clock, beacon_chain_db,
   conf, light_client, attestation_pool, sync_committee_msg_pool,
   validator_change_pool,
-  eth2_network, el_manager, request_manager, sync_manager,
+  eth2_network, el_manager, branch_discovery, request_manager, sync_manager,
   eth2_processor, optimistic_processor, blockchain_dag, block_quarantine,
   base, message_router, validator_monitor, validator_pool,
   consensus_manager, dynamic_fee_recipients
@@ -85,6 +85,7 @@ type
     requestManager*: RequestManager
     syncManager*: SyncManager[Peer, PeerId]
     backfiller*: SyncManager[Peer, PeerId]
+    branchDiscovery*: ref BranchDiscovery
    genesisSnapshotContent*: string
    processor*: ref Eth2Processor
    blockProcessor*: ref BlockProcessor
diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim
index a2d8a1d30..177e56779 100644
--- a/beacon_chain/conf.nim
+++ b/beacon_chain/conf.nim
@@ -584,6 +584,12 @@ type
       defaultValue: true
       name: "doppelganger-detection" .}: bool
 
+    splitViewsMerge* {.
+      hidden
+      desc: "Whether to attempt to discover unknown branches when the network has partitioned into split views"
+      defaultValue: false
+      name: "debug-split-views-merge" .}: bool
+
     syncHorizon* {.
       hidden
       desc: "Number of empty slots to process before considering the client out of sync. Defaults to the number of slots in 10 minutes"
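The `debug-split-views-merge` flag above follows the hidden-option confutils pattern used throughout `conf.nim`. As a point of reference, here is a minimal standalone sketch of how such a flag is declared and parsed (hypothetical `DemoConf` type and `demo` binary, not part of this patch):

    import confutils

    type DemoConf = object
      splitViewsMerge {.
        desc: "Whether to attempt to discover unknown branches"
        defaultValue: false
        name: "debug-split-views-merge" .}: bool

    when isMainModule:
      # e.g. `demo --debug-split-views-merge=true`
      let conf = DemoConf.load()
      echo conf.splitViewsMerge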
diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim
index 41fde76ed..7b04acc56 100644
--- a/beacon_chain/nimbus_beacon_node.nim
+++ b/beacon_chain/nimbus_beacon_node.nim
@@ -367,6 +367,9 @@ proc initFullNode(
     func getFrontfillSlot(): Slot =
       max(dag.frontfill.get(BlockId()).slot, dag.horizon)
 
+    func isBlockKnown(blockRoot: Eth2Digest): bool =
+      dag.getBlockRef(blockRoot).isSome
+
     let
       quarantine = newClone(
        Quarantine.init())
@@ -398,6 +401,13 @@ proc initFullNode(
       # that should probably be reimagined more holistically in the future.
       blockProcessor[].addBlock(
         MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
+    branchDiscoveryBlockVerifier = proc(
+        signedBlock: ForkedSignedBeaconBlock,
+        blobs: Opt[BlobSidecars]
+    ): Future[Result[void, VerifierError]] {.async: (raises: [
+        CancelledError], raw: true).} =
+      blockProcessor[].addBlock(
+        MsgSource.gossip, signedBlock, blobs, maybeFinalized = false)
     rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
         maybeFinalized: bool):
         Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
@@ -448,6 +458,9 @@ proc initFullNode(
       getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
       getFrontfillSlot, dag.backfill.slot, blockVerifier,
       maxHeadAge = 0)
+    branchDiscovery = BranchDiscovery.new(
+      node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown,
+      branchDiscoveryBlockVerifier)
     router = (ref MessageRouter)(
       processor: processor,
       network: node.network)
@@ -490,6 +503,7 @@ proc initFullNode(
   node.requestManager = requestManager
   node.syncManager = syncManager
   node.backfiller = backfiller
+  node.branchDiscovery = branchDiscovery
   node.router = router
 
   await node.addValidators()
@@ -1604,6 +1618,10 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
 
   await node.updateGossipStatus(slot + 1)
 
+  # Branch discovery module is only used to support ongoing sync manager tasks
+  if not node.syncManager.inProgress:
+    await node.branchDiscovery.stop()
+
 func formatNextConsensusFork(
     node: BeaconNode, withVanityArt = false): Opt[string] =
   let consensusFork =
@@ -1623,6 +1641,14 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
   let optimisticHead = not node.dag.head.executionValid
   if node.syncManager.inProgress:
     let
+      degradedSuffix =
+        case node.branchDiscovery.state
+        of BranchDiscoveryState.Active:
+          "/discovering"
+        of BranchDiscoveryState.Suspended:
+          "/degraded"
+        of BranchDiscoveryState.Stopped:
+          ""
       optimisticSuffix =
         if optimisticHead:
           "/opt"
@@ -1645,7 +1671,7 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
           formatFloat(progress, ffDecimal, precision = 2) & "%"
         else:
           ""
-    node.syncManager.syncStatus & optimisticSuffix &
+    node.syncManager.syncStatus & degradedSuffix & optimisticSuffix &
       lightClientSuffix & catchingUpSuffix
   elif node.backfiller.inProgress:
     "backfill: " & node.backfiller.syncStatus
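Note the `raw: true` mode used by `branchDiscoveryBlockVerifier` in the hunks above: in chronos, a raw async proc must not `await`; its body has to produce the returned `Future` itself, which lets the closure hand back `addBlock`'s future without an extra scheduling hop. A minimal sketch of the pattern (hypothetical `produce`/`forward` procs, not from this patch):

    import chronos

    proc produce(x: int): Future[int] {.async: (raises: [CancelledError]).} =
      return x * 2

    # A raw async proc cannot use `await`; the body itself must evaluate
    # to the Future that the caller receives.
    proc forward(x: int): Future[int] {.async: (
        raises: [CancelledError], raw: true).} =
      produce(x)

    when isMainModule:
      echo waitFor forward(21)  # prints 42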
diff --git a/beacon_chain/sync/branch_discovery.nim b/beacon_chain/sync/branch_discovery.nim
new file mode 100644
index 000000000..4a6545bf4
--- /dev/null
+++ b/beacon_chain/sync/branch_discovery.nim
@@ -0,0 +1,286 @@
+# beacon_chain
+# Copyright (c) 2024 Status Research & Development GmbH
+# Licensed and distributed under either of
+#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
+#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed except according to those terms.
+
+{.push raises: [].}
+
+# This module is started when a chain stall is detected, i.e.,
+# a long period without any chain progress, while at the same time being
+# connected to a healthy number of different peers.
+#
+# In such a scenario, the network may partition into multiple groups of peers
+# that build on separate branches. It is vital to specifically target peers
+# from different partitions and download branches which are not necessarily
+# popularly served. Branches may be unpopular because they are expensive to
+# apply, and may have a difficult time propagating despite a large weight in
+# attestations backing them. This problem is exacerbated the longer the
+# split view scenario persists.
+#
+# While sync manager can sync popular chains well, it cannot reliably sync
+# minority chains only served by a limited number of peers. This module
+# augments sync manager in the split view scenario.
+# Note that request manager is not running while sync manager is running.
+#
+# Note that the canonical chain may not be at the highest slot number,
+# as some partitions of the network may have built on top of branches
+# with lower validator support while the canonical chain was not visible.
+
+import
+  std/algorithm,
+  chronos, chronicles, metrics, results,
+  ../spec/[forks, network],
+  ../consensus_object_pools/block_pools_types,
+  ../networking/[eth2_network, peer_pool],
+  ./sync_protocol
+
+logScope:
+  topics = "branchdiscovery"
+
+declareGauge beacon_sync_branchdiscovery_state,
+  "Branch discovery module operating state"
+
+declareCounter beacon_sync_branchdiscovery_discovered_blocks,
+  "Number of beacon blocks discovered by the branch discovery module"
+
+type
+  BranchDiscoveryState* {.pure.} = enum
+    Stopped,
+    Suspended,
+    Active
+
+  GetSlotCallback* =
+    proc(): Slot {.gcsafe, raises: [].}
+
+  IsBlockKnownCallback* =
+    proc(blockRoot: Eth2Digest): bool {.gcsafe, raises: [].}
+
+  BlockVerifierCallback* = proc(
+    signedBlock: ForkedSignedBeaconBlock,
+    blobs: Opt[BlobSidecars]
+  ): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
+
+  BranchDiscovery* = object
+    network: Eth2Node
+    getFinalizedSlot: GetSlotCallback
+    isBlockKnown: IsBlockKnownCallback
+    blockVerifier: BlockVerifierCallback
+    isActive: AsyncEvent
+    loopFuture: Future[void].Raising([])
+
+proc new*(
+    T: type BranchDiscovery,
+    network: Eth2Node,
+    getFinalizedSlot: GetSlotCallback,
+    isBlockKnown: IsBlockKnownCallback,
+    blockVerifier: BlockVerifierCallback): ref BranchDiscovery =
+  let self = (ref BranchDiscovery)(
+    network: network,
+    getFinalizedSlot: getFinalizedSlot,
+    isBlockKnown: isBlockKnown,
+    blockVerifier: blockVerifier,
+    isActive: newAsyncEvent())
+  self[].isActive.fire()
+  self
+
+proc discoverBranch(
+    self: BranchDiscovery, peer: Peer) {.async: (raises: [CancelledError]).} =
+  logScope:
+    peer
+    peer_score = peer.getScore()
+
+  let
+    finalizedSlot = self.getFinalizedSlot()
+    peerHeadSlot = peer.getHeadSlot()
+  if peerHeadSlot <= finalizedSlot:
+    peer.updateScore(PeerScoreUseless)
+    debug "Peer's head slot is already finalized", peerHeadSlot, finalizedSlot
+    return
+
+  var blockRoot = peer.getHeadRoot()
+  logScope: blockRoot
+  if self.isBlockKnown(blockRoot):
+    peer.updateScore(PeerScoreUseless)
+    debug "Peer's head block root is already known"
+    return
+
+  const
+    maxRequestsPerBurst = 20
+    burstDuration = chronos.seconds(40)
+  let bucket = TokenBucket.new(maxRequestsPerBurst, burstDuration)
+
+  var parentSlot = peerHeadSlot + 1
+  logScope: parentSlot
+  while true:
+    if self.isBlockKnown(blockRoot):
+      debug "Branch from peer no longer unknown"
+      return
+    if peer.getScore() < PeerScoreLowLimit:
+      debug "Failed to discover new branch from peer"
+      return
+
+    debug "Discovering new branch from peer"
+    try:
+      await bucket.consume(1)
+    except CancelledError as exc:
+      raise exc
+    except CatchableError as exc:
+      raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
+    let rsp = await peer.beaconBlocksByRoot_v2(BlockRootsList @[blockRoot])
+    if rsp.isErr:
+      # `eth2_network` already descored the peer according to the specific error
+      debug "Failed to receive block", err = rsp.error
+      await sleepAsync(RESP_TIMEOUT_DUR)
+      continue
+    template blocks: untyped = rsp.get
+
+    # The peer was the one providing us with this block root, so it should exist
+    if blocks.len == 0:
+      peer.updateScore(PeerScoreNoValues)
+      debug "Received no blocks", numBlocks = blocks.len
+      await sleepAsync(RESP_TIMEOUT_DUR)
+      continue
+    if blocks.len > 1:
+      peer.updateScore(PeerScoreBadResponse)
+      debug "Received too many blocks", numBlocks = blocks.len
+      return
+    template blck: untyped = blocks[0][]
+    if blck.slot >= parentSlot:
+      peer.updateScore(PeerScoreBadResponse)
+      debug "Received block that is not older than its child", receivedSlot = blck.slot
+      return
+    if blck.root != blockRoot:
+      peer.updateScore(PeerScoreBadResponse)
+      debug "Received incorrect block", receivedRoot = blck.root
+      return
+
+    var blobIds: seq[BlobIdentifier]
+    withBlck(blck):
+      when consensusFork >= ConsensusFork.Deneb:
+        for i in 0 ..< forkyBlck.message.body.blob_kzg_commitments.len:
+          blobIds.add BlobIdentifier(block_root: blockRoot, index: i.BlobIndex)
+    var blobs: Opt[BlobSidecars]
+    if blobIds.len > 0:
+      while true:
+        if self.isBlockKnown(blockRoot):
+          debug "Branch from peer no longer unknown"
+          return
+        if peer.getScore() < PeerScoreLowLimit:
+          debug "Failed to discover new branch from peer"
+          return
+
+        try:
+          await bucket.consume(1)
+        except CancelledError as exc:
+          raise exc
+        except CatchableError as exc:
+          raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
+        let r = await peer.blobSidecarsByRoot(BlobIdentifierList blobIds)
+        if r.isErr:
+          # `eth2_network` already descored the peer according to the specific error
+          debug "Failed to receive blobs", err = r.error
+          await sleepAsync(RESP_TIMEOUT_DUR)
+          continue
+        template blobSidecars: untyped = r.unsafeGet
+
+        if blobSidecars.len < blobIds.len:
+          peer.updateScore(PeerScoreMissingValues)
+          debug "Received fewer blobs than expected",
+            numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len
+          await sleepAsync(RESP_TIMEOUT_DUR)
+          continue
+        if blobSidecars.len > blobIds.len:
+          peer.updateScore(PeerScoreBadResponse)
+          debug "Received too many blobs",
+            numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len
+          return
+        for i, blobSidecar in blobSidecars:
+          let root = hash_tree_root(blobSidecar[].signed_block_header.message)
+          if root != blockRoot:
+            peer.updateScore(PeerScoreBadResponse)
+            debug "Received unexpected blob"
+            return
+          blobSidecar[].verify_blob_sidecar_inclusion_proof().isOkOr:
+            peer.updateScore(PeerScoreBadResponse)
+            debug "Received invalid blob"
+            return
+        blobs = Opt.some distinctBase(blobSidecars).sortedByIt(it.index)
+        for i, blobSidecar in blobs.get:
+          if blobSidecar[].index != i.BlobIndex:
+            peer.updateScore(PeerScoreBadResponse)
+            debug "Received duplicate blobs while others are missing"
+            return
+        break
+
+    let err = (await self.blockVerifier(blck, blobs)).errorOr:
+      peer.updateScore(PeerScoreGoodBatchValue + PeerScoreGoodValues)
+      beacon_sync_branchdiscovery_discovered_blocks.inc()
+      info "Discovered new branch from peer"
+      break
+    case err
+    of VerifierError.Invalid:
+      peer.updateScore(PeerScoreBadResponse)
+      debug "Received invalid block"
+      return
+    of VerifierError.UnviableFork:
+      peer.updateScore(PeerScoreUnviableFork)
+      debug "Received unviable block"
+      return
+    of VerifierError.Duplicate:
+      peer.updateScore(PeerScoreGoodValues)
+      debug "Connected new branch from peer"
+      break
+    of VerifierError.MissingParent:
+      peer.updateScore(PeerScoreGoodBatchValue)
+      parentSlot = blck.slot
+      blockRoot = blck.getForkedBlockField(parent_root)
+      continue
+
+proc loop(self: ref BranchDiscovery) {.async: (raises: []).} =
+  try:
+    while true:
+      await self[].isActive.wait()
+      await sleepAsync(RESP_TIMEOUT_DUR)
+
+      let peer =
+        try:
+          self[].network.peerPool.acquireNoWait()
+        except PeerPoolError as exc:
+          debug "Failed to acquire peer", exc = exc.msg
+          continue
+      defer: self[].network.peerPool.release(peer)
+
+      await self[].discoverBranch(peer)
+  except CancelledError:
+    return
+
+func state*(self: ref BranchDiscovery): BranchDiscoveryState =
+  if self[].loopFuture == nil:
+    BranchDiscoveryState.Stopped
+  elif not self[].isActive.isSet:
+    BranchDiscoveryState.Suspended
+  else:
+    BranchDiscoveryState.Active
+
+proc start*(self: ref BranchDiscovery) =
+  doAssert self[].loopFuture == nil
+  info "Starting discovery of new branches"
+  self[].loopFuture = self.loop()
+  beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
+
+proc stop*(self: ref BranchDiscovery) {.async: (raises: []).} =
+  if self[].loopFuture != nil:
+    info "Stopping discovery of new branches"
+    await self[].loopFuture.cancelAndWait()
+    self[].loopFuture = nil
+    beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
+
+proc suspend*(self: ref BranchDiscovery) =
+  self[].isActive.clear()
+  beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
+
+proc resume*(self: ref BranchDiscovery) =
+  self[].isActive.fire()
+  beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
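At its core, `discoverBranch` above is a rate-limited backwards walk: at most 20 by-root requests per 40-second token-bucket burst, following `parent_root` links from the peer's head until an already-known block connects the branch (or scoring and fetch failures abort it). A simplified, self-contained sketch of that walk (hypothetical `Block`, `fetchBlock`, and `isKnown` stand-ins; blobs, scoring, and retries elided):

    import std/options

    type Block = object
      root, parentRoot: string
      slot: uint64

    proc walkBranch(
        headRoot: string,
        fetchBlock: proc(root: string): Option[Block],
        isKnown: proc(root: string): bool): seq[Block] =
      ## Collect unknown ancestors, newest first, stopping at the first
      ## locally known block (branch connected) or at a fetch failure.
      var root = headRoot
      while not isKnown(root):
        let blck = fetchBlock(root)
        if blck.isNone:
          break                    # peer stopped serving this branch
        result.add blck.get
        root = blck.get.parentRoot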
diff --git a/beacon_chain/validators/beacon_validators.nim b/beacon_chain/validators/beacon_validators.nim
index a393b671b..109cea6d1 100644
--- a/beacon_chain/validators/beacon_validators.nim
+++ b/beacon_chain/validators/beacon_validators.nim
@@ -250,9 +250,10 @@ proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus =
   if not wallSlot.afterGenesis or
       head.slot + node.config.syncHorizon >= wallSlot.slot:
     node.dag.resetChainProgressWatchdog()
+    node.branchDiscovery.suspend()
     return ChainSyncStatus.Synced
 
-  if not node.config.proposeStale:
+  if not node.config.proposeStale and not node.config.splitViewsMerge:
     # Continue syncing and wait for someone else to propose the next block
     return ChainSyncStatus.Syncing
 
@@ -263,12 +264,24 @@ proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus =
     # We may have poor connectivity, wait until more peers are available.
     # This could also be intermittent, as state replays while chain is degraded
     # may take significant amounts of time, during which many peers are lost
+    node.branchDiscovery.suspend()
     return ChainSyncStatus.Syncing
 
   if node.dag.chainIsProgressing():
     # Chain is progressing, we are out of sync
+    node.branchDiscovery.resume()
     return ChainSyncStatus.Syncing
 
+  # Network connectivity is good, but we have trouble making sync progress.
+  # Turn on branch discovery module until we have a recent canonical head.
+  # The branch discovery module specifically targets peers on alternate branches
+  # and supports sync manager in discovering branches that are not widely seen
+  # but that may still have weight from attestations.
+  if node.config.splitViewsMerge and
+      node.branchDiscovery.state == BranchDiscoveryState.Stopped:
+    node.branchDiscovery.start()
+  node.branchDiscovery.resume()
+
   let
     maxHeadSlot = node.dag.heads.foldl(max(a, b.slot), GENESIS_SLOT)
     numPeersWithHigherProgress = node.network.peerPool.peers
@@ -285,6 +298,8 @@ proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus =
 
   # We are on the latest slot among all of our peers, and there has been no
   # chain progress for an extended period of time.
+  if not node.config.proposeStale:
+    return ChainSyncStatus.Syncing
   if node.dag.incrementalState == nil:
     # The head state is too far in the past to timely perform validator duties
     return ChainSyncStatus.Degraded
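Taken together, the `syncStatus` changes drive the branch discovery lifecycle from chain health: a recent head or poor connectivity suspends it, ordinary sync progress keeps it resumable, and a genuine stall with good connectivity starts it, gated on `--debug-split-views-merge`. An illustrative decision sketch, with simplified predicates standing in for the real checks (hypothetical `ChainHealth` type, not from this patch):

    type
      ChainHealth = object
        headRecent: bool       # head within syncHorizon of the wall slot
        fewPeers: bool         # poor connectivity; wait for more peers
        progressing: bool      # dag.chainIsProgressing()
        splitViewsMerge: bool  # --debug-split-views-merge enabled

      Action = enum
        Suspend, Resume, StartAndResume

    func branchDiscoveryAction(h: ChainHealth): Action =
      if h.headRecent or h.fewPeers:
        Suspend                 # synced, or too few peers to be useful
      elif h.progressing:
        Resume                  # regular sync is making progress again
      elif h.splitViewsMerge:
        StartAndResume          # stall despite good connectivity
      else:
        Resume                  # module never started; resume is a no-op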