diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim
index 3aa3d0dff..bdd97d6a0 100644
--- a/beacon_chain/conf.nim
+++ b/beacon_chain/conf.nim
@@ -591,6 +591,12 @@ type
       defaultValueDesc: $defaultSyncHorizon
       name: "sync-horizon" .}: uint64

+    proposeStale* {.
+      hidden
+      desc: "Whether or not to propose blocks on top of a stale head. May increase memory consumption"
+      defaultValue: false
+      name: "debug-propose-stale" .}: bool
+
     terminalTotalDifficultyOverride* {.
       hidden
       desc: "Deprecated for removal"
diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim
index 11a3806a6..743eec50b 100644
--- a/beacon_chain/consensus_object_pools/block_pools_types.nim
+++ b/beacon_chain/consensus_object_pools/block_pools_types.nim
@@ -202,10 +202,17 @@ type
       ## Cached state used during block clearance - must only be used in
       ## clearance module

+    incrementalState*: ref ForkedHashedBeaconState
+      ## State used for incrementally computing a proposal state.
+      ## See `getProposalState`
+
     updateFlags*: UpdateFlags

     cfg*: RuntimeConfig

+    lastChainProgress*: Moment
+      ## Indicates the last wall time at which meaningful progress was made
+
     shufflingRefs*: LRUCache[16, ShufflingRef]
     epochRefs*: LRUCache[32, EpochRef]

diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim
index 56e49e81a..fbd6fa236 100644
--- a/beacon_chain/consensus_object_pools/blockchain_dag.nim
+++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim
@@ -236,6 +236,8 @@ func getBlockIdAtSlot*(dag: ChainDAGRef, slot: Slot): Opt[BlockSlotId] =
   tryWithState dag.headState
   tryWithState dag.epochRefState
   tryWithState dag.clearanceState
+  if dag.incrementalState != nil:
+    tryWithState dag.incrementalState[]

   # Fallback to database, this only works for backfilled blocks
   let finlow = dag.db.finalizedBlocks.low.expect("at least tailRef written")
@@ -1004,6 +1006,14 @@ proc applyBlock(

   ok()

+proc resetChainProgressWatchdog*(dag: ChainDAGRef) =
+  dag.lastChainProgress = Moment.now()
+  dag.incrementalState = nil
+
+proc chainIsProgressing*(dag: ChainDAGRef): bool =
+  const watchdogDuration = chronos.minutes(60)
+  dag.lastChainProgress + watchdogDuration >= Moment.now()
+
 proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
            validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags,
            eraPath = ".",
@@ -1044,6 +1054,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
     # allow skipping some validation.
     updateFlags: updateFlags * {strictVerification},
     cfg: cfg,
+    lastChainProgress: Moment.now(),

     vanityLogs: vanityLogs,

@@ -1506,6 +1517,8 @@ proc computeRandaoMixFromMemory*(
   tryWithState dag.headState
   tryWithState dag.epochRefState
   tryWithState dag.clearanceState
+  if dag.incrementalState != nil:
+    tryWithState dag.incrementalState[]

 proc computeRandaoMixFromDatabase*(
     dag: ChainDAGRef, bid: BlockId, lowSlot: Slot): Opt[Eth2Digest] =
@@ -1577,6 +1590,8 @@ proc computeShufflingRefFromMemory*(
   tryWithState dag.headState
   tryWithState dag.epochRefState
   tryWithState dag.clearanceState
+  if dag.incrementalState != nil:
+    tryWithState dag.incrementalState[]

 proc getShufflingRef*(
     dag: ChainDAGRef, blck: BlockRef, epoch: Epoch,
@@ -1724,6 +1739,10 @@ proc updateState*(
   elif exactMatch(dag.epochRefState, bsi):
     assign(state, dag.epochRefState)
     found = true
+  elif dag.incrementalState != nil and
+      exactMatch(dag.incrementalState[], bsi):
+    assign(state, dag.incrementalState[])
+    found = true

   const RewindBlockThreshold = 64
@@ -1756,6 +1775,12 @@ proc updateState*(
          found = true
          break

+        if dag.incrementalState != nil and
+            canAdvance(dag.incrementalState[], cur):
+          assign(state, dag.incrementalState[])
+          found = true
+          break
+
        if cur.isProposed():
          # This is not an empty slot, so the block will need to be applied to
          # eventually reach bs
@@ -2387,6 +2412,7 @@ proc updateHead*(
       quit 1

   dag.head = newHead
+  dag.resetChainProgressWatchdog()

   if dag.headState.is_merge_transition_complete() and
       not lastHeadMergeComplete and
@@ -2629,7 +2655,11 @@ proc getProposalState*(

   # Start with the clearance state, since this one typically has been advanced
   # and thus has a hot hash tree cache
-  let state = assignClone(dag.clearanceState)
+  let state =
+    if dag.incrementalState != nil:
+      dag.incrementalState
+    else:
+      assignClone(dag.clearanceState)

   var info = ForkedEpochInfo()
@@ -2651,6 +2681,8 @@ proc getProposalState*(
       dag.cfg, state[], slot, cache, info,
       {skipLastStateRootCalculation}).expect("advancing 1 slot should not fail")

+  # Ensure async operations don't interfere with `incrementalState`
+  dag.resetChainProgressWatchdog()
   ok state

 func aggregateAll*(
diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim
index 4c9feaa95..41fde76ed 100644
--- a/beacon_chain/nimbus_beacon_node.nim
+++ b/beacon_chain/nimbus_beacon_node.nim
@@ -1275,13 +1275,21 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
     TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64
     HYSTERESIS_BUFFER = 16

+  func distanceTo(headSlot: Slot, wallSlot: Slot): uint64 =
+    if wallSlot > headSlot: (wallSlot - headSlot).uint64
+    else: 0'u64
+
   let
     head = node.dag.head
-    headDistance =
-      if slot > head.slot: (slot - head.slot).uint64
-      else: 0'u64
-    isBehind =
-      headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER
+    headDistance = head.slot.distanceTo(slot)
+    distance =
+      if node.dag.incrementalState != nil and
+          node.dag.incrementalState[].latest_block_id == head.bid:
+        let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
+        incrementalSlot.distanceTo(slot)
+      else:
+        headDistance
+    isBehind = distance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER
     targetGossipState = getTargetGossipState(
       slot.epoch,
@@ -1541,6 +1549,38 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
     # Update 1 epoch early to block non-fork-ready peers
     node.network.updateForkId(epoch, node.dag.genesis_validators_root)

+  if node.config.proposeStale:
+    # If the chain has halted, we have to ensure that the EL gets synced
+    # so that we can perform validator duties again
+    if not node.dag.head.executionValid and not node.dag.chainIsProgressing():
+      let beaconHead = node.attestationPool[].getBeaconHead(head)
+      discard await node.consensusManager.updateExecutionClientHead(beaconHead)
+
+    # If the chain head is far behind, we have to advance it incrementally
+    # to avoid lag spikes when performing validator duties
+    if node.syncStatus(head) == ChainSyncStatus.Degraded:
+      let incrementalTick = Moment.now()
+      if node.dag.incrementalState == nil:
+        node.dag.incrementalState = assignClone(node.dag.headState)
+      elif node.dag.incrementalState[].latest_block_id != node.dag.head.bid:
+        node.dag.incrementalState[].assign(node.dag.headState)
+      else:
+        let
+          incrementalSlot = getStateField(node.dag.incrementalState[], slot)
+          maxSlot = max(incrementalSlot, slot + 1)
+          nextSlot = min((incrementalSlot.epoch + 1).start_slot, maxSlot)
+        var
+          cache: StateCache
+          info: ForkedEpochInfo
+        node.dag.advanceSlots(
+          node.dag.incrementalState[], nextSlot, true, cache, info)
+      let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
+      info "Head state is behind, catching up",
+        headSlot = node.dag.head.slot,
+        progressSlot = incrementalSlot,
+        wallSlot = slot,
+        dur = Moment.now() - incrementalTick
+
   # When we're not behind schedule, we'll speculatively update the clearance
   # state in anticipation of receiving the next block - we do it after
   # logging slot end since the nextActionWaitTime can be short
@@ -1593,7 +1633,20 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
         " - lc: " & $shortLog(node.consensusManager[].optimisticHead)
       else:
         ""
-    node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix
+      catchingUpSuffix =
+        if node.dag.incrementalState != nil:
+          let
+            headSlot = node.dag.head.slot
+            incrementalSlot = getStateField(node.dag.incrementalState[], slot)
+            progress =
+              (incrementalSlot - headSlot).float /
+              max(wallSlot - headSlot, 1).float * 100.float
+          " - catching up: " &
+            formatFloat(progress, ffDecimal, precision = 2) & "%"
+        else:
+          ""
+    node.syncManager.syncStatus & optimisticSuffix &
+      lightClientSuffix & catchingUpSuffix
   elif node.backfiller.inProgress:
     "backfill: " & node.backfiller.syncStatus
   elif optimistic_head:
diff --git a/beacon_chain/validators/beacon_validators.nim b/beacon_chain/validators/beacon_validators.nim
index 076df7f27..a393b671b 100644
--- a/beacon_chain/validators/beacon_validators.nim
+++ b/beacon_chain/validators/beacon_validators.nim
@@ -227,29 +227,80 @@ proc getGraffitiBytes*(
   getGraffiti(node.config.validatorsDir, node.config.defaultGraffitiBytes(),
               validator.pubkey)

-proc isSynced*(node: BeaconNode, head: BlockRef): bool =
-  ## TODO This function is here as a placeholder for some better heurestics to
-  ##      determine if we're in sync and should be producing blocks and
-  ##      attestations. Generally, the problem is that slot time keeps advancing
-  ##      even when there are no blocks being produced, so there's no way to
-  ##      distinguish validators geniunely going missing from the node not being
-  ##      well connected (during a network split or an internet outage for
-  ##      example). It would generally be correct to simply keep running as if
-  ##      we were the only legit node left alive, but then we run into issues:
-  ##      with enough many empty slots, the validator pool is emptied leading
-  ##      to empty committees and lots of empty slot processing that will be
-  ##      thrown away as soon as we're synced again.
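+# `ChainSyncStatus` refines the previous boolean `isSynced`: `Degraded` is
+# reported while the chain appears stalled but the head state is too old to
+# compute duties on directly; in that case, `onSlotEnd` advances
+# `dag.incrementalState` until duties can safely resume.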
+type ChainSyncStatus* {.pure.} = enum
+  Syncing,
+  Synced,
+  Degraded
+
+proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus =
+  ## Decide whether the node is in sync and should perform validator duties.
+  ## Generally, the problem is that slot time keeps advancing
+  ## even when there are no blocks being produced, so there's no way to
+  ## distinguish validators genuinely going missing from the node not being
+  ## well connected (during a network split or an internet outage for
+  ## example). It would generally be correct to simply keep running as if
+  ## we were the only legit node left alive, but then we run into issues:
+  ## with enough empty slots, the validator pool is emptied, leading
+  ## to empty committees and lots of empty slot processing that will be
+  ## thrown away as soon as we're synced again.
   let
     # The slot we should be at, according to the clock
     beaconTime = node.beaconClock.now()
     wallSlot = beaconTime.toSlot()

-  # TODO if everyone follows this logic, the network will not recover from a
-  #      halt: nobody will be producing blocks because everone expects someone
-  #      else to do it
-  not wallSlot.afterGenesis or
-    head.slot + node.config.syncHorizon >= wallSlot.slot
+  if not wallSlot.afterGenesis or
+      head.slot + node.config.syncHorizon >= wallSlot.slot:
+    node.dag.resetChainProgressWatchdog()
+    return ChainSyncStatus.Synced
+
+  if not node.config.proposeStale:
+    # Continue syncing and wait for someone else to propose the next block
+    return ChainSyncStatus.Syncing
+
+  let
+    numPeers = len(node.network.peerPool)
+    minPeers = max(node.config.maxPeers div 4, SyncWorkersCount * 2)
+  if numPeers <= minPeers:
+    # We may have poor connectivity; wait until more peers are available.
+    # This could also be intermittent, as state replays while the chain is
+    # degraded may take significant amounts of time, during which many peers
+    # are lost
+    return ChainSyncStatus.Syncing
+
+  if node.dag.chainIsProgressing():
+    # Chain is progressing; we are out of sync
+    return ChainSyncStatus.Syncing
+
+  let
+    maxHeadSlot = node.dag.heads.foldl(max(a, b.slot), GENESIS_SLOT)
+    numPeersWithHigherProgress = node.network.peerPool.peers
+      .countIt(it != nil and it.getHeadSlot() > maxHeadSlot)
+    significantNumPeers = node.config.maxPeers div 8
+  if numPeersWithHigherProgress > significantNumPeers:
+    # Peers indicate that they are on a later slot; wait for the sync manager
+    # to progress, or for it to kick those peers if they are faking the status
+    warn "Chain appears to have stalled, but peers indicate higher progress",
+      numPeersWithHigherProgress, numPeers, maxPeers = node.config.maxPeers,
+      head, maxHeadSlot
+    node.dag.resetChainProgressWatchdog()
+    return ChainSyncStatus.Syncing
+
+  # We are on the latest slot among all of our peers, and there has been no
+  # chain progress for an extended period of time.
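+  # Resuming duties is only safe once the incremental state (maintained in
+  # `onSlotEnd`) is based on the current head and has advanced to within
+  # `syncHorizon` of the wall slot; otherwise, duty computation would still
+  # trigger a lengthy state replay.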
+  if node.dag.incrementalState == nil:
+    # The head state is too far in the past to perform validator duties in time
+    return ChainSyncStatus.Degraded
+  if node.dag.incrementalState[].latest_block_id != node.dag.head.bid:
+    # The incremental state is not yet on the correct head (see `onSlotEnd`)
+    return ChainSyncStatus.Degraded
+  let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
+  if incrementalSlot + node.config.syncHorizon < wallSlot.slot:
+    # The incremental state still needs to advance further (see `onSlotEnd`)
+    return ChainSyncStatus.Degraded
+
+  # It is reasonably safe to assume that the network has halted; resume duties
+  ChainSyncStatus.Synced
+
+proc isSynced*(node: BeaconNode, head: BlockRef): bool =
+  node.syncStatus(head) == ChainSyncStatus.Synced

 proc handleLightClientUpdates*(node: BeaconNode, slot: Slot) {.async: (raises: [CancelledError]).} =
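
A minimal usage sketch (not part of this diff) of how the tri-state status is
meant to gate duty handling; `maybeProposeBlock` and `proposeBlock` are
hypothetical stand-ins for the existing duty entry points in
beacon_validators.nim:

  proc maybeProposeBlock(node: BeaconNode, head: BlockRef, slot: Slot) =
    # Hypothetical wrapper; apart from `syncStatus`, `ChainSyncStatus` and
    # `dag.incrementalState`, the names here are illustrative only
    case node.syncStatus(head)
    of ChainSyncStatus.Synced:
      # Either genuinely in sync, or `--debug-propose-stale` is set and the
      # network has made no progress for over an hour, so building on the
      # stale head is permitted
      node.proposeBlock(head, slot)
    of ChainSyncStatus.Degraded:
      # `onSlotEnd` is still advancing `dag.incrementalState`, at most one
      # epoch per slot; duties stay paused until it is within `syncHorizon`
      # of the wall slot
      discard
    of ChainSyncStatus.Syncing:
      # Connectivity is poor or peers report higher progress; defer to the
      # sync manager
      discard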