add option to configure max historic LC data periods (#3799)

Adds a `--light-client-data-max-periods` option to override the number
of sync committee periods to retain light client data.
Raising it above the default enables archive nodes to serve full data.
Lowering below the default speeds up import times (still no persistence)
This commit is contained in:
Etan Kissling 2022-06-27 13:24:38 +02:00 committed by GitHub
parent b318a74d83
commit 91d543440a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 379 additions and 368 deletions

View File

@ -456,6 +456,11 @@ type
"Must be one of: none, only-new, full (slow startup), on-demand (may miss validator duties)" "Must be one of: none, only-new, full (slow startup), on-demand (may miss validator duties)"
name: "light-client-data-import-mode" .}: Option[LightClientDataImportMode] name: "light-client-data-import-mode" .}: Option[LightClientDataImportMode]
lightClientDataMaxPeriods* {.
hidden
desc: "BETA: Maximum number of sync committee periods to retain light client data"
name: "light-client-data-max-periods" .}: Option[uint64]
inProcessValidators* {. inProcessValidators* {.
desc: "Disable the push model (the beacon node tells a signing process with the private keys of the validators what to sign and when) and load the validators in the beacon node itself" desc: "Disable the push model (the beacon node tells a signing process with the private keys of the validators what to sign and when) and load the validators in the beacon node itself"
defaultValue: true # the use of the nimbus_signing_process binary by default will be delayed until async I/O over stdin/stdout is developed for the child process. defaultValue: true # the use of the nimbus_signing_process binary by default will be delayed until async I/O over stdin/stdout is developed for the child process.

View File

@ -77,7 +77,7 @@ type
## Tracks light client data for the latest slot that was signed by ## Tracks light client data for the latest slot that was signed by
## at least `MIN_SYNC_COMMITTEE_PARTICIPANTS`. May be older than head. ## at least `MIN_SYNC_COMMITTEE_PARTICIPANTS`. May be older than head.
importTailSlot*: Slot tailSlot*: Slot
## The earliest slot for which light client data is imported. ## The earliest slot for which light client data is imported.
LightClientDataStore* = object LightClientDataStore* = object
@ -94,6 +94,8 @@ type
## Whether to make local light client data available or not ## Whether to make local light client data available or not
importMode*: LightClientDataImportMode importMode*: LightClientDataImportMode
## Which classes of light client data to import ## Which classes of light client data to import
maxPeriods*: uint64
## Maximum number of sync committee periods to retain light client data
# ----------------------------------- # -----------------------------------
# Callbacks # Callbacks

View File

@ -689,6 +689,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
onLCOptimisticUpdateCb: OnLightClientOptimisticUpdateCallback = nil, onLCOptimisticUpdateCb: OnLightClientOptimisticUpdateCallback = nil,
lightClientDataServe = false, lightClientDataServe = false,
lightClientDataImportMode = LightClientDataImportMode.None, lightClientDataImportMode = LightClientDataImportMode.None,
lightClientDataMaxPeriods = none(uint64),
vanityLogs = default(VanityLogs)): ChainDAGRef = vanityLogs = default(VanityLogs)): ChainDAGRef =
cfg.checkForkConsistency() cfg.checkForkConsistency()
@ -728,6 +729,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
lcDataStore: initLightClientDataStore( lcDataStore: initLightClientDataStore(
serve = lightClientDataServe, serve = lightClientDataServe,
importMode = lightClientDataImportMode, importMode = lightClientDataImportMode,
maxPeriods = lightClientDataMaxPeriods.get(cfg.defaultLCDataMaxPeriods),
onLCFinalityUpdateCb = onLCFinalityUpdateCb, onLCFinalityUpdateCb = onLCFinalityUpdateCb,
onLCOptimisticUpdateCb = onLCOptimisticUpdateCb), onLCOptimisticUpdateCb = onLCOptimisticUpdateCb),

View File

@ -34,28 +34,6 @@ template nextEpochBoundarySlot(slot: Slot): Slot =
## referring to a block at given slot. ## referring to a block at given slot.
(slot + (SLOTS_PER_EPOCH - 1)).epoch.start_slot (slot + (SLOTS_PER_EPOCH - 1)).epoch.start_slot
func computeEarliestLightClientSlot(dag: ChainDAGRef): Slot =
## Compute the earliest slot for which light client data is retained.
let
minSupportedSlot = max(
dag.cfg.ALTAIR_FORK_EPOCH.start_slot,
dag.lcDataStore.cache.importTailSlot)
currentSlot = getStateField(dag.headState, slot)
if currentSlot < minSupportedSlot:
return minSupportedSlot
let
MIN_EPOCHS_FOR_BLOCK_REQUESTS =
dag.cfg.MIN_VALIDATOR_WITHDRAWABILITY_DELAY +
dag.cfg.CHURN_LIMIT_QUOTIENT div 2
MIN_SLOTS_FOR_BLOCK_REQUESTS =
MIN_EPOCHS_FOR_BLOCK_REQUESTS * SLOTS_PER_EPOCH
if currentSlot - minSupportedSlot < MIN_SLOTS_FOR_BLOCK_REQUESTS:
return minSupportedSlot
let earliestSlot = currentSlot - MIN_SLOTS_FOR_BLOCK_REQUESTS
max(earliestSlot.sync_committee_period.start_slot, minSupportedSlot)
proc updateExistingState( proc updateExistingState(
dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsi: BlockSlotId, dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsi: BlockSlotId,
save: bool, cache: var StateCache): bool = save: bool, cache: var StateCache): bool =
@ -145,6 +123,245 @@ proc syncCommitteeRootForPeriod(
else: raiseAssert "Unreachable" else: raiseAssert "Unreachable"
do: err() do: err()
func initLightClientDataStore*(
serve: bool, importMode: LightClientDataImportMode, maxPeriods: uint64,
onLCFinalityUpdateCb: OnLightClientFinalityUpdateCallback = nil,
onLCOptimisticUpdateCb: OnLightClientOptimisticUpdateCallback = nil
): LightClientDataStore =
## Initialize light client data store.
LightClientDataStore(
serve: serve,
importMode: importMode,
maxPeriods: maxPeriods,
onLightClientFinalityUpdate: onLCFinalityUpdateCb,
onLightClientOptimisticUpdate: onLCOptimisticUpdateCb)
func targetLightClientTailSlot(dag: ChainDAGRef): Slot =
## Earliest slot for which light client data is retained.
let
maxPeriods = dag.lcDataStore.maxPeriods
headPeriod = dag.head.slot.sync_committee_period
lowSlot = max(dag.tail.slot, dag.cfg.ALTAIR_FORK_EPOCH.start_slot)
tail = max(headPeriod + 1, maxPeriods.SyncCommitteePeriod) - maxPeriods
max(tail.start_slot, lowSlot)
func handleUnexpectedLightClientError(dag: ChainDAGRef, buggedSlot: Slot) =
## If there is an unexpected error, adjust `tailSlot` to keep track of the
## section for which complete light client data is available, and to avoid
## failed lookups of cached light client data.
doAssert verifyFinalization notin dag.updateFlags
if buggedSlot >= dag.lcDataStore.cache.tailSlot:
dag.lcDataStore.cache.tailSlot = buggedSlot + 1
proc initLightClientBootstrapForPeriod(
dag: ChainDAGRef,
period: SyncCommitteePeriod) =
## Compute and cache `LightClientBootstrap` data for all finalized
## epoch boundary blocks within a given sync committee period.
if not dag.isNextSyncCommitteeFinalized(period):
return
let startTick = Moment.now()
debug "Caching historic LC bootstrap data", period
defer:
let endTick = Moment.now()
debug "Historic LC bootstrap data cached", period,
cacheDur = endTick - startTick
let
periodStartSlot = period.start_slot
periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1
lowSlot = max(periodStartSlot, dag.targetLightClientTailSlot)
highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot)
lowBoundarySlot = lowSlot.nextEpochBoundarySlot
highBoundarySlot = highSlot.nextEpochBoundarySlot
var
tmpState = assignClone(dag.headState)
tmpCache: StateCache
nextBoundarySlot = lowBoundarySlot
while nextBoundarySlot <= highBoundarySlot:
defer: nextBoundarySlot += SLOTS_PER_EPOCH
let
bsi = dag.getExistingBlockIdAtSlot(nextBoundarySlot).valueOr:
dag.handleUnexpectedLightClientError(nextBoundarySlot)
continue
bid = bsi.bid
boundarySlot = bid.slot.nextEpochBoundarySlot
if boundarySlot == nextBoundarySlot and bid.slot >= lowSlot and
not dag.lcDataStore.cache.bootstrap.hasKey(bid.slot):
if not dag.updateExistingState(
tmpState[], bid.atSlot, save = false, tmpCache):
dag.handleUnexpectedLightClientError(bid.slot)
continue
var cachedBootstrap {.noinit.}: CachedLightClientBootstrap
cachedBootstrap.current_sync_committee_branch = withState(tmpState[]):
when stateFork >= BeaconStateFork.Altair:
state.data.build_proof(altair.CURRENT_SYNC_COMMITTEE_INDEX).get
else: raiseAssert "Unreachable"
dag.lcDataStore.cache.bootstrap[bid.slot] = cachedBootstrap
proc initLightClientUpdateForPeriod(
dag: ChainDAGRef, period: SyncCommitteePeriod) =
## Compute and cache the best `LightClientUpdate` within a given
## sync committee period up through the finalized head block.
## Non-finalized blocks are processed incrementally.
if not dag.isNextSyncCommitteeFinalized(period):
return
if dag.lcDataStore.cache.best.hasKey(period):
return
let startTick = Moment.now()
debug "Computing best historic LC update", period
proc logBest(endTick = Moment.now()) =
# Using a helper function reduces code size as the `defer` beneath is
# replicated on every `return`, and the log statement allocates another
# copy of the arguments on the stack for each instantiation (~1 MB stack!)
debug "Best historic LC update computed",
period, update = dag.lcDataStore.cache.best.getOrDefault(period),
computeDur = endTick - startTick
defer: logBest()
proc maxParticipantsBlock(
dag: ChainDAGRef, highBid: BlockId, lowSlot: Slot
): tuple[bid: Opt[BlockId], ok: bool] =
## Determine the earliest block with most sync committee signatures among
## ancestors of `highBid` with at least `lowSlot` as parent block slot.
## Return `err` if no block with `MIN_SYNC_COMMITTEE_PARTICIPANTS` exists.
## `bool` in result indicates whether no unexpected errors occurred.
var
maxParticipants = MIN_SYNC_COMMITTEE_PARTICIPANTS
maxBid: Opt[BlockId]
allOk = true
bid = highBid
while true:
if bid.slot <= lowSlot:
break
let parentBid = dag.existingParent(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
allOk = false
break
if parentBid.slot < lowSlot:
break
let
bdata = dag.getExistingForkedBlock(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
allOk = false
break
numParticipants =
withBlck(bdata):
when stateFork >= BeaconStateFork.Altair:
countOnes(blck.message.body.sync_aggregate.sync_committee_bits)
else: raiseAssert "Unreachable"
if numParticipants >= maxParticipants:
maxParticipants = numParticipants
maxBid.ok bid
bid = parentBid
(bid: maxBid, ok: allOk)
# Determine the block in the period with highest sync committee participation
let
periodStartSlot = period.start_slot
periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1
lowSlot = max(periodStartSlot, dag.targetLightClientTailSlot)
highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot)
highBsi = dag.getExistingBlockIdAtSlot(highSlot).valueOr:
dag.handleUnexpectedLightClientError(highSlot)
return
highBid = highBsi.bid
maxParticipantsRes = dag.maxParticipantsBlock(highBid, lowSlot)
maxParticipantsBid = maxParticipantsRes.bid.valueOr:
if maxParticipantsRes.ok: # No single valid block exists in the period
dag.lcDataStore.cache.best[period] = default(altair.LightClientUpdate)
return
# The block with highest participation may refer to a `finalized_checkpoint`
# in a different sync committee period. If that is the case, search for a
# later block with a `finalized_checkpoint` within the given sync committee
# period, despite it having a lower sync committee participation
var
tmpState = assignClone(dag.headState)
signatureBid {.noinit.}, finalizedBid {.noinit.}: BlockId
signatureBid.slot = FAR_FUTURE_SLOT
finalizedBid.slot = FAR_FUTURE_SLOT
while true:
if signatureBid.slot == FAR_FUTURE_SLOT:
signatureBid = maxParticipantsBid
else:
let
nextLowSlot = signatureBid.slot + 1
signatureRes = dag.maxParticipantsBlock(highBid, nextLowSlot)
signatureBid = signatureRes.bid.valueOr:
signatureBid = maxParticipantsBid
break
let
attestedBid = dag.existingParent(signatureBid).valueOr:
dag.handleUnexpectedLightClientError(signatureBid.slot)
continue
finalizedEpoch = block:
dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do:
withState(state):
when stateFork >= BeaconStateFork.Altair:
state.data.finalized_checkpoint.epoch
else: raiseAssert "Unreachable"
do:
dag.handleUnexpectedLightClientError(attestedBid.slot)
continue
finalizedSlot = finalizedEpoch.start_slot
finalizedBsi =
if finalizedSlot >= dag.tail.slot:
dag.getExistingBlockIdAtSlot(finalizedSlot).valueOr:
dag.handleUnexpectedLightClientError(finalizedSlot)
continue
else:
continue
if finalizedBid.slot >= lowSlot:
finalizedBid = finalizedBsi.bid
break
if signatureBid == maxParticipantsBid:
finalizedBid = finalizedBsi.bid # For fallback `break` at start of loop
# Save best light client data for given period
var update {.noinit.}: altair.LightClientUpdate
let attestedBid = dag.existingParent(signatureBid).valueOr:
dag.handleUnexpectedLightClientError(signatureBid.slot)
return
dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do:
let bdata = dag.getExistingForkedBlock(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
return
withStateAndBlck(state, bdata):
when stateFork >= BeaconStateFork.Altair:
update.attested_header = blck.toBeaconBlockHeader()
update.next_sync_committee = state.data.next_sync_committee
update.next_sync_committee_branch =
state.data.build_proof(altair.NEXT_SYNC_COMMITTEE_INDEX).get
if finalizedBid.slot == FAR_FUTURE_SLOT:
update.finality_branch.reset()
else:
update.finality_branch =
state.data.build_proof(altair.FINALIZED_ROOT_INDEX).get
else: raiseAssert "Unreachable"
do:
dag.handleUnexpectedLightClientError(attestedBid.slot)
return
if finalizedBid.slot == FAR_FUTURE_SLOT or finalizedBid.slot == GENESIS_SLOT:
update.finalized_header.reset()
else:
let bdata = dag.getExistingForkedBlock(finalizedBid).valueOr:
dag.handleUnexpectedLightClientError(finalizedBid.slot)
return
withBlck(bdata):
update.finalized_header = blck.toBeaconBlockHeader()
let bdata = dag.getExistingForkedBlock(signatureBid).valueOr:
dag.handleUnexpectedLightClientError(signatureBid.slot)
return
withBlck(bdata):
when stateFork >= BeaconStateFork.Altair:
update.sync_aggregate = blck.asSigned().message.body.sync_aggregate
else: raiseAssert "Unreachable"
update.signature_slot = signatureBid.slot
dag.lcDataStore.cache.best[period] = update
proc getLightClientData( proc getLightClientData(
dag: ChainDAGRef, dag: ChainDAGRef,
bid: BlockId): CachedLightClientData = bid: BlockId): CachedLightClientData =
@ -178,14 +395,6 @@ proc deleteLightClientData*(dag: ChainDAGRef, bid: BlockId) =
dag.lcDataStore.cache.data.del bid dag.lcDataStore.cache.data.del bid
func handleUnexpectedLightClientError(dag: ChainDAGRef, buggedSlot: Slot) =
## If there is an unexpected error, adjust `importTailSlot` to keep track of
## section for which complete light client data is available, and to avoid
## failed lookups of cached light client data.
doAssert verifyFinalization notin dag.updateFlags
if buggedSlot >= dag.lcDataStore.cache.importTailSlot:
dag.lcDataStore.cache.importTailSlot = buggedSlot + 1
template lazy_header(name: untyped): untyped {.dirty.} = template lazy_header(name: untyped): untyped {.dirty.} =
## `createLightClientUpdates` helper to lazily load a known block header. ## `createLightClientUpdates` helper to lazily load a known block header.
var var
@ -246,10 +455,8 @@ proc createLightClientUpdates(
# Verify attested block (parent) is recent enough and that state is available # Verify attested block (parent) is recent enough and that state is available
template attested_bid(): auto = parent_bid template attested_bid(): auto = parent_bid
let let attested_slot = attested_bid.slot
earliest_slot = dag.computeEarliestLightClientSlot if attested_slot < dag.lcDataStore.cache.tailSlot:
attested_slot = attested_bid.slot
if attested_slot < earliest_slot:
return return
# Lazy variables to hold historic data # Lazy variables to hold historic data
@ -337,19 +544,115 @@ proc createLightClientUpdates(
if isCommitteeFinalized: if isCommitteeFinalized:
dag.lcDataStore.cache.best[attested_period] = best dag.lcDataStore.cache.best[attested_period] = best
debug "Best LC update for period improved", debug "Best LC update improved", period = attested_period, update = best
period = attested_period, update = best
else: else:
let key = (attested_period, state.syncCommitteeRoot) let key = (attested_period, state.syncCommitteeRoot)
dag.lcDataStore.cache.pendingBest[key] = best dag.lcDataStore.cache.pendingBest[key] = best
debug "Best LC update for period improved", debug "Best LC update improved", period = key, update = best
period = key, update = best
if newFinality and dag.lcDataStore.onLightClientFinalityUpdate != nil: if newFinality and dag.lcDataStore.onLightClientFinalityUpdate != nil:
dag.lcDataStore.onLightClientFinalityUpdate(latest) dag.lcDataStore.onLightClientFinalityUpdate(latest)
if newOptimistic and dag.lcDataStore.onLightClientOptimisticUpdate != nil: if newOptimistic and dag.lcDataStore.onLightClientOptimisticUpdate != nil:
dag.lcDataStore.onLightClientOptimisticUpdate(latest.toOptimistic) dag.lcDataStore.onLightClientOptimisticUpdate(latest.toOptimistic)
proc initLightClientDataCache*(dag: ChainDAGRef) =
## Initialize cached light client data
if dag.lcDataStore.importMode == LightClientDataImportMode.None:
return
# Initialize tail slot
let targetTailSlot = dag.targetLightClientTailSlot
dag.lcDataStore.cache.tailSlot = max(dag.head.slot, targetTailSlot)
# Import head state
if dag.head.slot < dag.lcDataStore.cache.tailSlot:
return
withState(dag.headState):
when stateFork >= BeaconStateFork.Altair:
dag.cacheLightClientData(state, dag.head.bid)
else: raiseAssert "Unreachable" # `tailSlot` cannot be before Altair
if dag.lcDataStore.importMode == LightClientDataImportMode.OnlyNew:
return
# Import light client data for finalized period through finalized head
let finalizedSlot = max(dag.finalizedHead.blck.slot, targetTailSlot)
if finalizedSlot >= dag.lcDataStore.cache.tailSlot:
return
dag.lcDataStore.cache.tailSlot = finalizedSlot
let finalizedPeriod = finalizedSlot.sync_committee_period
dag.initLightClientBootstrapForPeriod(finalizedPeriod)
dag.initLightClientUpdateForPeriod(finalizedPeriod)
let lightClientStartTick = Moment.now()
logScope: lightClientDataMaxPeriods = dag.lcDataStore.maxPeriods
debug "Initializing cached LC data"
# Build list of block to process.
# As it is slow to load states in descending order,
# build a reverse todo list to then process them in ascending order
var
blocks = newSeqOfCap[BlockId](dag.head.slot - finalizedSlot + 1)
bid = dag.head.bid
while bid.slot > finalizedSlot:
blocks.add bid
bid = dag.existingParent(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
break
if bid.slot == finalizedSlot:
blocks.add bid
# Process blocks (reuses `dag.headState`, but restores it to the current head)
var
tmpState = assignClone(dag.headState)
tmpCache, cache: StateCache
oldCheckpoint: Checkpoint
cpIndex = 0
for i in countdown(blocks.high, blocks.low):
bid = blocks[i]
if not dag.updateExistingState(
dag.headState, bid.atSlot, save = false, cache):
dag.handleUnexpectedLightClientError(bid.slot)
continue
let bdata = dag.getExistingForkedBlock(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
continue
withStateAndBlck(dag.headState, bdata):
when stateFork >= BeaconStateFork.Altair:
# Cache light client data (non-finalized blocks may refer to this)
if i != blocks.low:
dag.cacheLightClientData(state, bid) # `dag.head` already cached
# Create `LightClientUpdate` instances
if i < blocks.high:
dag.createLightClientUpdates(state, blck, parentBid = blocks[i + 1])
else: raiseAssert "Unreachable"
let lightClientEndTick = Moment.now()
debug "Initialized cached LC data",
initDur = lightClientEndTick - lightClientStartTick
if dag.lcDataStore.importMode == LightClientDataImportMode.OnDemand:
return
# Import historic data
dag.lcDataStore.cache.tailSlot = targetTailSlot
let targetTailPeriod = targetTailSlot.sync_committee_period
if targetTailPeriod < finalizedPeriod:
# `countdown` through 0 fails on distinct `uint64`
# https://github.com/nim-lang/Nim/pull/19926
var period = finalizedPeriod - 1
while period >= targetTailPeriod:
dag.initLightClientBootstrapForPeriod(period)
dag.initLightClientUpdateForPeriod(period)
if period <= targetTailPeriod:
break
dec period
if dag.lcDataStore.cache.tailSlot == targetTailSlot:
debug "Historic LC data imported"
else:
# `handleUnexpectedLightClientError` updates `tailSlot`
warn "Error while importing historic LC data",
errorSlot = dag.lcDataStore.cache.tailSlot - 1, targetTailSlot
proc processNewBlockForLightClient*( proc processNewBlockForLightClient*(
dag: ChainDAGRef, dag: ChainDAGRef,
state: ForkedHashedBeaconState, state: ForkedHashedBeaconState,
@ -358,7 +661,7 @@ proc processNewBlockForLightClient*(
## Update light client data with information from a new block. ## Update light client data with information from a new block.
if dag.lcDataStore.importMode == LightClientDataImportMode.None: if dag.lcDataStore.importMode == LightClientDataImportMode.None:
return return
if signedBlock.message.slot < dag.computeEarliestLightClientSlot: if signedBlock.message.slot < dag.lcDataStore.cache.tailSlot:
return return
when signedBlock is bellatrix.TrustedSignedBeaconBlock: when signedBlock is bellatrix.TrustedSignedBeaconBlock:
@ -368,7 +671,7 @@ proc processNewBlockForLightClient*(
dag.cacheLightClientData(state.altairData, signedBlock.toBlockId()) dag.cacheLightClientData(state.altairData, signedBlock.toBlockId())
dag.createLightClientUpdates(state.altairData, signedBlock, parentBid) dag.createLightClientUpdates(state.altairData, signedBlock, parentBid)
elif signedBlock is phase0.TrustedSignedBeaconBlock: elif signedBlock is phase0.TrustedSignedBeaconBlock:
raiseAssert "Unreachable" # `earliestSlot` cannot be before Altair raiseAssert "Unreachable" # `tailSlot` cannot be before Altair
else: else:
{.error: "Unreachable".} {.error: "Unreachable".}
@ -377,8 +680,7 @@ proc processHeadChangeForLightClient*(dag: ChainDAGRef) =
## Note that `dag.finalizedHead` is not yet updated when this is called. ## Note that `dag.finalizedHead` is not yet updated when this is called.
if dag.lcDataStore.importMode == LightClientDataImportMode.None: if dag.lcDataStore.importMode == LightClientDataImportMode.None:
return return
let earliestSlot = dag.computeEarliestLightClientSlot if dag.head.slot < dag.lcDataStore.cache.tailSlot:
if dag.head.slot < earliestSlot:
return return
# Update `best` from `pendingBest` to ensure light client data # Update `best` from `pendingBest` to ensure light client data
@ -386,8 +688,8 @@ proc processHeadChangeForLightClient*(dag: ChainDAGRef) =
let headPeriod = dag.head.slot.sync_committee_period let headPeriod = dag.head.slot.sync_committee_period
if not dag.isNextSyncCommitteeFinalized(headPeriod): if not dag.isNextSyncCommitteeFinalized(headPeriod):
let let
earliestPeriod = earliestSlot.sync_committee_period tailPeriod = dag.lcDataStore.cache.tailSlot.sync_committee_period
lowPeriod = max(dag.firstNonFinalizedPeriod, earliestPeriod) lowPeriod = max(dag.firstNonFinalizedPeriod, tailPeriod)
if headPeriod > lowPeriod: if headPeriod > lowPeriod:
var tmpState = assignClone(dag.headState) var tmpState = assignClone(dag.headState)
for period in lowPeriod ..< headPeriod: for period in lowPeriod ..< headPeriod:
@ -404,7 +706,7 @@ proc processHeadChangeForLightClient*(dag: ChainDAGRef) =
let key = (headPeriod, state.syncCommitteeRoot) let key = (headPeriod, state.syncCommitteeRoot)
dag.lcDataStore.cache.best[headPeriod] = dag.lcDataStore.cache.best[headPeriod] =
dag.lcDataStore.cache.pendingBest.getOrDefault(key) dag.lcDataStore.cache.pendingBest.getOrDefault(key)
else: raiseAssert "Unreachable" else: raiseAssert "Unreachable" # `tailSlot` cannot be before Altair
proc processFinalizationForLightClient*( proc processFinalizationForLightClient*(
dag: ChainDAGRef, oldFinalizedHead: BlockSlot) = dag: ChainDAGRef, oldFinalizedHead: BlockSlot) =
@ -413,14 +715,14 @@ proc processFinalizationForLightClient*(
## This needs to be called whenever `finalized_checkpoint` changes. ## This needs to be called whenever `finalized_checkpoint` changes.
if dag.lcDataStore.importMode == LightClientDataImportMode.None: if dag.lcDataStore.importMode == LightClientDataImportMode.None:
return return
let let finalizedSlot = dag.finalizedHead.slot
earliestSlot = dag.computeEarliestLightClientSlot if finalizedSlot < dag.lcDataStore.cache.tailSlot:
finalizedSlot = dag.finalizedHead.slot
if finalizedSlot < earliestSlot:
return return
# Cache `LightClientBootstrap` for newly finalized epoch boundary blocks # Cache `LightClientBootstrap` for newly finalized epoch boundary blocks
let lowSlot = max(oldFinalizedHead.slot + 1, earliestSlot) let
firstNewSlot = oldFinalizedHead.slot + 1
lowSlot = max(firstNewSlot, dag.lcDataStore.cache.tailSlot)
var boundarySlot = finalizedSlot var boundarySlot = finalizedSlot
while boundarySlot >= lowSlot: while boundarySlot >= lowSlot:
let let
@ -447,19 +749,22 @@ proc processFinalizationForLightClient*(
for bid in bidsToDelete: for bid in bidsToDelete:
dag.lcDataStore.cache.data.del bid dag.lcDataStore.cache.data.del bid
let
targetTailSlot = dag.targetLightClientTailSlot
targetTailPeriod = targetTailSlot.sync_committee_period
# Prune bootstrap data that is no longer relevant # Prune bootstrap data that is no longer relevant
var slotsToDelete: seq[Slot] var slotsToDelete: seq[Slot]
for slot in dag.lcDataStore.cache.bootstrap.keys: for slot in dag.lcDataStore.cache.bootstrap.keys:
if slot < earliestSlot: if slot < targetTailSlot:
slotsToDelete.add slot slotsToDelete.add slot
for slot in slotsToDelete: for slot in slotsToDelete:
dag.lcDataStore.cache.bootstrap.del slot dag.lcDataStore.cache.bootstrap.del slot
# Prune best `LightClientUpdate` that are no longer relevant # Prune best `LightClientUpdate` that are no longer relevant
let earliestPeriod = earliestSlot.sync_committee_period
var periodsToDelete: seq[SyncCommitteePeriod] var periodsToDelete: seq[SyncCommitteePeriod]
for period in dag.lcDataStore.cache.best.keys: for period in dag.lcDataStore.cache.best.keys:
if period < earliestPeriod: if period < targetTailPeriod:
periodsToDelete.add period periodsToDelete.add period
for period in periodsToDelete: for period in periodsToDelete:
dag.lcDataStore.cache.best.del period dag.lcDataStore.cache.best.del period
@ -474,318 +779,6 @@ proc processFinalizationForLightClient*(
for key in keysToDelete: for key in keysToDelete:
dag.lcDataStore.cache.pendingBest.del key dag.lcDataStore.cache.pendingBest.del key
func initLightClientDataStore*(
serve: bool, importMode: LightClientDataImportMode,
onLCFinalityUpdateCb: OnLightClientFinalityUpdateCallback = nil,
onLCOptimisticUpdateCb: OnLightClientOptimisticUpdateCallback = nil
): LightClientDataStore =
## Initialize light client data collector.
LightClientDataStore(
serve: serve,
importMode: importMode,
onLightClientFinalityUpdate: onLCFinalityUpdateCb,
onLightClientOptimisticUpdate: onLCOptimisticUpdateCb)
proc initLightClientBootstrapForPeriod(
dag: ChainDAGRef,
period: SyncCommitteePeriod) =
## Compute and cache `LightClientBootstrap` data for all finalized
## epoch boundary blocks within a given sync committee period.
if not dag.isNextSyncCommitteeFinalized(period):
return
let
earliestSlot = dag.computeEarliestLightClientSlot
periodStartSlot = period.start_slot
periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1
if periodEndSlot < earliestSlot:
return
let startTick = Moment.now()
debug "Caching LC bootstrap data for period", period
defer:
let endTick = Moment.now()
debug "LC bootstrap data for period cached", period,
cacheDur = endTick - startTick
let
lowSlot = max(periodStartSlot, earliestSlot)
highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot)
lowBoundarySlot = lowSlot.nextEpochBoundarySlot
highBoundarySlot = highSlot.nextEpochBoundarySlot
var
tmpState = assignClone(dag.headState)
tmpCache: StateCache
nextBoundarySlot = lowBoundarySlot
while nextBoundarySlot <= highBoundarySlot:
defer: nextBoundarySlot += SLOTS_PER_EPOCH
let
bsi = dag.getExistingBlockIdAtSlot(nextBoundarySlot).valueOr:
dag.handleUnexpectedLightClientError(nextBoundarySlot)
continue
bid = bsi.bid
boundarySlot = bid.slot.nextEpochBoundarySlot
if boundarySlot == nextBoundarySlot and bid.slot >= lowSlot and
not dag.lcDataStore.cache.bootstrap.hasKey(bid.slot):
if not dag.updateExistingState(
tmpState[], bid.atSlot, save = false, tmpCache):
dag.handleUnexpectedLightClientError(bid.slot)
continue
var cachedBootstrap {.noinit.}: CachedLightClientBootstrap
cachedBootstrap.current_sync_committee_branch = withState(tmpState[]):
when stateFork >= BeaconStateFork.Altair:
state.data.build_proof(altair.CURRENT_SYNC_COMMITTEE_INDEX).get
else: raiseAssert "Unreachable"
dag.lcDataStore.cache.bootstrap[bid.slot] = cachedBootstrap
proc initLightClientUpdateForPeriod(
dag: ChainDAGRef, period: SyncCommitteePeriod) =
## Compute and cache the best `LightClientUpdate` within a given
## sync committee period up through the finalized head block.
## Non-finalized blocks are processed incrementally.
if not dag.isNextSyncCommitteeFinalized(period):
return
let
earliestSlot = dag.computeEarliestLightClientSlot
periodStartSlot = period.start_slot
periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1
if periodEndSlot < earliestSlot:
return
if dag.lcDataStore.cache.best.hasKey(period):
return
let startTick = Moment.now()
debug "Computing best LC update for period", period
proc logBest(endTick = Moment.now()) =
# Using a helper function reduces code size as the `defer` beneath is
# replicated on every `return`, and the log statement allocates another
# copy of the arguments on the stack for each instantiation (~1 MB stack!)
debug "Best LC update for period computed",
period, update = dag.lcDataStore.cache.best.getOrDefault(period),
computeDur = endTick - startTick
defer: logBest()
proc maxParticipantsBlock(
dag: ChainDAGRef, highBid: BlockId, lowSlot: Slot
): tuple[bid: Opt[BlockId], ok: bool] =
## Determine the earliest block with most sync committee signatures among
## ancestors of `highBid` with at least `lowSlot` as parent block slot.
## Return `err` if no block with `MIN_SYNC_COMMITTEE_PARTICIPANTS` exists.
## `bool` in result indicates whether no unexpected errors occurred.
var
maxParticipants = MIN_SYNC_COMMITTEE_PARTICIPANTS
maxBid: Opt[BlockId]
allOk = true
bid = highBid
while true:
if bid.slot <= lowSlot:
break
let parentBid = dag.existingParent(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
allOk = false
break
if parentBid.slot < lowSlot:
break
let
bdata = dag.getExistingForkedBlock(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
allOk = false
break
numParticipants =
withBlck(bdata):
when stateFork >= BeaconStateFork.Altair:
countOnes(blck.message.body.sync_aggregate.sync_committee_bits)
else: raiseAssert "Unreachable"
if numParticipants >= maxParticipants:
maxParticipants = numParticipants
maxBid.ok bid
bid = parentBid
(bid: maxBid, ok: allOk)
# Determine the block in the period with highest sync committee participation
let
lowSlot = max(periodStartSlot, earliestSlot)
highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot)
highBsi = dag.getExistingBlockIdAtSlot(highSlot).valueOr:
dag.handleUnexpectedLightClientError(highSlot)
return
highBid = highBsi.bid
maxParticipantsRes = dag.maxParticipantsBlock(highBid, lowSlot)
maxParticipantsBid = maxParticipantsRes.bid.valueOr:
if maxParticipantsRes.ok: # No single valid block exists in the period
dag.lcDataStore.cache.best[period] = default(altair.LightClientUpdate)
return
# The block with highest participation may refer to a `finalized_checkpoint`
# in a different sync committee period. If that is the case, search for a
# later block with a `finalized_checkpoint` within the given sync committee
# period, despite it having a lower sync committee participation
var
tmpState = assignClone(dag.headState)
signatureBid {.noinit.}, finalizedBid {.noinit.}: BlockId
signatureBid.slot = FAR_FUTURE_SLOT
finalizedBid.slot = FAR_FUTURE_SLOT
while true:
if signatureBid.slot == FAR_FUTURE_SLOT:
signatureBid = maxParticipantsBid
else:
let
nextLowSlot = signatureBid.slot + 1
signatureRes = dag.maxParticipantsBlock(highBid, nextLowSlot)
signatureBid = signatureRes.bid.valueOr:
signatureBid = maxParticipantsBid
break
let
attestedBid = dag.existingParent(signatureBid).valueOr:
dag.handleUnexpectedLightClientError(signatureBid.slot)
continue
finalizedEpoch = block:
dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do:
withState(state):
when stateFork >= BeaconStateFork.Altair:
state.data.finalized_checkpoint.epoch
else: raiseAssert "Unreachable"
do:
dag.handleUnexpectedLightClientError(attestedBid.slot)
continue
finalizedSlot = finalizedEpoch.start_slot
finalizedBsi =
if finalizedSlot >= dag.tail.slot:
dag.getExistingBlockIdAtSlot(finalizedSlot).valueOr:
dag.handleUnexpectedLightClientError(finalizedSlot)
continue
else:
continue
if finalizedBid.slot >= lowSlot:
finalizedBid = finalizedBsi.bid
break
if signatureBid == maxParticipantsBid:
finalizedBid = finalizedBsi.bid # For fallback `break` at start of loop
# Save best light client data for given period
var update {.noinit.}: altair.LightClientUpdate
let attestedBid = dag.existingParent(signatureBid).valueOr:
dag.handleUnexpectedLightClientError(signatureBid.slot)
return
dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do:
let bdata = dag.getExistingForkedBlock(bid).valueOr:
dag.handleUnexpectedLightClientError(bid.slot)
return
withStateAndBlck(state, bdata):
when stateFork >= BeaconStateFork.Altair:
update.attested_header = blck.toBeaconBlockHeader()
update.next_sync_committee = state.data.next_sync_committee
update.next_sync_committee_branch =
state.data.build_proof(altair.NEXT_SYNC_COMMITTEE_INDEX).get
if finalizedBid.slot == FAR_FUTURE_SLOT:
update.finality_branch.reset()
else:
update.finality_branch =
state.data.build_proof(altair.FINALIZED_ROOT_INDEX).get
else: raiseAssert "Unreachable"
do:
dag.handleUnexpectedLightClientError(attestedBid.slot)
return
if finalizedBid.slot == FAR_FUTURE_SLOT or finalizedBid.slot == GENESIS_SLOT:
update.finalized_header.reset()
else:
let bdata = dag.getExistingForkedBlock(finalizedBid).valueOr:
dag.handleUnexpectedLightClientError(finalizedBid.slot)
return
withBlck(bdata):
update.finalized_header = blck.toBeaconBlockHeader()
let bdata = dag.getExistingForkedBlock(signatureBid).valueOr:
dag.handleUnexpectedLightClientError(signatureBid.slot)
return
withBlck(bdata):
when stateFork >= BeaconStateFork.Altair:
update.sync_aggregate = blck.asSigned().message.body.sync_aggregate
else: raiseAssert "Unreachable"
update.signature_slot = signatureBid.slot
dag.lcDataStore.cache.best[period] = update
proc initLightClientDataCache*(dag: ChainDAGRef) =
## Initialize cached light client data
if dag.lcDataStore.importMode == LightClientDataImportMode.None:
return
dag.lcDataStore.cache.importTailSlot = dag.tail.slot
if dag.lcDataStore.importMode == LightClientDataImportMode.OnlyNew:
dag.lcDataStore.cache.importTailSlot = dag.head.slot
var earliestSlot = dag.computeEarliestLightClientSlot
if dag.head.slot < earliestSlot:
return
# Import light client data for finalized period through finalized head
let
finalizedSlot = dag.finalizedHead.slot
finalizedPeriod = finalizedSlot.sync_committee_period
dag.initLightClientBootstrapForPeriod(finalizedPeriod)
dag.initLightClientUpdateForPeriod(finalizedPeriod)
let lightClientStartTick = Moment.now()
debug "Initializing cached light client data"
template handleUnexpectedError(buggedBid: BlockId) =
# Light client data is expected to be available from `earliestSlot` onward.
# If there is an error, adjust `importTailSlot` to avoid failed lookups of
# cached light client data. For new blocks / states, the caches can always
# be updated incrementally, because those blocks / states are passed in
# directly. It is only historical blocks (or sync committees) that depend
# on a potentially corrupted database.
doAssert buggedBid.slot > dag.lcDataStore.cache.importTailSlot
dag.handleUnexpectedLightClientError(buggedBid.slot)
earliestSlot = dag.computeEarliestLightClientSlot
# Build list of block to process.
# As it is slow to load states in descending order,
# build a reverse todo list to then process them in ascending order
let lowSlot = max(finalizedSlot, earliestSlot)
var
blocks = newSeqOfCap[BlockId](dag.head.slot - lowSlot + 1)
bid = dag.head.bid
while bid.slot > lowSlot:
blocks.add bid
bid = dag.existingParent(bid).valueOr:
handleUnexpectedError(bid)
break
if bid.slot == lowSlot:
blocks.add bid
# Process blocks (reuses `dag.headState`, but restores it to the current head)
var
tmpState = assignClone(dag.headState)
tmpCache, cache: StateCache
oldCheckpoint: Checkpoint
cpIndex = 0
for i in countdown(blocks.high, blocks.low):
bid = blocks[i]
if not dag.updateExistingState(
dag.headState, bid.atSlot, save = false, cache):
handleUnexpectedError(bid)
continue
let bdata = dag.getExistingForkedBlock(bid).valueOr:
handleUnexpectedError(bid)
continue
withStateAndBlck(dag.headState, bdata):
when stateFork >= BeaconStateFork.Altair:
# Cache light client data (non-finalized blocks may refer to this)
dag.cacheLightClientData(state, blck.toBlockId())
# Create `LightClientUpdate` instances
if bid.slot != lowSlot:
dag.createLightClientUpdates(state, blck, parentBid = blocks[i + 1])
else: raiseAssert "Unreachable"
let lightClientEndTick = Moment.now()
debug "Initialized cached light client data",
initDur = lightClientEndTick - lightClientStartTick
# Import historic data
if dag.lcDataStore.importMode == LightClientDataImportMode.Full:
let earliestPeriod = earliestSlot.sync_committee_period
for period in earliestPeriod ..< finalizedPeriod:
dag.initLightClientBootstrapForPeriod(period)
dag.initLightClientUpdateForPeriod(period)
proc getLightClientBootstrap*( proc getLightClientBootstrap*(
dag: ChainDAGRef, dag: ChainDAGRef,
blockRoot: Eth2Digest): Opt[altair.LightClientBootstrap] = blockRoot: Eth2Digest): Opt[altair.LightClientBootstrap] =
@ -799,8 +792,7 @@ proc getLightClientBootstrap*(
withBlck(bdata): withBlck(bdata):
let slot = blck.message.slot let slot = blck.message.slot
when stateFork >= BeaconStateFork.Altair: when stateFork >= BeaconStateFork.Altair:
let earliestSlot = dag.computeEarliestLightClientSlot if slot < dag.lcDataStore.cache.tailSlot:
if slot < earliestSlot:
debug "LC bootstrap unavailable: Block too old", slot debug "LC bootstrap unavailable: Block too old", slot
return err() return err()
if slot > dag.finalizedHead.blck.slot: if slot > dag.finalizedHead.blck.slot:

View File

@ -194,6 +194,7 @@ proc loadChainDag(
onLCOptimisticUpdateCb = onLightClientOptimisticUpdateCb, onLCOptimisticUpdateCb = onLightClientOptimisticUpdateCb,
lightClientDataServe = config.lightClientDataServe.get, lightClientDataServe = config.lightClientDataServe.get,
lightClientDataImportMode = config.lightClientDataImportMode.get, lightClientDataImportMode = config.lightClientDataImportMode.get,
lightClientDataMaxPeriods = config.lightClientDataMaxPeriods,
vanityLogs = getPandas(detectTTY(config.logStdout))) vanityLogs = getPandas(detectTTY(config.logStdout)))
let let

View File

@ -479,3 +479,12 @@ template name*(cfg: RuntimeConfig): string =
cfg.CONFIG_NAME cfg.CONFIG_NAME
else: else:
const_preset const_preset
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/p2p-interface.md#configuration
func MIN_EPOCHS_FOR_BLOCK_REQUESTS*(cfg: RuntimeConfig): uint64 =
cfg.MIN_VALIDATOR_WITHDRAWABILITY_DELAY + cfg.CHURN_LIMIT_QUOTIENT div 2
func defaultLCDataMaxPeriods*(cfg: RuntimeConfig): uint64 =
const epochsPerPeriod = EPOCHS_PER_SYNC_COMMITTEE_PERIOD
let maxEpochs = cfg.MIN_EPOCHS_FOR_BLOCK_REQUESTS
(maxEpochs + epochsPerPeriod - 1) div epochsPerPeriod