diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index e48bcfe7c..34f23eb00 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -25,7 +25,7 @@ import attestation_pool, sync_committee_msg_pool, validator_change_pool], ./spec/datatypes/[base, altair], ./spec/eth2_apis/dynamic_fee_recipients, - ./sync/[sync_manager, request_manager], + ./sync/[branch_discovery, sync_manager, request_manager], ./validators/[ action_tracker, message_router, validator_monitor, validator_pool, keystore_management], @@ -35,7 +35,7 @@ export osproc, chronos, presto, action_tracker, beacon_clock, beacon_chain_db, conf, light_client, attestation_pool, sync_committee_msg_pool, validator_change_pool, - eth2_network, el_manager, request_manager, sync_manager, + eth2_network, el_manager, branch_discovery, request_manager, sync_manager, eth2_processor, optimistic_processor, blockchain_dag, block_quarantine, base, message_router, validator_monitor, validator_pool, consensus_manager, dynamic_fee_recipients @@ -85,6 +85,7 @@ type requestManager*: RequestManager syncManager*: SyncManager[Peer, PeerId] backfiller*: SyncManager[Peer, PeerId] + branchDiscovery*: ref BranchDiscovery genesisSnapshotContent*: string processor*: ref Eth2Processor blockProcessor*: ref BlockProcessor diff --git a/beacon_chain/consensus_object_pools/block_clearance.nim b/beacon_chain/consensus_object_pools/block_clearance.nim index c50c7f400..89362ffd7 100644 --- a/beacon_chain/consensus_object_pools/block_clearance.nim +++ b/beacon_chain/consensus_object_pools/block_clearance.nim @@ -81,7 +81,6 @@ proc addResolvedHeadBlock( epochRef = dag.getEpochRef(state, cache) epochRefTick = Moment.now() - dag.resetChainProgressWatchdog() debug "Block resolved", blockRoot = shortLog(blockRoot), blck = shortLog(trustedBlock.message), diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 14d32e2b2..b1824a3f2 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -2412,6 +2412,7 @@ proc updateHead*( quit 1 dag.head = newHead + dag.resetChainProgressWatchdog() if dag.headState.is_merge_transition_complete() and not lastHeadMergeComplete and diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 0a3a8e1e5..92143f55f 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -55,7 +55,6 @@ const ## Number of slots from wall time that we start processing every payload type - BlobSidecars* = seq[ref BlobSidecar] BlockEntry = object blck*: ForkedSignedBeaconBlock blobs*: Opt[BlobSidecars] diff --git a/beacon_chain/networking/peer_protocol.nim b/beacon_chain/networking/peer_protocol.nim index 16348f621..ff47b17dd 100644 --- a/beacon_chain/networking/peer_protocol.nim +++ b/beacon_chain/networking/peer_protocol.nim @@ -219,6 +219,10 @@ proc updateStatus*(peer: Peer): Future[bool] {.async: (raises: [CancelledError]) await peer.handleStatus(nstate, theirStatus) +proc getHeadRoot*(peer: Peer): Eth2Digest = + ## Returns head root for specific peer ``peer``. + peer.state(PeerSync).statusMsg.headRoot + proc getHeadSlot*(peer: Peer): Slot = ## Returns head slot for specific peer ``peer``. peer.state(PeerSync).statusMsg.headSlot diff --git a/beacon_chain/networking/peer_scores.nim b/beacon_chain/networking/peer_scores.nim index 35fcaf965..c6f2c10bd 100644 --- a/beacon_chain/networking/peer_scores.nim +++ b/beacon_chain/networking/peer_scores.nim @@ -29,6 +29,8 @@ const ## Peer's `status` answer is fine. PeerScoreNoValues* = -100 ## Peer did not respond in time to a request. + PeerScoreGoodBatchValue* = 5 + ## Individual portion of peer's multi-step answer is fine. PeerScoreGoodValues* = 100 ## Peer's answer to our request is fine. PeerScoreBadValues* = -1000 diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 120bd8648..e6c6ad21b 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -367,6 +367,9 @@ proc initFullNode( func getFrontfillSlot(): Slot = max(dag.frontfill.get(BlockId()).slot, dag.horizon) + func isBlockKnown(blockRoot: Eth2Digest): bool = + dag.getBlockRef(blockRoot).isSome + let quarantine = newClone( Quarantine.init()) @@ -398,6 +401,13 @@ proc initFullNode( # that should probably be reimagined more holistically in the future. blockProcessor[].addBlock( MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) + branchDiscoveryBlockVerifier = proc( + signedBlock: ForkedSignedBeaconBlock, + blobs: Opt[BlobSidecars] + ): Future[Result[void, VerifierError]] {.async: (raises: [ + CancelledError], raw: true).} = + blockProcessor[].addBlock( + MsgSource.gossip, signedBlock, blobs, maybeFinalized = false) rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = @@ -448,6 +458,9 @@ proc initFullNode( getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, getFrontfillSlot, dag.backfill.slot, blockVerifier, maxHeadAge = 0) + branchDiscovery = BranchDiscovery.new( + node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown, + branchDiscoveryBlockVerifier) router = (ref MessageRouter)( processor: processor, network: node.network) @@ -490,6 +503,7 @@ proc initFullNode( node.requestManager = requestManager node.syncManager = syncManager node.backfiller = backfiller + node.branchDiscovery = branchDiscovery node.router = router await node.addValidators() @@ -1596,6 +1610,10 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = await node.updateGossipStatus(slot + 1) + # Branch discovery module is only used to support ongoing sync manager tasks + if not node.syncManager.inProgress: + await node.branchDiscovery.stop() + func formatNextConsensusFork( node: BeaconNode, withVanityArt = false): Opt[string] = let consensusFork = @@ -1615,6 +1633,14 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string = let optimisticHead = not node.dag.head.executionValid if node.syncManager.inProgress: let + degradedSuffix = + case node.branchDiscovery.state + of BranchDiscoveryState.Active: + "/discovering" + of BranchDiscoveryState.Suspended: + "/degraded" + of BranchDiscoveryState.Stopped: + "" optimisticSuffix = if optimisticHead: "/opt" @@ -1637,7 +1663,7 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string = formatFloat(progress, ffDecimal, precision = 2) & "%" else: "" - node.syncManager.syncStatus & optimisticSuffix & + node.syncManager.syncStatus & degradedSuffix & optimisticSuffix & lightClientSuffix & catchingUpSuffix elif node.backfiller.inProgress: "backfill: " & node.backfiller.syncStatus diff --git a/beacon_chain/spec/datatypes/deneb.nim b/beacon_chain/spec/datatypes/deneb.nim index 8c9b149d5..1aa0e248f 100644 --- a/beacon_chain/spec/datatypes/deneb.nim +++ b/beacon_chain/spec/datatypes/deneb.nim @@ -64,6 +64,7 @@ type signed_block_header*: SignedBeaconBlockHeader kzg_commitment_inclusion_proof*: array[KZG_COMMITMENT_INCLUSION_PROOF_DEPTH, Eth2Digest] + BlobSidecars* = seq[ref BlobSidecar] # https://github.com/ethereum/beacon-APIs/blob/4882aa0803b622b75bab286b285599d70b7a2429/apis/eventstream/index.yaml#L138-L142 # Spec object, not only internal, because it gets serialized out for the diff --git a/beacon_chain/sync/branch_discovery.nim b/beacon_chain/sync/branch_discovery.nim new file mode 100644 index 000000000..fd950f835 --- /dev/null +++ b/beacon_chain/sync/branch_discovery.nim @@ -0,0 +1,292 @@ +# beacon_chain +# Copyright (c) 2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +# This module is started when a chain stall is detected, i.e., +# a long period without any chain progress, while at the same time being +# connected to a healthy number of different peers. +# +# In such a scenario, the network may partition into multiple groups of peers +# that build on separate branches. It is vital to specifically target peers +# from different partitions and download branches which are not necessarily +# popularly served. Branches may be unpopular because they are expensive to +# apply, and may have a difficult time propagating despite a large weight in +# attestations backing them. This only exacerbates the longer the split view +# scenario is ongoing. +# +# While sync manager can sync popular chains well, it cannot reliably sync +# minority chains only served by a limited number of peers. This module +# augments sync manager in the split view scenario. +# Note that request manager is not running while sync manager is running. +# +# Once both sync manager and branch discovery stopped resolving new blocks, +# `syncStatus` will report `ChainSyncStatus.Degraded` and `blockchain_dag` state +# will be gradually advanced to the wall slot to prepare for block proposal. +# If at that time, no additional branches were discovered, validator duties +# will be performed based on local fork choice. +# +# Note that the canonical chain may not be on the highest slot number, +# as some partitions of the network may have built on top of branches +# with lower validator support while the canonical chain was not visible. + +import + std/algorithm, + chronos, chronicles, metrics, results, + ../spec/[forks, network], + ../consensus_object_pools/block_pools_types, + ../networking/[eth2_network, peer_pool], + ./sync_protocol + +logScope: + topics = "branchdiscovery" + +declareGauge beacon_sync_branchdiscovery_state, + "Branch discovery module operating state" + +declareCounter beacon_sync_branchdiscovery_discovered_blocks, + "Number of beacon blocks discovered by the branch discovery module" + +type + BranchDiscoveryState* {.pure.} = enum + Stopped, + Suspended, + Active + + GetSlotCallback* = + proc(): Slot {.gcsafe, raises: [].} + + IsBlockKnownCallback* = + proc(blockRoot: Eth2Digest): bool {.gcsafe, raises: [].} + + BlockVerifierCallback* = proc( + signedBlock: ForkedSignedBeaconBlock, + blobs: Opt[BlobSidecars] + ): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} + + BranchDiscovery* = object + network: Eth2Node + getFinalizedSlot: GetSlotCallback + isBlockKnown: IsBlockKnownCallback + blockVerifier: BlockVerifierCallback + isActive: AsyncEvent + loopFuture: Future[void].Raising([]) + +proc new*( + T: type BranchDiscovery, + network: Eth2Node, + getFinalizedSlot: GetSlotCallback, + isBlockKnown: IsBlockKnownCallback, + blockVerifier: BlockVerifierCallback): ref BranchDiscovery = + let self = (ref BranchDiscovery)( + network: network, + getFinalizedSlot: getFinalizedSlot, + isBlockKnown: isBlockKnown, + blockVerifier: blockVerifier, + isActive: newAsyncEvent()) + self[].isActive.fire() + self + +proc discoverBranch( + self: BranchDiscovery, peer: Peer) {.async: (raises: [CancelledError]).} = + logScope: + peer + peer_score = peer.getScore() + + let + finalizedSlot = self.getFinalizedSlot() + peerHeadSlot = peer.getHeadSlot() + if peerHeadSlot <= finalizedSlot: + peer.updateScore(PeerScoreUseless) + debug "Peer's head slot is already finalized", peerHeadSlot, finalizedSlot + return + + var blockRoot = peer.getHeadRoot() + logScope: blockRoot + if self.isBlockKnown(blockRoot): + peer.updateScore(PeerScoreUseless) + debug "Peer's head block root is already known" + return + + const + maxRequestsPerBurst = 20 + burstDuration = chronos.seconds(40) + let bucket = TokenBucket.new(maxRequestsPerBurst, burstDuration) + + var parentSlot = peerHeadSlot + 1 + logScope: parentSlot + while true: + if self.isBlockKnown(blockRoot): + debug "Branch from peer no longer unknown" + return + if peer.getScore() < PeerScoreLowLimit: + debug "Failed to discover new branch from peer" + return + + debug "Discovering new branch from peer" + try: + await bucket.consume(1) + except CancelledError as exc: + raise exc + except CatchableError as exc: + raiseAssert "TokenBucket.consume should not fail: " & $exc.msg + let rsp = await peer.beaconBlocksByRoot_v2(BlockRootsList @[blockRoot]) + if rsp.isErr: + # `eth2_network` already descored according to the specific error + debug "Failed to receive block", err = rsp.error + await sleepAsync(RESP_TIMEOUT_DUR) + continue + template blocks: untyped = rsp.get + + # The peer was the one providing us with this block root, it should exist + if blocks.len == 0: + peer.updateScore(PeerScoreNoValues) + debug "Received no blocks", numBlocks = blocks.len + await sleepAsync(RESP_TIMEOUT_DUR) + continue + if blocks.len > 1: + peer.updateScore(PeerScoreBadResponse) + debug "Received too many blocks", numBlocks = blocks.len + return + template blck: untyped = blocks[0][] + if blck.slot >= parentSlot: + peer.updateScore(PeerScoreBadResponse) + debug "Received block older than parent", receivedSlot = blck.slot + return + if blck.root != blockRoot: + peer.updateScore(PeerScoreBadResponse) + debug "Received incorrect block", receivedRoot = blck.root + return + + var blobIds: seq[BlobIdentifier] + withBlck(blck): + when consensusFork >= ConsensusFork.Deneb: + for i in 0 ..< forkyBlck.message.body.blob_kzg_commitments.len: + blobIds.add BlobIdentifier(block_root: blockRoot, index: i.BlobIndex) + var blobs: Opt[BlobSidecars] + if blobIds.len > 0: + while true: + if self.isBlockKnown(blockRoot): + debug "Branch from peer no longer unknown" + return + if peer.getScore() < PeerScoreLowLimit: + debug "Failed to discover new branch from peer" + return + + try: + await bucket.consume(1) + except CancelledError as exc: + raise exc + except CatchableError as exc: + raiseAssert "TokenBucket.consume should not fail: " & $exc.msg + let r = await peer.blobSidecarsByRoot(BlobIdentifierList blobIds) + if r.isErr: + # `eth2_network` already descored according to the specific error + debug "Failed to receive blobs", err = r.error + await sleepAsync(RESP_TIMEOUT_DUR) + continue + template blobSidecars: untyped = r.unsafeGet + + if blobSidecars.len < blobIds.len: + peer.updateScore(PeerScoreMissingValues) + debug "Received not all blobs", + numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len + await sleepAsync(RESP_TIMEOUT_DUR) + continue + if blobSidecars.len > blobIds.len: + peer.updateScore(PeerScoreBadResponse) + debug "Received too many blobs", + numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len + return + for i, blobSidecar in blobSidecars: + let root = hash_tree_root(blobSidecar[].signed_block_header.message) + if root != blockRoot: + peer.updateScore(PeerScoreBadResponse) + debug "Received unexpected blob" + return + blobSidecar[].verify_blob_sidecar_inclusion_proof().isOkOr: + peer.updateScore(PeerScoreBadResponse) + debug "Received invalid blob" + return + blobs = Opt.some distinctBase(blobSidecars).sortedByIt(it.index) + for i, blobSidecar in blobs.get: + if blobSidecar[].index != i.BlobIndex: + peer.updateScore(PeerScoreBadResponse) + debug "Received duplicate blobs while others are missing" + return + break + + let err = (await self.blockVerifier(blck, blobs)).errorOr: + peer.updateScore(PeerScoreGoodBatchValue + PeerScoreGoodValues) + beacon_sync_branchdiscovery_discovered_blocks.inc() + info "Discovered new branch from peer" + break + case err + of VerifierError.Invalid: + peer.updateScore(PeerScoreBadResponse) + debug "Received invalid block" + return + of VerifierError.UnviableFork: + peer.updateScore(PeerScoreUnviableFork) + debug "Received unviable block" + return + of VerifierError.Duplicate: + peer.updateScore(PeerScoreGoodValues) + debug "Connected new branch from peer" + break + of VerifierError.MissingParent: + peer.updateScore(PeerScoreGoodBatchValue) + parentSlot = blck.slot + blockRoot = blck.getForkedBlockField(parent_root) + continue + +proc loop(self: ref BranchDiscovery) {.async: (raises: []).} = + try: + while true: + await self[].isActive.wait() + await sleepAsync(RESP_TIMEOUT_DUR) + + let peer = + try: + self[].network.peerPool.acquireNoWait() + except PeerPoolError as exc: + debug "Failed to acquire peer", exc = exc.msg + continue + defer: self[].network.peerPool.release(peer) + + await self[].discoverBranch(peer) + except CancelledError: + return + +func state*(self: ref BranchDiscovery): BranchDiscoveryState = + if self[].loopFuture == nil: + BranchDiscoveryState.Stopped + elif not self[].isActive.isSet: + BranchDiscoveryState.Suspended + else: + BranchDiscoveryState.Active + +proc start*(self: ref BranchDiscovery) = + doAssert self[].loopFuture == nil + info "Starting discovery of new branches" + self[].loopFuture = self.loop() + beacon_sync_branchdiscovery_state.set(self.state.ord().int64) + +proc stop*(self: ref BranchDiscovery) {.async: (raises: []).} = + if self[].loopFuture != nil: + info "Stopping discovery of new branches" + await self[].loopFuture.cancelAndWait() + self[].loopFuture = nil + beacon_sync_branchdiscovery_state.set(self.state.ord().int64) + +proc suspend*(self: ref BranchDiscovery) = + self[].isActive.clear() + beacon_sync_branchdiscovery_state.set(self.state.ord().int64) + +proc resume*(self: ref BranchDiscovery) = + self[].isActive.fire() + beacon_sync_branchdiscovery_state.set(self.state.ord().int64) diff --git a/beacon_chain/validators/beacon_validators.nim b/beacon_chain/validators/beacon_validators.nim index 5bd28e938..268e84c60 100644 --- a/beacon_chain/validators/beacon_validators.nim +++ b/beacon_chain/validators/beacon_validators.nim @@ -250,19 +250,31 @@ proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus = if not wallSlot.afterGenesis or head.slot + node.config.syncHorizon >= wallSlot.slot: node.dag.resetChainProgressWatchdog() + node.branchDiscovery.suspend() return ChainSyncStatus.Synced - if node.dag.chainIsProgressing(): - # Chain is progressing, we are out of sync - return ChainSyncStatus.Syncing - let numPeers = len(node.network.peerPool) if numPeers <= node.config.maxPeers div 4: # We may have poor connectivity, wait until more peers are available. # This could also be intermittent, as state replays while chain is degraded # may take significant amounts of time, during which many peers are lost + node.branchDiscovery.suspend() return ChainSyncStatus.Syncing + if node.dag.chainIsProgressing(): + # Chain is progressing, we are out of sync + node.branchDiscovery.resume() + return ChainSyncStatus.Syncing + + # Network connectivity is good, but we have trouble making sync progress. + # Turn on branch discovery module until we have a recent canonical head. + # The branch discovery module specifically targets peers on alternate branches + # and supports sync manager in discovering branches that are not widely seen + # but that may still have weight from attestations. + if node.branchDiscovery.state == BranchDiscoveryState.Stopped: + node.branchDiscovery.start() + node.branchDiscovery.resume() + let maxHeadSlot = node.dag.heads.foldl(max(a, b.slot), GENESIS_SLOT) numPeersWithHigherProgress = node.network.peerPool.peers