use separate state when catching up to perform validator duties (#6131)
There are situations where all states in the `blockchain_dag` are occupied and cannot be borrowed. - headState: Many assumptions in the code that it cannot be advanced - clearanceState: Resets every time a new block gets imported, including blocks from non-canonical branches - epochRefState: Used even more frequently than clearanceState This means that during the catch-up mechanic where the head state is slowly advanced to wall clock to catch up on validator duties in the situation where the canonical head is way behind non-canonical heads, we cannot use any of the three existing states. In that situation, Nimbus already consumes an increased amount of memory due to all the `BlockRef`, fork choice states and so on, so experience is degraded. It seems reasonable to allocate a fourth state temporarily during that mechanic, until a new proposal could be made on the canonical chain. Note that currently, on `unstable`, proposals _do_ happen every couple hours because sync manager doesn't manage to discover additional heads in a split-view scenario on Goerli. However, with the branch discovery module, new blocks are discovered all the time, and the clearanceState may no longer be borrowed as it is reset to different branch too often. The extra state could also find other uses in the future, e.g., for incremental computations as in reindexing the database, or online collection of historical light client data.
This commit is contained in:
parent
c4a5bca629
commit
66a9304fea
|
@ -135,52 +135,25 @@ proc checkStateTransition(
|
|||
else:
|
||||
ok()
|
||||
|
||||
proc advanceClearanceState*(
|
||||
dag: ChainDAGRef, wallSlot: Slot, chainIsDegraded: bool) =
|
||||
proc advanceClearanceState*(dag: ChainDAGRef) =
|
||||
# When the chain is synced, the most likely block to be produced is the block
|
||||
# right after head - we can exploit this assumption and advance the state
|
||||
# to that slot before the block arrives, thus allowing us to do the expensive
|
||||
# epoch transition ahead of time.
|
||||
# Notably, we use the clearance state here because that's where the block will
|
||||
# first be seen - later, this state will be copied to the head state!
|
||||
|
||||
if chainIsDegraded and dag.clearanceState.latest_block_id != dag.head.bid:
|
||||
# The last block that was resolved may not be canonical.
|
||||
# If that's the case, we first have to copy `headState` to `clearanceState`
|
||||
assign(dag.clearanceState, dag.headState)
|
||||
|
||||
let advanced = withState(dag.clearanceState):
|
||||
forkyState.data.slot > forkyState.data.latest_block_header.slot
|
||||
if not advanced or chainIsDegraded:
|
||||
if not advanced:
|
||||
let
|
||||
clearanceSlot = getStateField(dag.clearanceState, slot)
|
||||
next =
|
||||
if not chainIsDegraded:
|
||||
clearanceSlot + 1
|
||||
else:
|
||||
# The chain seems to have halted.
|
||||
# Advance one epoch at a time to avoid long lag spikes
|
||||
# so that new blocks may be produced once more
|
||||
let maxSlot = max(clearanceSlot, wallSlot)
|
||||
min((clearanceSlot.epoch + 1).start_slot, maxSlot)
|
||||
|
||||
let startTick = Moment.now()
|
||||
startTick = Moment.now()
|
||||
next = getStateField(dag.clearanceState, slot) + 1
|
||||
var
|
||||
cache = StateCache()
|
||||
info = ForkedEpochInfo()
|
||||
|
||||
dag.advanceSlots(dag.clearanceState, next, true, cache, info)
|
||||
|
||||
logScope:
|
||||
oldSlot = clearanceSlot
|
||||
newSlot = next
|
||||
wallSlot
|
||||
updateStateDur = Moment.now() - startTick
|
||||
if not chainIsDegraded:
|
||||
debug "Prepared clearance state for next block"
|
||||
else:
|
||||
let activeBalance = withEpochInfo(info): info.balances.current_epoch
|
||||
info "Prepared clearance state for next block", activeBalance
|
||||
debug "Prepared clearance state for next block",
|
||||
next, updateStateDur = Moment.now() - startTick
|
||||
|
||||
proc checkHeadBlock*(
|
||||
dag: ChainDAGRef, signedBlock: ForkySignedBeaconBlock):
|
||||
|
|
|
@ -202,6 +202,10 @@ type
|
|||
## Cached state used during block clearance - must only be used in
|
||||
## clearance module
|
||||
|
||||
incrementalState*: ref ForkedHashedBeaconState
|
||||
## State used for intermittent results of expensive computations that
|
||||
## may take minutes - is only used if unavoidable, and nil otherwise
|
||||
|
||||
updateFlags*: UpdateFlags
|
||||
|
||||
cfg*: RuntimeConfig
|
||||
|
|
|
@ -236,6 +236,8 @@ func getBlockIdAtSlot*(dag: ChainDAGRef, slot: Slot): Opt[BlockSlotId] =
|
|||
tryWithState dag.headState
|
||||
tryWithState dag.epochRefState
|
||||
tryWithState dag.clearanceState
|
||||
if dag.incrementalState != nil:
|
||||
tryWithState dag.incrementalState[]
|
||||
|
||||
# Fallback to database, this only works for backfilled blocks
|
||||
let finlow = dag.db.finalizedBlocks.low.expect("at least tailRef written")
|
||||
|
@ -1006,6 +1008,7 @@ proc applyBlock(
|
|||
|
||||
proc resetChainProgressWatchdog*(dag: ChainDAGRef) =
|
||||
dag.lastChainProgress = Moment.now()
|
||||
dag.incrementalState = nil
|
||||
|
||||
proc chainIsProgressing*(dag: ChainDAGRef): bool =
|
||||
const watchdogDuration = chronos.minutes(60)
|
||||
|
@ -1514,6 +1517,8 @@ proc computeRandaoMixFromMemory*(
|
|||
tryWithState dag.headState
|
||||
tryWithState dag.epochRefState
|
||||
tryWithState dag.clearanceState
|
||||
if dag.incrementalState != nil:
|
||||
tryWithState dag.incrementalState[]
|
||||
|
||||
proc computeRandaoMixFromDatabase*(
|
||||
dag: ChainDAGRef, bid: BlockId, lowSlot: Slot): Opt[Eth2Digest] =
|
||||
|
@ -1585,6 +1590,8 @@ proc computeShufflingRefFromMemory*(
|
|||
tryWithState dag.headState
|
||||
tryWithState dag.epochRefState
|
||||
tryWithState dag.clearanceState
|
||||
if dag.incrementalState != nil:
|
||||
tryWithState dag.incrementalState[]
|
||||
|
||||
proc getShufflingRef*(
|
||||
dag: ChainDAGRef, blck: BlockRef, epoch: Epoch,
|
||||
|
@ -1732,6 +1739,10 @@ proc updateState*(
|
|||
elif exactMatch(dag.epochRefState, bsi):
|
||||
assign(state, dag.epochRefState)
|
||||
found = true
|
||||
elif dag.incrementalState != nil and
|
||||
exactMatch(dag.incrementalState[], bsi):
|
||||
assign(state, dag.incrementalState[])
|
||||
found = true
|
||||
|
||||
const RewindBlockThreshold = 64
|
||||
|
||||
|
@ -1764,6 +1775,12 @@ proc updateState*(
|
|||
found = true
|
||||
break
|
||||
|
||||
if dag.incrementalState != nil and
|
||||
canAdvance(dag.incrementalState[], cur):
|
||||
assign(state, dag.incrementalState[])
|
||||
found = true
|
||||
break
|
||||
|
||||
if cur.isProposed():
|
||||
# This is not an empty slot, so the block will need to be applied to
|
||||
# eventually reach bs
|
||||
|
@ -2637,7 +2654,12 @@ proc getProposalState*(
|
|||
|
||||
# Start with the clearance state, since this one typically has been advanced
|
||||
# and thus has a hot hash tree cache
|
||||
let state = assignClone(dag.clearanceState)
|
||||
let state =
|
||||
if dag.incrementalState != nil and
|
||||
dag.incrementalState[].latest_block_id == head.bid:
|
||||
assignClone(dag.incrementalState[])
|
||||
else:
|
||||
assignClone(dag.clearanceState)
|
||||
|
||||
var
|
||||
info = ForkedEpochInfo()
|
||||
|
|
|
@ -1542,26 +1542,49 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
|
|||
# Update 1 epoch early to block non-fork-ready peers
|
||||
node.network.updateForkId(epoch, node.dag.genesis_validators_root)
|
||||
|
||||
# If the chain has halted, we have to ensure that the EL gets synced
|
||||
# so that we can perform validator duties again
|
||||
if not node.dag.head.executionValid and not node.dag.chainIsProgressing():
|
||||
let beaconHead = node.attestationPool[].getBeaconHead(head)
|
||||
discard await node.consensusManager.updateExecutionClientHead(beaconHead)
|
||||
|
||||
# If the chain head is far behind, we have to advance it incrementally
|
||||
# to avoid lag spikes when performing validator duties
|
||||
if node.syncStatus(head) == ChainSyncStatus.Degraded:
|
||||
let incrementalTick = Moment.now()
|
||||
if node.dag.incrementalState == nil:
|
||||
node.dag.incrementalState = assignClone(node.dag.headState)
|
||||
elif node.dag.incrementalState[].latest_block_id != node.dag.head.bid:
|
||||
node.dag.incrementalState[].assign(node.dag.headState)
|
||||
else:
|
||||
let
|
||||
incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
||||
maxSlot = max(incrementalSlot, slot + 1)
|
||||
nextSlot = min((incrementalSlot.epoch + 1).start_slot, maxSlot)
|
||||
var
|
||||
cache: StateCache
|
||||
info: ForkedEpochInfo
|
||||
node.dag.advanceSlots(
|
||||
node.dag.incrementalState[], nextSlot, true, cache, info)
|
||||
let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
||||
info "Head state is behind, catching up",
|
||||
headSlot = node.dag.head.slot,
|
||||
progressSlot = incrementalSlot,
|
||||
wallSlot = slot,
|
||||
dur = Moment.now() - incrementalTick
|
||||
|
||||
# When we're not behind schedule, we'll speculatively update the clearance
|
||||
# state in anticipation of receiving the next block - we do it after logging
|
||||
# slot end since the nextActionWaitTime can be short
|
||||
let
|
||||
advanceCutoff = node.beaconClock.fromNow(
|
||||
slot.start_beacon_time() + chronos.seconds(int(SECONDS_PER_SLOT - 1)))
|
||||
# state in anticipation of receiving the next block - we do it after
|
||||
# logging slot end since the nextActionWaitTime can be short
|
||||
let advanceCutoff = node.beaconClock.fromNow(
|
||||
slot.start_beacon_time() + chronos.seconds(int(SECONDS_PER_SLOT - 1)))
|
||||
if advanceCutoff.inFuture:
|
||||
# We wait until there's only a second left before the next slot begins, then
|
||||
# we advance the clearance state to the next slot - this gives us a high
|
||||
# probability of being prepared for the block that will arrive and the
|
||||
# epoch processing that follows
|
||||
await sleepAsync(advanceCutoff.offset)
|
||||
node.dag.advanceClearanceState(slot,
|
||||
chainIsDegraded = (node.syncStatus(head) == ChainSyncStatus.Degraded))
|
||||
|
||||
# If the chain has halted, we have to ensure that the EL gets synced
|
||||
# so that we can perform validator duties again
|
||||
if not node.dag.head.executionValid and not node.dag.chainIsProgressing():
|
||||
let beaconHead = node.attestationPool[].getBeaconHead(head)
|
||||
discard await node.consensusManager.updateExecutionClientHead(beaconHead)
|
||||
node.dag.advanceClearanceState()
|
||||
|
||||
# Prepare action tracker for the next slot
|
||||
node.consensusManager[].actionTracker.updateSlot(slot + 1)
|
||||
|
@ -1589,11 +1612,11 @@ func formatNextConsensusFork(
|
|||
$nextConsensusFork & ":" & $nextForkEpoch)
|
||||
|
||||
func syncStatus(node: BeaconNode, wallSlot: Slot): string =
|
||||
let optimistic_head = not node.dag.head.executionValid
|
||||
let optimisticHead = not node.dag.head.executionValid
|
||||
if node.syncManager.inProgress:
|
||||
let
|
||||
optimisticSuffix =
|
||||
if optimistic_head:
|
||||
if optimisticHead:
|
||||
"/opt"
|
||||
else:
|
||||
""
|
||||
|
@ -1602,7 +1625,20 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
|
|||
" - lc: " & $shortLog(node.consensusManager[].optimisticHead)
|
||||
else:
|
||||
""
|
||||
node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix
|
||||
catchingUpSuffix =
|
||||
if node.dag.incrementalState != nil:
|
||||
let
|
||||
headSlot = node.dag.head.slot
|
||||
incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
||||
progress =
|
||||
(incrementalSlot - headSlot).float /
|
||||
max(wallSlot - headSlot, 1).float * 100.float
|
||||
" - catching up: " &
|
||||
formatFloat(progress, ffDecimal, precision = 2) & "%"
|
||||
else:
|
||||
""
|
||||
node.syncManager.syncStatus & optimisticSuffix &
|
||||
lightClientSuffix & catchingUpSuffix
|
||||
elif node.backfiller.inProgress:
|
||||
"backfill: " & node.backfiller.syncStatus
|
||||
elif optimistic_head:
|
||||
|
|
|
@ -278,10 +278,15 @@ proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus =
|
|||
|
||||
# We are on the latest slot among all of our peers, and there has been no
|
||||
# chain progress for an extended period of time.
|
||||
let clearanceSlot = getStateField(node.dag.clearanceState, slot)
|
||||
if clearanceSlot + node.config.syncHorizon < wallSlot.slot:
|
||||
# If we were to propose a block now, we would incur a large lag spike
|
||||
# that makes our block be way too late to be gossiped
|
||||
if node.dag.incrementalState == nil:
|
||||
# The head state is too far in the past to timely perform validator duties
|
||||
return ChainSyncStatus.Degraded
|
||||
if node.dag.incrementalState[].latest_block_id != node.dag.head.bid:
|
||||
# The incremental state is not yet on the correct head (see `onSlotEnd`)
|
||||
return ChainSyncStatus.Degraded
|
||||
let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
||||
if incrementalSlot + node.config.syncHorizon < wallSlot.slot:
|
||||
# The incremental state still needs to advance further (see `onSlotEnd`)
|
||||
return ChainSyncStatus.Degraded
|
||||
|
||||
# It is reasonable safe to assume that the network has halted, resume duties
|
||||
|
|
Loading…
Reference in New Issue