From c5352cf89f4ba10f49fe2d32fc951bf9e3365a39 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Mon, 25 Mar 2024 19:23:33 +0100 Subject: [PATCH] add option to incrementally compute proposal state if behind There can be situations where proposals need to be made on top of stale parent state. Because the distance to the wall slot is big, the proposal state (`getProposalState`) has to be computed incrementally to avoid lag spikes. Proposals have to be timely to have a chance to propagate on the network. Peak memory requirements don't change, but the proposal state needs to be allocated for a longer duration than usual. --- beacon_chain/conf.nim | 6 ++ .../block_pools_types.nim | 7 ++ .../consensus_object_pools/blockchain_dag.nim | 34 +++++++- beacon_chain/nimbus_beacon_node.nim | 65 ++++++++++++-- beacon_chain/validators/beacon_validators.nim | 85 +++++++++++++++---- 5 files changed, 173 insertions(+), 24 deletions(-) diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 3aa3d0dff..bdd97d6a0 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -591,6 +591,12 @@ type defaultValueDesc: $defaultSyncHorizon name: "sync-horizon" .}: uint64 + proposeStale* {. + hidden + desc: "Whether or not to propose blocks on top of a stale head. May increase memory consumption" + defaultValue: false + name: "debug-propose-stale" .}: bool + terminalTotalDifficultyOverride* {. hidden desc: "Deprecated for removal" diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index 11a3806a6..743eec50b 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -202,10 +202,17 @@ type ## Cached state used during block clearance - must only be used in ## clearance module + incrementalState*: ref ForkedHashedBeaconState + ## State used for incrementally computing a proposal state. + ## See `getProposalState` + updateFlags*: UpdateFlags cfg*: RuntimeConfig + lastChainProgress*: Moment + ## Indicates the last wall time at which meaningful progress was made + shufflingRefs*: LRUCache[16, ShufflingRef] epochRefs*: LRUCache[32, EpochRef] diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 56e49e81a..fbd6fa236 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -236,6 +236,8 @@ func getBlockIdAtSlot*(dag: ChainDAGRef, slot: Slot): Opt[BlockSlotId] = tryWithState dag.headState tryWithState dag.epochRefState tryWithState dag.clearanceState + if dag.incrementalState != nil: + tryWithState dag.incrementalState[] # Fallback to database, this only works for backfilled blocks let finlow = dag.db.finalizedBlocks.low.expect("at least tailRef written") @@ -1004,6 +1006,14 @@ proc applyBlock( ok() +proc resetChainProgressWatchdog*(dag: ChainDAGRef) = + dag.lastChainProgress = Moment.now() + dag.incrementalState = nil + +proc chainIsProgressing*(dag: ChainDAGRef): bool = + const watchdogDuration = chronos.minutes(60) + dag.lastChainProgress + watchdogDuration >= Moment.now() + proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags, eraPath = ".", @@ -1044,6 +1054,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, # allow skipping some validation. 
updateFlags: updateFlags * {strictVerification}, cfg: cfg, + lastChainProgress: Moment.now(), vanityLogs: vanityLogs, @@ -1506,6 +1517,8 @@ proc computeRandaoMixFromMemory*( tryWithState dag.headState tryWithState dag.epochRefState tryWithState dag.clearanceState + if dag.incrementalState != nil: + tryWithState dag.incrementalState[] proc computeRandaoMixFromDatabase*( dag: ChainDAGRef, bid: BlockId, lowSlot: Slot): Opt[Eth2Digest] = @@ -1577,6 +1590,8 @@ proc computeShufflingRefFromMemory*( tryWithState dag.headState tryWithState dag.epochRefState tryWithState dag.clearanceState + if dag.incrementalState != nil: + tryWithState dag.incrementalState[] proc getShufflingRef*( dag: ChainDAGRef, blck: BlockRef, epoch: Epoch, @@ -1724,6 +1739,10 @@ proc updateState*( elif exactMatch(dag.epochRefState, bsi): assign(state, dag.epochRefState) found = true + elif dag.incrementalState != nil and + exactMatch(dag.incrementalState[], bsi): + assign(state, dag.incrementalState[]) + found = true const RewindBlockThreshold = 64 @@ -1756,6 +1775,12 @@ proc updateState*( found = true break + if dag.incrementalState != nil and + canAdvance(dag.incrementalState[], cur): + assign(state, dag.incrementalState[]) + found = true + break + if cur.isProposed(): # This is not an empty slot, so the block will need to be applied to # eventually reach bs @@ -2387,6 +2412,7 @@ proc updateHead*( quit 1 dag.head = newHead + dag.resetChainProgressWatchdog() if dag.headState.is_merge_transition_complete() and not lastHeadMergeComplete and @@ -2629,7 +2655,11 @@ proc getProposalState*( # Start with the clearance state, since this one typically has been advanced # and thus has a hot hash tree cache - let state = assignClone(dag.clearanceState) + let state = + if dag.incrementalState != nil: + dag.incrementalState + else: + assignClone(dag.clearanceState) var info = ForkedEpochInfo() @@ -2651,6 +2681,8 @@ proc getProposalState*( dag.cfg, state[], slot, cache, info, {skipLastStateRootCalculation}).expect("advancing 1 slot should not fail") + # Ensure async operations don't interfere with `incrementalState` + dag.resetChainProgressWatchdog() ok state func aggregateAll*( diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 4c9feaa95..41fde76ed 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -1275,13 +1275,21 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} = TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64 HYSTERESIS_BUFFER = 16 + func distanceTo(headSlot: Slot, wallSlot: Slot): uint64 = + if wallSlot > headSlot: (wallSlot - headSlot).uint64 + else: 0'u64 + let head = node.dag.head - headDistance = - if slot > head.slot: (slot - head.slot).uint64 - else: 0'u64 - isBehind = - headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER + headDistance = head.slot.distanceTo(slot) + distance = + if node.dag.incrementalState != nil and + node.dag.incrementalState[].latest_block_id == head.bid: + let incrementalSlot = getStateField(node.dag.incrementalState[], slot) + incrementalSlot.distanceTo(slot) + else: + headDistance + isBehind = distance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER targetGossipState = getTargetGossipState( slot.epoch, @@ -1541,6 +1549,38 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = # Update 1 epoch early to block non-fork-ready peers node.network.updateForkId(epoch, node.dag.genesis_validators_root) + if node.config.proposeStale: + # If the chain has halted, we have to ensure that the EL gets 
synced + # so that we can perform validator duties again + if not node.dag.head.executionValid and not node.dag.chainIsProgressing(): + let beaconHead = node.attestationPool[].getBeaconHead(head) + discard await node.consensusManager.updateExecutionClientHead(beaconHead) + + # If the chain head is far behind, we have to advance it incrementally + # to avoid lag spikes when performing validator duties + if node.syncStatus(head) == ChainSyncStatus.Degraded: + let incrementalTick = Moment.now() + if node.dag.incrementalState == nil: + node.dag.incrementalState = assignClone(node.dag.headState) + elif node.dag.incrementalState[].latest_block_id != node.dag.head.bid: + node.dag.incrementalState[].assign(node.dag.headState) + else: + let + incrementalSlot = getStateField(node.dag.incrementalState[], slot) + maxSlot = max(incrementalSlot, slot + 1) + nextSlot = min((incrementalSlot.epoch + 1).start_slot, maxSlot) + var + cache: StateCache + info: ForkedEpochInfo + node.dag.advanceSlots( + node.dag.incrementalState[], nextSlot, true, cache, info) + let incrementalSlot = getStateField(node.dag.incrementalState[], slot) + info "Head state is behind, catching up", + headSlot = node.dag.head.slot, + progressSlot = incrementalSlot, + wallSlot = slot, + dur = Moment.now() - incrementalTick + # When we're not behind schedule, we'll speculatively update the clearance # state in anticipation of receiving the next block - we do it after # logging slot end since the nextActionWaitTime can be short @@ -1593,7 +1633,20 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string = " - lc: " & $shortLog(node.consensusManager[].optimisticHead) else: "" - node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix + catchingUpSuffix = + if node.dag.incrementalState != nil: + let + headSlot = node.dag.head.slot + incrementalSlot = getStateField(node.dag.incrementalState[], slot) + progress = + (incrementalSlot - headSlot).float / + max(wallSlot - headSlot, 1).float * 100.float + " - catching up: " & + formatFloat(progress, ffDecimal, precision = 2) & "%" + else: + "" + node.syncManager.syncStatus & optimisticSuffix & + lightClientSuffix & catchingUpSuffix elif node.backfiller.inProgress: "backfill: " & node.backfiller.syncStatus elif optimistic_head: diff --git a/beacon_chain/validators/beacon_validators.nim b/beacon_chain/validators/beacon_validators.nim index 076df7f27..a393b671b 100644 --- a/beacon_chain/validators/beacon_validators.nim +++ b/beacon_chain/validators/beacon_validators.nim @@ -227,29 +227,80 @@ proc getGraffitiBytes*( getGraffiti(node.config.validatorsDir, node.config.defaultGraffitiBytes(), validator.pubkey) -proc isSynced*(node: BeaconNode, head: BlockRef): bool = - ## TODO This function is here as a placeholder for some better heurestics to - ## determine if we're in sync and should be producing blocks and - ## attestations. Generally, the problem is that slot time keeps advancing - ## even when there are no blocks being produced, so there's no way to - ## distinguish validators geniunely going missing from the node not being - ## well connected (during a network split or an internet outage for - ## example). It would generally be correct to simply keep running as if - ## we were the only legit node left alive, but then we run into issues: - ## with enough many empty slots, the validator pool is emptied leading - ## to empty committees and lots of empty slot processing that will be - ## thrown away as soon as we're synced again. 
+type ChainSyncStatus* {.pure.} = enum
+  Syncing,
+  Synced,
+  Degraded
+proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus =
+  ## Generally, the problem is that slot time keeps advancing
+  ## even when there are no blocks being produced, so there's no way to
+  ## distinguish validators genuinely going missing from the node not being
+  ## well connected (during a network split or an internet outage for
+  ## example). It would generally be correct to simply keep running as if
+  ## we were the only legit node left alive, but then we run into issues:
+  ## with enough empty slots, the validator pool is emptied, leading
+  ## to empty committees and lots of empty slot processing that will be
+  ## thrown away as soon as we're synced again.
   let
     # The slot we should be at, according to the clock
     beaconTime = node.beaconClock.now()
     wallSlot = beaconTime.toSlot()
-    # TODO if everyone follows this logic, the network will not recover from a
-    #      halt: nobody will be producing blocks because everone expects someone
-    #      else to do it
-    not wallSlot.afterGenesis or
-      head.slot + node.config.syncHorizon >= wallSlot.slot
+  if not wallSlot.afterGenesis or
+      head.slot + node.config.syncHorizon >= wallSlot.slot:
+    node.dag.resetChainProgressWatchdog()
+    return ChainSyncStatus.Synced
+
+  if not node.config.proposeStale:
+    # Continue syncing and wait for someone else to propose the next block
+    return ChainSyncStatus.Syncing
+
+  let
+    numPeers = len(node.network.peerPool)
+    minPeers = max(node.config.maxPeers div 4, SyncWorkersCount * 2)
+  if numPeers <= minPeers:
+    # We may have poor connectivity, wait until more peers are available.
+    # This could also be intermittent, as state replays while chain is degraded
+    # may take significant amounts of time, during which many peers are lost
+    return ChainSyncStatus.Syncing
+
+  if node.dag.chainIsProgressing():
+    # Chain is progressing, we are out of sync
+    return ChainSyncStatus.Syncing
+
+  let
+    maxHeadSlot = node.dag.heads.foldl(max(a, b.slot), GENESIS_SLOT)
+    numPeersWithHigherProgress = node.network.peerPool.peers
+      .countIt(it != nil and it.getHeadSlot() > maxHeadSlot)
+    significantNumPeers = node.config.maxPeers div 8
+  if numPeersWithHigherProgress > significantNumPeers:
+    # A peer indicates that they are on a later slot, wait for sync manager
+    # to progress, or for it to kick the peer if they are faking the status
+    warn "Chain appears to have stalled, but peers indicate higher progress",
+      numPeersWithHigherProgress, numPeers, maxPeers = node.config.maxPeers,
+      head, maxHeadSlot
+    node.dag.resetChainProgressWatchdog()
+    return ChainSyncStatus.Syncing
+
+  # We are on the latest slot among all of our peers, and there has been no
+  # chain progress for an extended period of time.
+  if node.dag.incrementalState == nil:
+    # The head state is too far in the past to timely perform validator duties
+    return ChainSyncStatus.Degraded
+  if node.dag.incrementalState[].latest_block_id != node.dag.head.bid:
+    # The incremental state is not yet on the correct head (see `onSlotEnd`)
+    return ChainSyncStatus.Degraded
+  let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
+  if incrementalSlot + node.config.syncHorizon < wallSlot.slot:
+    # The incremental state still needs to advance further (see `onSlotEnd`)
+    return ChainSyncStatus.Degraded
+
+  # It is reasonably safe to assume that the network has halted; resume duties
+  ChainSyncStatus.Synced
+
+proc isSynced*(node: BeaconNode, head: BlockRef): bool =
+  node.syncStatus(head) == ChainSyncStatus.Synced
 
 proc handleLightClientUpdates*(node: BeaconNode, slot: Slot) {.async: (raises: [CancelledError]).} =
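For illustration, a minimal standalone sketch of the slot-stepping arithmetic that the incremental catch-up in `onSlotEnd` relies on: the incremental state advances by at most one epoch per slot end, and never beyond the wall slot plus one, so a long replay is spread across many invocations instead of causing a single lag spike. The constant `SLOTS_PER_EPOCH = 32` and the helper name `nextIncrementalSlot` are assumptions of this sketch only; the patch itself operates on the beacon node's `Slot`/`Epoch` types via `advanceSlots`.

# Standalone sketch, not part of the patch: mirrors the epoch-by-epoch
# stepping used by the incremental catch-up, using plain uint64 slots.
const SLOTS_PER_EPOCH = 32'u64  # mainnet preset; assumption for this sketch

func nextIncrementalSlot(incrementalSlot, wallSlot: uint64): uint64 =
  ## Advance at most to the next epoch boundary, capped at `wallSlot + 1`,
  ## so that catching up is spread across multiple `onSlotEnd` invocations.
  let
    maxSlot = max(incrementalSlot, wallSlot + 1)
    nextEpochStart = (incrementalSlot div SLOTS_PER_EPOCH + 1) * SLOTS_PER_EPOCH
  min(nextEpochStart, maxSlot)

when isMainModule:
  var progressSlot = 1_000'u64  # stale incremental state
  let wallSlot = 1_100'u64      # current wall clock slot
  while progressSlot < wallSlot:
    progressSlot = nextIncrementalSlot(progressSlot, wallSlot)
    echo "advanced incremental state to slot ", progressSlot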