add option to incrementally compute proposal state if behind

There can be situations where proposals need to be made on top of stale
parent state. When the distance to the wall slot is large, the proposal
state (`getProposalState`) has to be computed incrementally to avoid lag
spikes, because proposals have to be timely to have a chance to propagate
across the network. Peak memory requirements don't change, but the proposal
state needs to stay allocated for longer than usual.
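
For intuition, here is a rough standalone sketch of the epoch-at-a-time
catch-up schedule used in `onSlotEnd` below. It is not part of this commit:
`Slot`, `Epoch`, and the helpers are simplified stand-ins for the actual
nimbus-eth2 definitions. Rather than replaying all the way to the wall slot
in one go, each slot-end tick advances the incremental state to at most the
next epoch boundary, so the per-tick work stays bounded.

  type
    Slot = uint64
    Epoch = uint64

  const SLOTS_PER_EPOCH = 32'u64

  func epoch(s: Slot): Epoch = s div SLOTS_PER_EPOCH
  func startSlot(e: Epoch): Slot = e * SLOTS_PER_EPOCH

  func nextIncrementalSlot(incrementalSlot, wallSlot: Slot): Slot =
    ## One catch-up step: advance to the next epoch boundary, but never
    ## past `wallSlot + 1` (and never backwards).
    let maxSlot = max(incrementalSlot, wallSlot + 1)
    min(startSlot(incrementalSlot.epoch + 1), maxSlot)

  when isMainModule:
    # Starting 100 slots behind, catch-up proceeds in epoch-sized steps:
    # 1024, 1056, 1088, 1101 (one slot past the wall, ready to propose).
    var s: Slot = 1000
    let wall: Slot = 1100
    while s < wall:
      s = nextIncrementalSlot(s, wall)
      echo "advanced to slot ", s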
Etan Kissling 2024-03-25 19:23:33 +01:00
parent 2dbe24c740
commit c5352cf89f
5 changed files with 173 additions and 24 deletions


@@ -591,6 +591,12 @@ type
       defaultValueDesc: $defaultSyncHorizon
       name: "sync-horizon" .}: uint64

+    proposeStale* {.
+      hidden
+      desc: "Whether or not to propose blocks on top of a stale head. May increase memory consumption"
+      defaultValue: false
+      name: "debug-propose-stale" .}: bool
+
     terminalTotalDifficultyOverride* {.
       hidden
       desc: "Deprecated for removal"


@@ -202,10 +202,17 @@ type
       ## Cached state used during block clearance - must only be used in
       ## clearance module

+    incrementalState*: ref ForkedHashedBeaconState
+      ## State used for incrementally computing a proposal state.
+      ## See `getProposalState`
+
     updateFlags*: UpdateFlags
     cfg*: RuntimeConfig

+    lastChainProgress*: Moment
+      ## Indicates the last wall time at which meaningful progress was made
+
     shufflingRefs*: LRUCache[16, ShufflingRef]
     epochRefs*: LRUCache[32, EpochRef]


@@ -236,6 +236,8 @@ func getBlockIdAtSlot*(dag: ChainDAGRef, slot: Slot): Opt[BlockSlotId] =
   tryWithState dag.headState
   tryWithState dag.epochRefState
   tryWithState dag.clearanceState
+  if dag.incrementalState != nil:
+    tryWithState dag.incrementalState[]

   # Fallback to database, this only works for backfilled blocks
   let finlow = dag.db.finalizedBlocks.low.expect("at least tailRef written")
@@ -1004,6 +1006,14 @@ proc applyBlock(

   ok()

+proc resetChainProgressWatchdog*(dag: ChainDAGRef) =
+  dag.lastChainProgress = Moment.now()
+  dag.incrementalState = nil
+
+proc chainIsProgressing*(dag: ChainDAGRef): bool =
+  const watchdogDuration = chronos.minutes(60)
+  dag.lastChainProgress + watchdogDuration >= Moment.now()
+
 proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
            validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags,
            eraPath = ".",
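
The two helpers above form a simple progress watchdog: every new head resets
the timer (and discards the incremental state), and the chain counts as
progressing until an hour passes without a reset. Below is a self-contained
sketch of the same pattern, with std/monotimes substituted for chronos'
`Moment` purely so the example compiles on its own.

  import std/[monotimes, times]

  type ProgressWatchdog = object
    lastProgress: MonoTime

  proc recordProgress(w: var ProgressWatchdog) =
    ## Called whenever meaningful chain progress is observed (e.g. a new head)
    w.lastProgress = getMonoTime()

  proc isProgressing(w: ProgressWatchdog,
                     timeout = initDuration(minutes = 60)): bool =
    ## The chain counts as progressing until `timeout` elapses without a reset
    getMonoTime() - w.lastProgress < timeout

  when isMainModule:
    var dog = ProgressWatchdog(lastProgress: getMonoTime())
    dog.recordProgress()
    echo dog.isProgressing()  # true right after a reset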
@@ -1044,6 +1054,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
     # allow skipping some validation.
     updateFlags: updateFlags * {strictVerification},
     cfg: cfg,
+    lastChainProgress: Moment.now(),

     vanityLogs: vanityLogs,
@@ -1506,6 +1517,8 @@ proc computeRandaoMixFromMemory*(
   tryWithState dag.headState
   tryWithState dag.epochRefState
   tryWithState dag.clearanceState
+  if dag.incrementalState != nil:
+    tryWithState dag.incrementalState[]

 proc computeRandaoMixFromDatabase*(
     dag: ChainDAGRef, bid: BlockId, lowSlot: Slot): Opt[Eth2Digest] =
@@ -1577,6 +1590,8 @@ proc computeShufflingRefFromMemory*(
   tryWithState dag.headState
   tryWithState dag.epochRefState
   tryWithState dag.clearanceState
+  if dag.incrementalState != nil:
+    tryWithState dag.incrementalState[]

 proc getShufflingRef*(
     dag: ChainDAGRef, blck: BlockRef, epoch: Epoch,
@@ -1724,6 +1739,10 @@ proc updateState*(
     elif exactMatch(dag.epochRefState, bsi):
       assign(state, dag.epochRefState)
       found = true
+    elif dag.incrementalState != nil and
+        exactMatch(dag.incrementalState[], bsi):
+      assign(state, dag.incrementalState[])
+      found = true

   const RewindBlockThreshold = 64
@@ -1756,6 +1775,12 @@ proc updateState*(
         found = true
         break

+      if dag.incrementalState != nil and
+          canAdvance(dag.incrementalState[], cur):
+        assign(state, dag.incrementalState[])
+        found = true
+        break
+
       if cur.isProposed():
         # This is not an empty slot, so the block will need to be applied to
         # eventually reach bs
@@ -2387,6 +2412,7 @@ proc updateHead*(
       quit 1

   dag.head = newHead
+  dag.resetChainProgressWatchdog()

   if dag.headState.is_merge_transition_complete() and not
       lastHeadMergeComplete and
@@ -2629,7 +2655,11 @@ proc getProposalState*(
   # Start with the clearance state, since this one typically has been advanced
   # and thus has a hot hash tree cache
-  let state = assignClone(dag.clearanceState)
+  let state =
+    if dag.incrementalState != nil:
+      dag.incrementalState
+    else:
+      assignClone(dag.clearanceState)

   var
     info = ForkedEpochInfo()
@@ -2651,6 +2681,8 @@ proc getProposalState*(
       dag.cfg, state[], slot, cache, info,
       {skipLastStateRootCalculation}).expect("advancing 1 slot should not fail")

+  # Ensure async operations don't interfere with `incrementalState`
+  dag.resetChainProgressWatchdog()
+
   ok state

 func aggregateAll*(


@@ -1275,13 +1275,21 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
     TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64
     HYSTERESIS_BUFFER = 16

+  func distanceTo(headSlot: Slot, wallSlot: Slot): uint64 =
+    if wallSlot > headSlot: (wallSlot - headSlot).uint64
+    else: 0'u64
+
   let
     head = node.dag.head
-    headDistance =
-      if slot > head.slot: (slot - head.slot).uint64
-      else: 0'u64
-    isBehind =
-      headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER
+    headDistance = head.slot.distanceTo(slot)
+    distance =
+      if node.dag.incrementalState != nil and
+          node.dag.incrementalState[].latest_block_id == head.bid:
+        let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
+        incrementalSlot.distanceTo(slot)
+      else:
+        headDistance
+    isBehind = distance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER
     targetGossipState =
       getTargetGossipState(
         slot.epoch,
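
The subscription decision above boils down to a saturating slot distance
compared against a threshold plus a hysteresis buffer; the buffer keeps
subscriptions from flapping when the distance hovers near the threshold. A
simplified standalone sketch of that check (constants copied from the diff,
`Slot` reduced to a plain integer as an assumption):

  type Slot = uint64

  const
    TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64'u64
    HYSTERESIS_BUFFER = 16'u64

  func distanceTo(headSlot, wallSlot: Slot): uint64 =
    # Saturating subtraction: a state at or past the wall clock counts as 0
    if wallSlot > headSlot: wallSlot - headSlot
    else: 0'u64

  func isBehind(stateSlot, wallSlot: Slot): bool =
    stateSlot.distanceTo(wallSlot) >
      TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER

  when isMainModule:
    echo isBehind(100, 150)  # false: distance 50 <= 64 + 16
    echo isBehind(100, 200)  # true: distance 100 > 64 + 16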
@@ -1541,6 +1549,38 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
     # Update 1 epoch early to block non-fork-ready peers
     node.network.updateForkId(epoch, node.dag.genesis_validators_root)

+  if node.config.proposeStale:
+    # If the chain has halted, we have to ensure that the EL gets synced
+    # so that we can perform validator duties again
+    if not node.dag.head.executionValid and not node.dag.chainIsProgressing():
+      let beaconHead = node.attestationPool[].getBeaconHead(head)
+      discard await node.consensusManager.updateExecutionClientHead(beaconHead)
+
+    # If the chain head is far behind, we have to advance it incrementally
+    # to avoid lag spikes when performing validator duties
+    if node.syncStatus(head) == ChainSyncStatus.Degraded:
+      let incrementalTick = Moment.now()
+      if node.dag.incrementalState == nil:
+        node.dag.incrementalState = assignClone(node.dag.headState)
+      elif node.dag.incrementalState[].latest_block_id != node.dag.head.bid:
+        node.dag.incrementalState[].assign(node.dag.headState)
+      else:
+        let
+          incrementalSlot = getStateField(node.dag.incrementalState[], slot)
+          maxSlot = max(incrementalSlot, slot + 1)
+          nextSlot = min((incrementalSlot.epoch + 1).start_slot, maxSlot)
+        var
+          cache: StateCache
+          info: ForkedEpochInfo
+        node.dag.advanceSlots(
+          node.dag.incrementalState[], nextSlot, true, cache, info)
+      let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
+      info "Head state is behind, catching up",
+        headSlot = node.dag.head.slot,
+        progressSlot = incrementalSlot,
+        wallSlot = slot,
+        dur = Moment.now() - incrementalTick
+
   # When we're not behind schedule, we'll speculatively update the clearance
   # state in anticipation of receiving the next block - we do it after
   # logging slot end since the nextActionWaitTime can be short
@@ -1593,7 +1633,20 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
           " - lc: " & $shortLog(node.consensusManager[].optimisticHead)
         else:
           ""
-    node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix
+      catchingUpSuffix =
+        if node.dag.incrementalState != nil:
+          let
+            headSlot = node.dag.head.slot
+            incrementalSlot = getStateField(node.dag.incrementalState[], slot)
+            progress =
+              (incrementalSlot - headSlot).float /
+              max(wallSlot - headSlot, 1).float * 100.float
+          " - catching up: " &
+            formatFloat(progress, ffDecimal, precision = 2) & "%"
+        else:
+          ""
+    node.syncManager.syncStatus & optimisticSuffix &
+      lightClientSuffix & catchingUpSuffix
   elif node.backfiller.inProgress:
     "backfill: " & node.backfiller.syncStatus
   elif optimistic_head:
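
The `catchingUpSuffix` above reports how far the incremental state has
progressed between the stale head and the wall slot. A standalone sketch of
that percentage calculation, with simplified types as an assumption
(`formatFloat` and `ffDecimal` are from std/strutils):

  import std/strutils

  func catchUpProgress(headSlot, incrementalSlot, wallSlot: uint64): string =
    # Position of the incremental state between the stale head (0%) and
    # the wall slot (100%); the max(…, 1) guards against division by zero
    let progress =
      (incrementalSlot - headSlot).float /
      max(wallSlot - headSlot, 1'u64).float * 100.0
    "catching up: " & formatFloat(progress, ffDecimal, precision = 2) & "%"

  when isMainModule:
    # Head stuck at slot 1000, incremental state at 1080, wall clock at 1100:
    echo catchUpProgress(1000, 1080, 1100)  # catching up: 80.00%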


@@ -227,29 +227,80 @@ proc getGraffitiBytes*(
   getGraffiti(node.config.validatorsDir, node.config.defaultGraffitiBytes(),
               validator.pubkey)

-proc isSynced*(node: BeaconNode, head: BlockRef): bool =
-  ## TODO This function is here as a placeholder for some better heurestics to
-  ##      determine if we're in sync and should be producing blocks and
-  ##      attestations. Generally, the problem is that slot time keeps advancing
-  ##      even when there are no blocks being produced, so there's no way to
-  ##      distinguish validators geniunely going missing from the node not being
-  ##      well connected (during a network split or an internet outage for
-  ##      example). It would generally be correct to simply keep running as if
-  ##      we were the only legit node left alive, but then we run into issues:
-  ##      with enough many empty slots, the validator pool is emptied leading
-  ##      to empty committees and lots of empty slot processing that will be
-  ##      thrown away as soon as we're synced again.
+type ChainSyncStatus* {.pure.} = enum
+  Syncing,
+  Synced,
+  Degraded
+
+proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus =
+  ## Generally, the problem is that slot time keeps advancing
+  ## even when there are no blocks being produced, so there's no way to
+  ## distinguish validators genuinely going missing from the node not being
+  ## well connected (during a network split or an internet outage, for
+  ## example). It would generally be correct to simply keep running as if
+  ## we were the only legit node left alive, but then we run into issues:
+  ## with sufficiently many empty slots, the validator pool is emptied,
+  ## leading to empty committees and lots of empty slot processing that will
+  ## be thrown away as soon as we're synced again.
   let
     # The slot we should be at, according to the clock
     beaconTime = node.beaconClock.now()
     wallSlot = beaconTime.toSlot()

-  # TODO if everyone follows this logic, the network will not recover from a
-  #      halt: nobody will be producing blocks because everone expects someone
-  #      else to do it
-  not wallSlot.afterGenesis or
-    head.slot + node.config.syncHorizon >= wallSlot.slot
+  if not wallSlot.afterGenesis or
+      head.slot + node.config.syncHorizon >= wallSlot.slot:
+    node.dag.resetChainProgressWatchdog()
+    return ChainSyncStatus.Synced
+
+  if not node.config.proposeStale:
+    # Continue syncing and wait for someone else to propose the next block
+    return ChainSyncStatus.Syncing
+
+  let
+    numPeers = len(node.network.peerPool)
+    minPeers = max(node.config.maxPeers div 4, SyncWorkersCount * 2)
+  if numPeers <= minPeers:
+    # We may have poor connectivity; wait until more peers are available.
+    # This could also be intermittent, as state replays while the chain is
+    # degraded may take significant amounts of time, during which many peers
+    # are lost
+    return ChainSyncStatus.Syncing
+
+  if node.dag.chainIsProgressing():
+    # The chain is progressing; we are out of sync
+    return ChainSyncStatus.Syncing
+
+  let
+    maxHeadSlot = node.dag.heads.foldl(max(a, b.slot), GENESIS_SLOT)
+    numPeersWithHigherProgress = node.network.peerPool.peers
+      .countIt(it != nil and it.getHeadSlot() > maxHeadSlot)
+    significantNumPeers = node.config.maxPeers div 8
+  if numPeersWithHigherProgress > significantNumPeers:
+    # Peers indicate that they are on a later slot; wait for the sync manager
+    # to progress, or for it to kick the peers if they are faking their status
+    warn "Chain appears to have stalled, but peers indicate higher progress",
+      numPeersWithHigherProgress, numPeers, maxPeers = node.config.maxPeers,
+      head, maxHeadSlot
+    node.dag.resetChainProgressWatchdog()
+    return ChainSyncStatus.Syncing
+
+  # We are on the latest slot among all of our peers, and there has been no
+  # chain progress for an extended period of time
+  if node.dag.incrementalState == nil:
+    # The head state is too far in the past to perform validator duties timely
+    return ChainSyncStatus.Degraded
+  if node.dag.incrementalState[].latest_block_id != node.dag.head.bid:
+    # The incremental state is not yet on the correct head (see `onSlotEnd`)
+    return ChainSyncStatus.Degraded
+  let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
+  if incrementalSlot + node.config.syncHorizon < wallSlot.slot:
+    # The incremental state still needs to advance further (see `onSlotEnd`)
+    return ChainSyncStatus.Degraded
+
+  # It is reasonably safe to assume that the network has halted; resume duties
+  ChainSyncStatus.Synced
+
+proc isSynced*(node: BeaconNode, head: BlockRef): bool =
+  node.syncStatus(head) == ChainSyncStatus.Synced

 proc handleLightClientUpdates*(node: BeaconNode, slot: Slot)
     {.async: (raises: [CancelledError]).} =