Merge branch 'dev/etan/vd-incprop' into feat/splitview
This commit is contained in:
commit
dddb8b12cc
|
@ -591,6 +591,12 @@ type
|
|||
defaultValueDesc: $defaultSyncHorizon
|
||||
name: "sync-horizon" .}: uint64
|
||||
|
||||
proposeStale* {.
|
||||
hidden
|
||||
desc: "Whether or not to propose blocks on top of a stale head. May increase memory consumption"
|
||||
defaultValue: false
|
||||
name: "debug-propose-stale" .}: bool
|
||||
|
||||
terminalTotalDifficultyOverride* {.
|
||||
hidden
|
||||
desc: "Deprecated for removal"
|
||||
|
|
|
@ -202,10 +202,17 @@ type
|
|||
## Cached state used during block clearance - must only be used in
|
||||
## clearance module
|
||||
|
||||
incrementalState*: ref ForkedHashedBeaconState
|
||||
## State used for incrementally computing a proposal state.
|
||||
## See `getProposalState`
|
||||
|
||||
updateFlags*: UpdateFlags
|
||||
|
||||
cfg*: RuntimeConfig
|
||||
|
||||
lastChainProgress*: Moment
|
||||
## Indicates the last wall time at which meaningful progress was made
|
||||
|
||||
shufflingRefs*: LRUCache[16, ShufflingRef]
|
||||
|
||||
epochRefs*: LRUCache[32, EpochRef]
|
||||
|
|
|
@ -236,6 +236,8 @@ func getBlockIdAtSlot*(dag: ChainDAGRef, slot: Slot): Opt[BlockSlotId] =
|
|||
tryWithState dag.headState
|
||||
tryWithState dag.epochRefState
|
||||
tryWithState dag.clearanceState
|
||||
if dag.incrementalState != nil:
|
||||
tryWithState dag.incrementalState[]
|
||||
|
||||
# Fallback to database, this only works for backfilled blocks
|
||||
let finlow = dag.db.finalizedBlocks.low.expect("at least tailRef written")
|
||||
|
@ -1004,6 +1006,14 @@ proc applyBlock(
|
|||
|
||||
ok()
|
||||
|
||||
proc resetChainProgressWatchdog*(dag: ChainDAGRef) =
|
||||
dag.lastChainProgress = Moment.now()
|
||||
dag.incrementalState = nil
|
||||
|
||||
proc chainIsProgressing*(dag: ChainDAGRef): bool =
|
||||
const watchdogDuration = chronos.minutes(60)
|
||||
dag.lastChainProgress + watchdogDuration >= Moment.now()
|
||||
|
||||
proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
|
||||
validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags,
|
||||
eraPath = ".",
|
||||
|
@ -1044,6 +1054,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
|
|||
# allow skipping some validation.
|
||||
updateFlags: updateFlags * {strictVerification},
|
||||
cfg: cfg,
|
||||
lastChainProgress: Moment.now(),
|
||||
|
||||
vanityLogs: vanityLogs,
|
||||
|
||||
|
@ -1506,6 +1517,8 @@ proc computeRandaoMixFromMemory*(
|
|||
tryWithState dag.headState
|
||||
tryWithState dag.epochRefState
|
||||
tryWithState dag.clearanceState
|
||||
if dag.incrementalState != nil:
|
||||
tryWithState dag.incrementalState[]
|
||||
|
||||
proc computeRandaoMixFromDatabase*(
|
||||
dag: ChainDAGRef, bid: BlockId, lowSlot: Slot): Opt[Eth2Digest] =
|
||||
|
@ -1577,6 +1590,8 @@ proc computeShufflingRefFromMemory*(
|
|||
tryWithState dag.headState
|
||||
tryWithState dag.epochRefState
|
||||
tryWithState dag.clearanceState
|
||||
if dag.incrementalState != nil:
|
||||
tryWithState dag.incrementalState[]
|
||||
|
||||
proc getShufflingRef*(
|
||||
dag: ChainDAGRef, blck: BlockRef, epoch: Epoch,
|
||||
|
@ -1724,6 +1739,10 @@ proc updateState*(
|
|||
elif exactMatch(dag.epochRefState, bsi):
|
||||
assign(state, dag.epochRefState)
|
||||
found = true
|
||||
elif dag.incrementalState != nil and
|
||||
exactMatch(dag.incrementalState[], bsi):
|
||||
assign(state, dag.incrementalState[])
|
||||
found = true
|
||||
|
||||
const RewindBlockThreshold = 64
|
||||
|
||||
|
@ -1756,6 +1775,12 @@ proc updateState*(
|
|||
found = true
|
||||
break
|
||||
|
||||
if dag.incrementalState != nil and
|
||||
canAdvance(dag.incrementalState[], cur):
|
||||
assign(state, dag.incrementalState[])
|
||||
found = true
|
||||
break
|
||||
|
||||
if cur.isProposed():
|
||||
# This is not an empty slot, so the block will need to be applied to
|
||||
# eventually reach bs
|
||||
|
@ -2387,6 +2412,7 @@ proc updateHead*(
|
|||
quit 1
|
||||
|
||||
dag.head = newHead
|
||||
dag.resetChainProgressWatchdog()
|
||||
|
||||
if dag.headState.is_merge_transition_complete() and not
|
||||
lastHeadMergeComplete and
|
||||
|
@ -2629,7 +2655,11 @@ proc getProposalState*(
|
|||
|
||||
# Start with the clearance state, since this one typically has been advanced
|
||||
# and thus has a hot hash tree cache
|
||||
let state = assignClone(dag.clearanceState)
|
||||
let state =
|
||||
if dag.incrementalState != nil:
|
||||
dag.incrementalState
|
||||
else:
|
||||
assignClone(dag.clearanceState)
|
||||
|
||||
var
|
||||
info = ForkedEpochInfo()
|
||||
|
@ -2651,6 +2681,8 @@ proc getProposalState*(
|
|||
dag.cfg, state[], slot, cache, info,
|
||||
{skipLastStateRootCalculation}).expect("advancing 1 slot should not fail")
|
||||
|
||||
# Ensure async operations don't interfere with `incrementalState`
|
||||
dag.resetChainProgressWatchdog()
|
||||
ok state
|
||||
|
||||
func aggregateAll*(
|
||||
|
|
|
@ -1275,13 +1275,21 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
|
|||
TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64
|
||||
HYSTERESIS_BUFFER = 16
|
||||
|
||||
func distanceTo(headSlot: Slot, wallSlot: Slot): uint64 =
|
||||
if wallSlot > headSlot: (wallSlot - headSlot).uint64
|
||||
else: 0'u64
|
||||
|
||||
let
|
||||
head = node.dag.head
|
||||
headDistance =
|
||||
if slot > head.slot: (slot - head.slot).uint64
|
||||
else: 0'u64
|
||||
isBehind =
|
||||
headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER
|
||||
headDistance = head.slot.distanceTo(slot)
|
||||
distance =
|
||||
if node.dag.incrementalState != nil and
|
||||
node.dag.incrementalState[].latest_block_id == head.bid:
|
||||
let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
||||
incrementalSlot.distanceTo(slot)
|
||||
else:
|
||||
headDistance
|
||||
isBehind = distance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER
|
||||
targetGossipState =
|
||||
getTargetGossipState(
|
||||
slot.epoch,
|
||||
|
@ -1541,6 +1549,38 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
|
|||
# Update 1 epoch early to block non-fork-ready peers
|
||||
node.network.updateForkId(epoch, node.dag.genesis_validators_root)
|
||||
|
||||
if node.config.proposeStale:
|
||||
# If the chain has halted, we have to ensure that the EL gets synced
|
||||
# so that we can perform validator duties again
|
||||
if not node.dag.head.executionValid and not node.dag.chainIsProgressing():
|
||||
let beaconHead = node.attestationPool[].getBeaconHead(head)
|
||||
discard await node.consensusManager.updateExecutionClientHead(beaconHead)
|
||||
|
||||
# If the chain head is far behind, we have to advance it incrementally
|
||||
# to avoid lag spikes when performing validator duties
|
||||
if node.syncStatus(head) == ChainSyncStatus.Degraded:
|
||||
let incrementalTick = Moment.now()
|
||||
if node.dag.incrementalState == nil:
|
||||
node.dag.incrementalState = assignClone(node.dag.headState)
|
||||
elif node.dag.incrementalState[].latest_block_id != node.dag.head.bid:
|
||||
node.dag.incrementalState[].assign(node.dag.headState)
|
||||
else:
|
||||
let
|
||||
incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
||||
maxSlot = max(incrementalSlot, slot + 1)
|
||||
nextSlot = min((incrementalSlot.epoch + 1).start_slot, maxSlot)
|
||||
var
|
||||
cache: StateCache
|
||||
info: ForkedEpochInfo
|
||||
node.dag.advanceSlots(
|
||||
node.dag.incrementalState[], nextSlot, true, cache, info)
|
||||
let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
||||
info "Head state is behind, catching up",
|
||||
headSlot = node.dag.head.slot,
|
||||
progressSlot = incrementalSlot,
|
||||
wallSlot = slot,
|
||||
dur = Moment.now() - incrementalTick
|
||||
|
||||
# When we're not behind schedule, we'll speculatively update the clearance
|
||||
# state in anticipation of receiving the next block - we do it after
|
||||
# logging slot end since the nextActionWaitTime can be short
|
||||
|
@ -1593,7 +1633,20 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
|
|||
" - lc: " & $shortLog(node.consensusManager[].optimisticHead)
|
||||
else:
|
||||
""
|
||||
node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix
|
||||
catchingUpSuffix =
|
||||
if node.dag.incrementalState != nil:
|
||||
let
|
||||
headSlot = node.dag.head.slot
|
||||
incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
||||
progress =
|
||||
(incrementalSlot - headSlot).float /
|
||||
max(wallSlot - headSlot, 1).float * 100.float
|
||||
" - catching up: " &
|
||||
formatFloat(progress, ffDecimal, precision = 2) & "%"
|
||||
else:
|
||||
""
|
||||
node.syncManager.syncStatus & optimisticSuffix &
|
||||
lightClientSuffix & catchingUpSuffix
|
||||
elif node.backfiller.inProgress:
|
||||
"backfill: " & node.backfiller.syncStatus
|
||||
elif optimistic_head:
|
||||
|
|
|
@ -227,10 +227,13 @@ proc getGraffitiBytes*(
|
|||
getGraffiti(node.config.validatorsDir, node.config.defaultGraffitiBytes(),
|
||||
validator.pubkey)
|
||||
|
||||
proc isSynced*(node: BeaconNode, head: BlockRef): bool =
|
||||
## TODO This function is here as a placeholder for some better heurestics to
|
||||
## determine if we're in sync and should be producing blocks and
|
||||
## attestations. Generally, the problem is that slot time keeps advancing
|
||||
type ChainSyncStatus* {.pure.} = enum
|
||||
Syncing,
|
||||
Synced,
|
||||
Degraded
|
||||
|
||||
proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus =
|
||||
## Generally, the problem is that slot time keeps advancing
|
||||
## even when there are no blocks being produced, so there's no way to
|
||||
## distinguish validators geniunely going missing from the node not being
|
||||
## well connected (during a network split or an internet outage for
|
||||
|
@ -239,17 +242,65 @@ proc isSynced*(node: BeaconNode, head: BlockRef): bool =
|
|||
## with enough many empty slots, the validator pool is emptied leading
|
||||
## to empty committees and lots of empty slot processing that will be
|
||||
## thrown away as soon as we're synced again.
|
||||
|
||||
let
|
||||
# The slot we should be at, according to the clock
|
||||
beaconTime = node.beaconClock.now()
|
||||
wallSlot = beaconTime.toSlot()
|
||||
|
||||
# TODO if everyone follows this logic, the network will not recover from a
|
||||
# halt: nobody will be producing blocks because everone expects someone
|
||||
# else to do it
|
||||
not wallSlot.afterGenesis or
|
||||
head.slot + node.config.syncHorizon >= wallSlot.slot
|
||||
if not wallSlot.afterGenesis or
|
||||
head.slot + node.config.syncHorizon >= wallSlot.slot:
|
||||
node.dag.resetChainProgressWatchdog()
|
||||
return ChainSyncStatus.Synced
|
||||
|
||||
if not node.config.proposeStale:
|
||||
# Continue syncing and wait for someone else to propose the next block
|
||||
return ChainSyncStatus.Syncing
|
||||
|
||||
let
|
||||
numPeers = len(node.network.peerPool)
|
||||
minPeers = max(node.config.maxPeers div 4, SyncWorkersCount * 2)
|
||||
if numPeers <= minPeers:
|
||||
# We may have poor connectivity, wait until more peers are available.
|
||||
# This could also be intermittent, as state replays while chain is degraded
|
||||
# may take significant amounts of time, during which many peers are lost
|
||||
return ChainSyncStatus.Syncing
|
||||
|
||||
if node.dag.chainIsProgressing():
|
||||
# Chain is progressing, we are out of sync
|
||||
return ChainSyncStatus.Syncing
|
||||
|
||||
let
|
||||
maxHeadSlot = node.dag.heads.foldl(max(a, b.slot), GENESIS_SLOT)
|
||||
numPeersWithHigherProgress = node.network.peerPool.peers
|
||||
.countIt(it != nil and it.getHeadSlot() > maxHeadSlot)
|
||||
significantNumPeers = node.config.maxPeers div 8
|
||||
if numPeersWithHigherProgress > significantNumPeers:
|
||||
# A peer indicates that they are on a later slot, wait for sync manager
|
||||
# to progress, or for it to kick the peer if they are faking the status
|
||||
warn "Chain appears to have stalled, but peers indicate higher progress",
|
||||
numPeersWithHigherProgress, numPeers, maxPeers = node.config.maxPeers,
|
||||
head, maxHeadSlot
|
||||
node.dag.resetChainProgressWatchdog()
|
||||
return ChainSyncStatus.Syncing
|
||||
|
||||
# We are on the latest slot among all of our peers, and there has been no
|
||||
# chain progress for an extended period of time.
|
||||
if node.dag.incrementalState == nil:
|
||||
# The head state is too far in the past to timely perform validator duties
|
||||
return ChainSyncStatus.Degraded
|
||||
if node.dag.incrementalState[].latest_block_id != node.dag.head.bid:
|
||||
# The incremental state is not yet on the correct head (see `onSlotEnd`)
|
||||
return ChainSyncStatus.Degraded
|
||||
let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
||||
if incrementalSlot + node.config.syncHorizon < wallSlot.slot:
|
||||
# The incremental state still needs to advance further (see `onSlotEnd`)
|
||||
return ChainSyncStatus.Degraded
|
||||
|
||||
# It is reasonable safe to assume that the network has halted, resume duties
|
||||
ChainSyncStatus.Synced
|
||||
|
||||
proc isSynced*(node: BeaconNode, head: BlockRef): bool =
|
||||
node.syncStatus(head) == ChainSyncStatus.Synced
|
||||
|
||||
proc handleLightClientUpdates*(node: BeaconNode, slot: Slot)
|
||||
{.async: (raises: [CancelledError]).} =
|
||||
|
|
Loading…
Reference in New Issue