From 2dbe24c7404e8d770f1b8b119fa8919e724943ef Mon Sep 17 00:00:00 2001
From: Etan Kissling
Date: Mon, 25 Mar 2024 19:09:31 +0100
Subject: [PATCH] move split view catchup to research branch (#6133)

Using a dedicated branch for researching the effectiveness of split view
scenario handling simplifies testing and avoids having partial work on
`unstable`. If we want, we can reintroduce it under a `--debug` flag at a
later time. But for now, Goerli is a rare opportunity to test this, maybe
just for another week or so.

- https://github.com/status-im/infra-nimbus/pull/179
---
 beacon_chain/beacon_node.nim                  |   5 +-
 .../block_pools_types.nim                     |   7 -
 .../consensus_object_pools/blockchain_dag.nim |  33 +-
 beacon_chain/nimbus_beacon_node.nim           |  75 +----
 beacon_chain/sync/branch_discovery.nim        | 292 ------------------
 beacon_chain/validators/beacon_validators.nim |  90 +-----
 6 files changed, 22 insertions(+), 480 deletions(-)
 delete mode 100644 beacon_chain/sync/branch_discovery.nim

diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim
index 34f23eb00..e48bcfe7c 100644
--- a/beacon_chain/beacon_node.nim
+++ b/beacon_chain/beacon_node.nim
@@ -25,7 +25,7 @@ import
     attestation_pool, sync_committee_msg_pool, validator_change_pool],
   ./spec/datatypes/[base, altair],
   ./spec/eth2_apis/dynamic_fee_recipients,
-  ./sync/[branch_discovery, sync_manager, request_manager],
+  ./sync/[sync_manager, request_manager],
   ./validators/[
     action_tracker, message_router, validator_monitor, validator_pool,
     keystore_management],
@@ -35,7 +35,7 @@ export
   osproc, chronos, presto, action_tracker, beacon_clock, beacon_chain_db,
   conf, light_client, attestation_pool, sync_committee_msg_pool,
   validator_change_pool,
-  eth2_network, el_manager, branch_discovery, request_manager, sync_manager,
+  eth2_network, el_manager, request_manager, sync_manager,
   eth2_processor, optimistic_processor, blockchain_dag, block_quarantine,
   base, message_router, validator_monitor, validator_pool,
   consensus_manager, dynamic_fee_recipients
@@ -85,7 +85,6 @@ type
     requestManager*: RequestManager
     syncManager*: SyncManager[Peer, PeerId]
     backfiller*: SyncManager[Peer, PeerId]
-    branchDiscovery*: ref BranchDiscovery
     genesisSnapshotContent*: string
     processor*: ref Eth2Processor
     blockProcessor*: ref BlockProcessor
diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim
index 57be79088..11a3806a6 100644
--- a/beacon_chain/consensus_object_pools/block_pools_types.nim
+++ b/beacon_chain/consensus_object_pools/block_pools_types.nim
@@ -202,17 +202,10 @@ type
       ## Cached state used during block clearance - must only be used in
       ## clearance module

-    incrementalState*: ref ForkedHashedBeaconState
-      ## State used for intermittent results of expensive computations that
-      ## may take minutes - is only used if unavoidable, and nil otherwise
-
     updateFlags*: UpdateFlags

     cfg*: RuntimeConfig

-    lastChainProgress*: Moment
-      ## Indicates the last wall time at which meaningful progress was made
-
     shufflingRefs*: LRUCache[16, ShufflingRef]
     epochRefs*: LRUCache[32, EpochRef]
diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim
index b1824a3f2..56e49e81a 100644
--- a/beacon_chain/consensus_object_pools/blockchain_dag.nim
+++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim
@@ -236,8 +236,6 @@ func getBlockIdAtSlot*(dag: ChainDAGRef, slot: Slot): Opt[BlockSlotId] =
   tryWithState dag.headState
   tryWithState dag.epochRefState
tryWithState dag.clearanceState - if dag.incrementalState != nil: - tryWithState dag.incrementalState[] # Fallback to database, this only works for backfilled blocks let finlow = dag.db.finalizedBlocks.low.expect("at least tailRef written") @@ -1006,14 +1004,6 @@ proc applyBlock( ok() -proc resetChainProgressWatchdog*(dag: ChainDAGRef) = - dag.lastChainProgress = Moment.now() - dag.incrementalState = nil - -proc chainIsProgressing*(dag: ChainDAGRef): bool = - const watchdogDuration = chronos.minutes(60) - dag.lastChainProgress + watchdogDuration >= Moment.now() - proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags, eraPath = ".", @@ -1054,7 +1044,6 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, # allow skipping some validation. updateFlags: updateFlags * {strictVerification}, cfg: cfg, - lastChainProgress: Moment.now(), vanityLogs: vanityLogs, @@ -1517,8 +1506,6 @@ proc computeRandaoMixFromMemory*( tryWithState dag.headState tryWithState dag.epochRefState tryWithState dag.clearanceState - if dag.incrementalState != nil: - tryWithState dag.incrementalState[] proc computeRandaoMixFromDatabase*( dag: ChainDAGRef, bid: BlockId, lowSlot: Slot): Opt[Eth2Digest] = @@ -1590,8 +1577,6 @@ proc computeShufflingRefFromMemory*( tryWithState dag.headState tryWithState dag.epochRefState tryWithState dag.clearanceState - if dag.incrementalState != nil: - tryWithState dag.incrementalState[] proc getShufflingRef*( dag: ChainDAGRef, blck: BlockRef, epoch: Epoch, @@ -1739,10 +1724,6 @@ proc updateState*( elif exactMatch(dag.epochRefState, bsi): assign(state, dag.epochRefState) found = true - elif dag.incrementalState != nil and - exactMatch(dag.incrementalState[], bsi): - assign(state, dag.incrementalState[]) - found = true const RewindBlockThreshold = 64 @@ -1775,12 +1756,6 @@ proc updateState*( found = true break - if dag.incrementalState != nil and - canAdvance(dag.incrementalState[], cur): - assign(state, dag.incrementalState[]) - found = true - break - if cur.isProposed(): # This is not an empty slot, so the block will need to be applied to # eventually reach bs @@ -2412,7 +2387,6 @@ proc updateHead*( quit 1 dag.head = newHead - dag.resetChainProgressWatchdog() if dag.headState.is_merge_transition_complete() and not lastHeadMergeComplete and @@ -2655,12 +2629,7 @@ proc getProposalState*( # Start with the clearance state, since this one typically has been advanced # and thus has a hot hash tree cache - let state = - if dag.incrementalState != nil and - dag.incrementalState[].latest_block_id == head.bid: - assignClone(dag.incrementalState[]) - else: - assignClone(dag.clearanceState) + let state = assignClone(dag.clearanceState) var info = ForkedEpochInfo() diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index e6c6ad21b..4c9feaa95 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -367,9 +367,6 @@ proc initFullNode( func getFrontfillSlot(): Slot = max(dag.frontfill.get(BlockId()).slot, dag.horizon) - func isBlockKnown(blockRoot: Eth2Digest): bool = - dag.getBlockRef(blockRoot).isSome - let quarantine = newClone( Quarantine.init()) @@ -401,13 +398,6 @@ proc initFullNode( # that should probably be reimagined more holistically in the future. 
blockProcessor[].addBlock( MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) - branchDiscoveryBlockVerifier = proc( - signedBlock: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars] - ): Future[Result[void, VerifierError]] {.async: (raises: [ - CancelledError], raw: true).} = - blockProcessor[].addBlock( - MsgSource.gossip, signedBlock, blobs, maybeFinalized = false) rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = @@ -458,9 +448,6 @@ proc initFullNode( getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, getFrontfillSlot, dag.backfill.slot, blockVerifier, maxHeadAge = 0) - branchDiscovery = BranchDiscovery.new( - node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown, - branchDiscoveryBlockVerifier) router = (ref MessageRouter)( processor: processor, network: node.network) @@ -503,7 +490,6 @@ proc initFullNode( node.requestManager = requestManager node.syncManager = syncManager node.backfiller = backfiller - node.branchDiscovery = branchDiscovery node.router = router await node.addValidators() @@ -1295,8 +1281,7 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} = if slot > head.slot: (slot - head.slot).uint64 else: 0'u64 isBehind = - headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and - node.syncStatus(head) == ChainSyncStatus.Syncing + headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER targetGossipState = getTargetGossipState( slot.epoch, @@ -1556,37 +1541,6 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = # Update 1 epoch early to block non-fork-ready peers node.network.updateForkId(epoch, node.dag.genesis_validators_root) - # If the chain has halted, we have to ensure that the EL gets synced - # so that we can perform validator duties again - if not node.dag.head.executionValid and not node.dag.chainIsProgressing(): - let beaconHead = node.attestationPool[].getBeaconHead(head) - discard await node.consensusManager.updateExecutionClientHead(beaconHead) - - # If the chain head is far behind, we have to advance it incrementally - # to avoid lag spikes when performing validator duties - if node.syncStatus(head) == ChainSyncStatus.Degraded: - let incrementalTick = Moment.now() - if node.dag.incrementalState == nil: - node.dag.incrementalState = assignClone(node.dag.headState) - elif node.dag.incrementalState[].latest_block_id != node.dag.head.bid: - node.dag.incrementalState[].assign(node.dag.headState) - else: - let - incrementalSlot = getStateField(node.dag.incrementalState[], slot) - maxSlot = max(incrementalSlot, slot + 1) - nextSlot = min((incrementalSlot.epoch + 1).start_slot, maxSlot) - var - cache: StateCache - info: ForkedEpochInfo - node.dag.advanceSlots( - node.dag.incrementalState[], nextSlot, true, cache, info) - let incrementalSlot = getStateField(node.dag.incrementalState[], slot) - info "Head state is behind, catching up", - headSlot = node.dag.head.slot, - progressSlot = incrementalSlot, - wallSlot = slot, - dur = Moment.now() - incrementalTick - # When we're not behind schedule, we'll speculatively update the clearance # state in anticipation of receiving the next block - we do it after # logging slot end since the nextActionWaitTime can be short @@ -1610,10 +1564,6 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = await node.updateGossipStatus(slot + 1) - # Branch discovery module is only used to support ongoing sync manager tasks - if not 
node.syncManager.inProgress: - await node.branchDiscovery.stop() - func formatNextConsensusFork( node: BeaconNode, withVanityArt = false): Opt[string] = let consensusFork = @@ -1633,14 +1583,6 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string = let optimisticHead = not node.dag.head.executionValid if node.syncManager.inProgress: let - degradedSuffix = - case node.branchDiscovery.state - of BranchDiscoveryState.Active: - "/discovering" - of BranchDiscoveryState.Suspended: - "/degraded" - of BranchDiscoveryState.Stopped: - "" optimisticSuffix = if optimisticHead: "/opt" @@ -1651,20 +1593,7 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string = " - lc: " & $shortLog(node.consensusManager[].optimisticHead) else: "" - catchingUpSuffix = - if node.dag.incrementalState != nil: - let - headSlot = node.dag.head.slot - incrementalSlot = getStateField(node.dag.incrementalState[], slot) - progress = - (incrementalSlot - headSlot).float / - max(wallSlot - headSlot, 1).float * 100.float - " - catching up: " & - formatFloat(progress, ffDecimal, precision = 2) & "%" - else: - "" - node.syncManager.syncStatus & degradedSuffix & optimisticSuffix & - lightClientSuffix & catchingUpSuffix + node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix elif node.backfiller.inProgress: "backfill: " & node.backfiller.syncStatus elif optimistic_head: diff --git a/beacon_chain/sync/branch_discovery.nim b/beacon_chain/sync/branch_discovery.nim deleted file mode 100644 index fd950f835..000000000 --- a/beacon_chain/sync/branch_discovery.nim +++ /dev/null @@ -1,292 +0,0 @@ -# beacon_chain -# Copyright (c) 2024 Status Research & Development GmbH -# Licensed and distributed under either of -# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). -# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). -# at your option. This file may not be copied, modified, or distributed except according to those terms. - -{.push raises: [].} - -# This module is started when a chain stall is detected, i.e., -# a long period without any chain progress, while at the same time being -# connected to a healthy number of different peers. -# -# In such a scenario, the network may partition into multiple groups of peers -# that build on separate branches. It is vital to specifically target peers -# from different partitions and download branches which are not necessarily -# popularly served. Branches may be unpopular because they are expensive to -# apply, and may have a difficult time propagating despite a large weight in -# attestations backing them. This only exacerbates the longer the split view -# scenario is ongoing. -# -# While sync manager can sync popular chains well, it cannot reliably sync -# minority chains only served by a limited number of peers. This module -# augments sync manager in the split view scenario. -# Note that request manager is not running while sync manager is running. -# -# Once both sync manager and branch discovery stopped resolving new blocks, -# `syncStatus` will report `ChainSyncStatus.Degraded` and `blockchain_dag` state -# will be gradually advanced to the wall slot to prepare for block proposal. -# If at that time, no additional branches were discovered, validator duties -# will be performed based on local fork choice. 
-# -# Note that the canonical chain may not be on the highest slot number, -# as some partitions of the network may have built on top of branches -# with lower validator support while the canonical chain was not visible. - -import - std/algorithm, - chronos, chronicles, metrics, results, - ../spec/[forks, network], - ../consensus_object_pools/block_pools_types, - ../networking/[eth2_network, peer_pool], - ./sync_protocol - -logScope: - topics = "branchdiscovery" - -declareGauge beacon_sync_branchdiscovery_state, - "Branch discovery module operating state" - -declareCounter beacon_sync_branchdiscovery_discovered_blocks, - "Number of beacon blocks discovered by the branch discovery module" - -type - BranchDiscoveryState* {.pure.} = enum - Stopped, - Suspended, - Active - - GetSlotCallback* = - proc(): Slot {.gcsafe, raises: [].} - - IsBlockKnownCallback* = - proc(blockRoot: Eth2Digest): bool {.gcsafe, raises: [].} - - BlockVerifierCallback* = proc( - signedBlock: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars] - ): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} - - BranchDiscovery* = object - network: Eth2Node - getFinalizedSlot: GetSlotCallback - isBlockKnown: IsBlockKnownCallback - blockVerifier: BlockVerifierCallback - isActive: AsyncEvent - loopFuture: Future[void].Raising([]) - -proc new*( - T: type BranchDiscovery, - network: Eth2Node, - getFinalizedSlot: GetSlotCallback, - isBlockKnown: IsBlockKnownCallback, - blockVerifier: BlockVerifierCallback): ref BranchDiscovery = - let self = (ref BranchDiscovery)( - network: network, - getFinalizedSlot: getFinalizedSlot, - isBlockKnown: isBlockKnown, - blockVerifier: blockVerifier, - isActive: newAsyncEvent()) - self[].isActive.fire() - self - -proc discoverBranch( - self: BranchDiscovery, peer: Peer) {.async: (raises: [CancelledError]).} = - logScope: - peer - peer_score = peer.getScore() - - let - finalizedSlot = self.getFinalizedSlot() - peerHeadSlot = peer.getHeadSlot() - if peerHeadSlot <= finalizedSlot: - peer.updateScore(PeerScoreUseless) - debug "Peer's head slot is already finalized", peerHeadSlot, finalizedSlot - return - - var blockRoot = peer.getHeadRoot() - logScope: blockRoot - if self.isBlockKnown(blockRoot): - peer.updateScore(PeerScoreUseless) - debug "Peer's head block root is already known" - return - - const - maxRequestsPerBurst = 20 - burstDuration = chronos.seconds(40) - let bucket = TokenBucket.new(maxRequestsPerBurst, burstDuration) - - var parentSlot = peerHeadSlot + 1 - logScope: parentSlot - while true: - if self.isBlockKnown(blockRoot): - debug "Branch from peer no longer unknown" - return - if peer.getScore() < PeerScoreLowLimit: - debug "Failed to discover new branch from peer" - return - - debug "Discovering new branch from peer" - try: - await bucket.consume(1) - except CancelledError as exc: - raise exc - except CatchableError as exc: - raiseAssert "TokenBucket.consume should not fail: " & $exc.msg - let rsp = await peer.beaconBlocksByRoot_v2(BlockRootsList @[blockRoot]) - if rsp.isErr: - # `eth2_network` already descored according to the specific error - debug "Failed to receive block", err = rsp.error - await sleepAsync(RESP_TIMEOUT_DUR) - continue - template blocks: untyped = rsp.get - - # The peer was the one providing us with this block root, it should exist - if blocks.len == 0: - peer.updateScore(PeerScoreNoValues) - debug "Received no blocks", numBlocks = blocks.len - await sleepAsync(RESP_TIMEOUT_DUR) - continue - if blocks.len > 1: - 
peer.updateScore(PeerScoreBadResponse) - debug "Received too many blocks", numBlocks = blocks.len - return - template blck: untyped = blocks[0][] - if blck.slot >= parentSlot: - peer.updateScore(PeerScoreBadResponse) - debug "Received block older than parent", receivedSlot = blck.slot - return - if blck.root != blockRoot: - peer.updateScore(PeerScoreBadResponse) - debug "Received incorrect block", receivedRoot = blck.root - return - - var blobIds: seq[BlobIdentifier] - withBlck(blck): - when consensusFork >= ConsensusFork.Deneb: - for i in 0 ..< forkyBlck.message.body.blob_kzg_commitments.len: - blobIds.add BlobIdentifier(block_root: blockRoot, index: i.BlobIndex) - var blobs: Opt[BlobSidecars] - if blobIds.len > 0: - while true: - if self.isBlockKnown(blockRoot): - debug "Branch from peer no longer unknown" - return - if peer.getScore() < PeerScoreLowLimit: - debug "Failed to discover new branch from peer" - return - - try: - await bucket.consume(1) - except CancelledError as exc: - raise exc - except CatchableError as exc: - raiseAssert "TokenBucket.consume should not fail: " & $exc.msg - let r = await peer.blobSidecarsByRoot(BlobIdentifierList blobIds) - if r.isErr: - # `eth2_network` already descored according to the specific error - debug "Failed to receive blobs", err = r.error - await sleepAsync(RESP_TIMEOUT_DUR) - continue - template blobSidecars: untyped = r.unsafeGet - - if blobSidecars.len < blobIds.len: - peer.updateScore(PeerScoreMissingValues) - debug "Received not all blobs", - numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len - await sleepAsync(RESP_TIMEOUT_DUR) - continue - if blobSidecars.len > blobIds.len: - peer.updateScore(PeerScoreBadResponse) - debug "Received too many blobs", - numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len - return - for i, blobSidecar in blobSidecars: - let root = hash_tree_root(blobSidecar[].signed_block_header.message) - if root != blockRoot: - peer.updateScore(PeerScoreBadResponse) - debug "Received unexpected blob" - return - blobSidecar[].verify_blob_sidecar_inclusion_proof().isOkOr: - peer.updateScore(PeerScoreBadResponse) - debug "Received invalid blob" - return - blobs = Opt.some distinctBase(blobSidecars).sortedByIt(it.index) - for i, blobSidecar in blobs.get: - if blobSidecar[].index != i.BlobIndex: - peer.updateScore(PeerScoreBadResponse) - debug "Received duplicate blobs while others are missing" - return - break - - let err = (await self.blockVerifier(blck, blobs)).errorOr: - peer.updateScore(PeerScoreGoodBatchValue + PeerScoreGoodValues) - beacon_sync_branchdiscovery_discovered_blocks.inc() - info "Discovered new branch from peer" - break - case err - of VerifierError.Invalid: - peer.updateScore(PeerScoreBadResponse) - debug "Received invalid block" - return - of VerifierError.UnviableFork: - peer.updateScore(PeerScoreUnviableFork) - debug "Received unviable block" - return - of VerifierError.Duplicate: - peer.updateScore(PeerScoreGoodValues) - debug "Connected new branch from peer" - break - of VerifierError.MissingParent: - peer.updateScore(PeerScoreGoodBatchValue) - parentSlot = blck.slot - blockRoot = blck.getForkedBlockField(parent_root) - continue - -proc loop(self: ref BranchDiscovery) {.async: (raises: []).} = - try: - while true: - await self[].isActive.wait() - await sleepAsync(RESP_TIMEOUT_DUR) - - let peer = - try: - self[].network.peerPool.acquireNoWait() - except PeerPoolError as exc: - debug "Failed to acquire peer", exc = exc.msg - continue - defer: self[].network.peerPool.release(peer) - - 
await self[].discoverBranch(peer) - except CancelledError: - return - -func state*(self: ref BranchDiscovery): BranchDiscoveryState = - if self[].loopFuture == nil: - BranchDiscoveryState.Stopped - elif not self[].isActive.isSet: - BranchDiscoveryState.Suspended - else: - BranchDiscoveryState.Active - -proc start*(self: ref BranchDiscovery) = - doAssert self[].loopFuture == nil - info "Starting discovery of new branches" - self[].loopFuture = self.loop() - beacon_sync_branchdiscovery_state.set(self.state.ord().int64) - -proc stop*(self: ref BranchDiscovery) {.async: (raises: []).} = - if self[].loopFuture != nil: - info "Stopping discovery of new branches" - await self[].loopFuture.cancelAndWait() - self[].loopFuture = nil - beacon_sync_branchdiscovery_state.set(self.state.ord().int64) - -proc suspend*(self: ref BranchDiscovery) = - self[].isActive.clear() - beacon_sync_branchdiscovery_state.set(self.state.ord().int64) - -proc resume*(self: ref BranchDiscovery) = - self[].isActive.fire() - beacon_sync_branchdiscovery_state.set(self.state.ord().int64) diff --git a/beacon_chain/validators/beacon_validators.nim b/beacon_chain/validators/beacon_validators.nim index 268e84c60..076df7f27 100644 --- a/beacon_chain/validators/beacon_validators.nim +++ b/beacon_chain/validators/beacon_validators.nim @@ -227,85 +227,29 @@ proc getGraffitiBytes*( getGraffiti(node.config.validatorsDir, node.config.defaultGraffitiBytes(), validator.pubkey) -type ChainSyncStatus* {.pure.} = enum - Syncing, - Synced, - Degraded +proc isSynced*(node: BeaconNode, head: BlockRef): bool = + ## TODO This function is here as a placeholder for some better heurestics to + ## determine if we're in sync and should be producing blocks and + ## attestations. Generally, the problem is that slot time keeps advancing + ## even when there are no blocks being produced, so there's no way to + ## distinguish validators geniunely going missing from the node not being + ## well connected (during a network split or an internet outage for + ## example). It would generally be correct to simply keep running as if + ## we were the only legit node left alive, but then we run into issues: + ## with enough many empty slots, the validator pool is emptied leading + ## to empty committees and lots of empty slot processing that will be + ## thrown away as soon as we're synced again. -proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus = - ## Generally, the problem is that slot time keeps advancing - ## even when there are no blocks being produced, so there's no way to - ## distinguish validators geniunely going missing from the node not being - ## well connected (during a network split or an internet outage for - ## example). It would generally be correct to simply keep running as if - ## we were the only legit node left alive, but then we run into issues: - ## with enough many empty slots, the validator pool is emptied leading - ## to empty committees and lots of empty slot processing that will be - ## thrown away as soon as we're synced again. let # The slot we should be at, according to the clock beaconTime = node.beaconClock.now() wallSlot = beaconTime.toSlot() - if not wallSlot.afterGenesis or - head.slot + node.config.syncHorizon >= wallSlot.slot: - node.dag.resetChainProgressWatchdog() - node.branchDiscovery.suspend() - return ChainSyncStatus.Synced - - let numPeers = len(node.network.peerPool) - if numPeers <= node.config.maxPeers div 4: - # We may have poor connectivity, wait until more peers are available. 
- # This could also be intermittent, as state replays while chain is degraded - # may take significant amounts of time, during which many peers are lost - node.branchDiscovery.suspend() - return ChainSyncStatus.Syncing - - if node.dag.chainIsProgressing(): - # Chain is progressing, we are out of sync - node.branchDiscovery.resume() - return ChainSyncStatus.Syncing - - # Network connectivity is good, but we have trouble making sync progress. - # Turn on branch discovery module until we have a recent canonical head. - # The branch discovery module specifically targets peers on alternate branches - # and supports sync manager in discovering branches that are not widely seen - # but that may still have weight from attestations. - if node.branchDiscovery.state == BranchDiscoveryState.Stopped: - node.branchDiscovery.start() - node.branchDiscovery.resume() - - let - maxHeadSlot = node.dag.heads.foldl(max(a, b.slot), GENESIS_SLOT) - numPeersWithHigherProgress = node.network.peerPool.peers - .countIt(it != nil and it.getHeadSlot() > maxHeadSlot) - if numPeersWithHigherProgress > node.config.maxPeers div 8: - # A peer indicates that they are on a later slot, wait for sync manager - # to progress, or for it to kick the peer if they are faking the status - warn "Chain appears to have stalled, but peers indicate higher progress", - numPeersWithHigherProgress, numPeers, maxPeers = node.config.maxPeers, - head, maxHeadSlot - node.dag.resetChainProgressWatchdog() - return ChainSyncStatus.Syncing - - # We are on the latest slot among all of our peers, and there has been no - # chain progress for an extended period of time. - if node.dag.incrementalState == nil: - # The head state is too far in the past to timely perform validator duties - return ChainSyncStatus.Degraded - if node.dag.incrementalState[].latest_block_id != node.dag.head.bid: - # The incremental state is not yet on the correct head (see `onSlotEnd`) - return ChainSyncStatus.Degraded - let incrementalSlot = getStateField(node.dag.incrementalState[], slot) - if incrementalSlot + node.config.syncHorizon < wallSlot.slot: - # The incremental state still needs to advance further (see `onSlotEnd`) - return ChainSyncStatus.Degraded - - # It is reasonable safe to assume that the network has halted, resume duties - ChainSyncStatus.Synced - -proc isSynced*(node: BeaconNode, head: BlockRef): bool = - node.syncStatus(head) == ChainSyncStatus.Synced + # TODO if everyone follows this logic, the network will not recover from a + # halt: nobody will be producing blocks because everone expects someone + # else to do it + not wallSlot.afterGenesis or + head.slot + node.config.syncHorizon >= wallSlot.slot proc handleLightClientUpdates*(node: BeaconNode, slot: Slot) {.async: (raises: [CancelledError]).} =