online import of historic LC data
Avoid multi-hour blocking when using `--light-client-data-import-mode=full`; instead, schedule a background task that imports the historic data over time. The background task is suspended while block proposals are scheduled.
parent 4515851b71
commit 782d5471cf
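In outline, the new behaviour is a cooperative background job: it does a small chunk of work, yields back to the event loop, and stays parked while a caller-supplied predicate says that validator duties are near. A minimal standalone sketch of that pattern with chronos; all names here are illustrative, not the identifiers added by this commit:

import chronos

proc backgroundImport(isAllowed: proc(): bool {.gcsafe.}) {.async.} =
  for chunk in 0 ..< 3:
    # Park the task while more important work is scheduled.
    while not isAllowed():
      await sleepAsync(chronos.milliseconds(250))
    # ... import one chunk of historic data here ...
    await sleepAsync(chronos.milliseconds(10)) # yield to the event loop

when isMainModule:
  var proposalPending = false # stand-in for "a block proposal is scheduled"
  waitFor backgroundImport(proc(): bool = not proposalPending)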
Makefile
@@ -322,7 +322,7 @@ define CONNECT_TO_NETWORK_IN_DEV_MODE
     --data-dir=build/data/shared_$(1)_$(NODE_ID) \
     --light-client-enable=on \
     --light-client-data-serve=on \
-    --light-client-data-import-mode=only-new \
+    --light-client-data-import-mode=full \
     --dump $(NODE_PARAMS)
 endef
@@ -453,7 +453,7 @@ type
     lightClientDataImportMode* {.
       hidden
       desc: "BETA: Which classes of light client data to import. " &
-            "Must be one of: none, only-new, full (slow startup), on-demand (may miss validator duties)"
+            "Must be one of: none, only-new, full"
       name: "light-client-data-import-mode" .}: Option[LightClientDataImportMode]

     lightClientDataMaxPeriods* {.
@@ -335,6 +335,10 @@ template setHeadCb*(dag: ChainDAGRef, cb: OnHeadCallback) =
 template setReorgCb*(dag: ChainDAGRef, cb: OnReorgCallback) =
   dag.onReorgHappened = cb

+template setLightClientImportTaskAllowedCb*(
+    dag: ChainDAGRef, cb: LightClientTaskAllowedCallback) =
+  dag.lcDataStore.importTaskAllowed = cb
+
 func shortLog*(v: EpochRef): string =
   # epoch:root when logging epoch, root:slot when logging slot!
   if v.isNil():
@@ -13,6 +13,7 @@
 import
   # Status libraries
   stew/bitops2,
+  chronos,
   # Beacon chain internals
   ../spec/datatypes/altair,
   ../light_client_data_db,
@@ -26,15 +27,15 @@ type
     OnlyNew = "only-new"
       ## Import only new light client data.
     Full = "full"
-      ## Import light client data for entire weak subjectivity period.
-    OnDemand = "on-demand"
-      ## Don't precompute historic data. Slow, may miss validator duties.
+      ## Import all light client data, backfilling historic data.

   OnLightClientFinalityUpdateCallback* =
     proc(data: altair.LightClientFinalityUpdate) {.gcsafe, raises: [Defect].}
   OnLightClientOptimisticUpdateCallback* =
     proc(data: altair.LightClientOptimisticUpdate) {.gcsafe, raises: [Defect].}

+  LightClientTaskAllowedCallback* = proc(): bool {.gcsafe, raises: [Defect].}
+
   CachedLightClientData* = object
     ## Cached data from historical non-finalized states to improve speed when
     ## creating future `LightClientUpdate` and `LightClientBootstrap` instances.
@@ -103,3 +104,10 @@ type
       ## On new `LightClientFinalityUpdate` callback
     onLightClientOptimisticUpdate*: OnLightClientOptimisticUpdateCallback
       ## On new `LightClientOptimisticUpdate` callback
+
+    # -----------------------------------
+    # Historic data import
+    importFut*: Future[void]
+      ## Background task for importing historic light client data
+    importTaskAllowed*: LightClientTaskAllowedCallback
+      ## Callback to determine whether LC background task is allowed to run
@@ -50,7 +50,7 @@ const
   # When finality happens, we prune historical states from the database except
   # for a snapshort every 32 epochs from which replays can happen - there's a
   # balance here between making long replays and saving on disk space
-  EPOCHS_PER_STATE_SNAPSHOT = 32
+  EPOCHS_PER_STATE_SNAPSHOT* = 32

 proc putBlock*(
     dag: ChainDAGRef, signedBlock: ForkyTrustedSignedBeaconBlock) =
@@ -21,6 +21,54 @@ import

 logScope: topics = "chaindag"

+type ThrottledTask = ref object
+  isAllowedToResume: LightClientTaskAllowedCallback
+  resumeTick: Moment
+  activeDur, suspendedDur: Duration
+  measureOffset: Duration
+
+proc newThrottledTask(
+    isAllowedToResume: LightClientTaskAllowedCallback): ThrottledTask =
+  ThrottledTask(
+    isAllowedToResume: isAllowedToResume,
+    resumeTick: Moment.now())
+
+proc throttle(task: ThrottledTask): Future[void] {.async.} =
+  if task.isAllowedToResume == nil:
+    return
+
+  const
+    chunkDurLimit = chronos.milliseconds(50)
+    suspendInterval = chronos.milliseconds(1750)
+
+  let
+    tick = Moment.now()
+    chunkDur = tick - task.resumeTick
+  if chunkDur < chunkDurLimit:
+    return
+  task.activeDur += chunkDur - task.measureOffset
+  task.measureOffset.reset()
+
+  await chronos.sleepAsync(suspendInterval) # yield at least once
+  while not task.isAllowedToResume():
+    await chronos.sleepAsync(suspendInterval)
+
+  task.resumeTick = Moment.now()
+  task.suspendedDur += task.resumeTick - tick
+
+proc measure(task: ThrottledTask): tuple[activeDur, suspendedDur: Duration] =
+  let
+    tick = Moment.now()
+    chunkDur = tick - task.resumeTick
+    res = (
+      activeDur: task.activeDur + chunkDur - task.measureOffset,
+      suspendedDur: task.suspendedDur
+    )
+  task.activeDur.reset()
+  task.suspendedDur.reset()
+  task.measureOffset = chunkDur
+  res
+
 type
   HashedBeaconStateWithSyncCommittee =
     bellatrix.HashedBeaconState |
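The procs further below consume this helper in a fixed pattern: `discard task.measure()` resets the per-section counters, `await task.throttle()` is called once per loop iteration, and a closing `task.measure()` feeds the debug log. A self-contained approximation of that call pattern follows; it re-declares a trimmed-down task instead of importing the private type above, and omits the `measure` bookkeeping for brevity:

import chronos

type MiniTask = ref object
  isAllowed: proc(): bool {.gcsafe.}
  resumeTick: Moment

proc throttle(task: MiniTask) {.async.} =
  # Work in ~50 ms chunks; once a chunk is used up, sleep in 1750 ms steps
  # until the predicate allows the task to resume.
  const
    chunkDurLimit = chronos.milliseconds(50)
    suspendInterval = chronos.milliseconds(1750)
  if Moment.now() - task.resumeTick < chunkDurLimit:
    return
  await sleepAsync(suspendInterval) # yield at least once
  while not task.isAllowed():
    await sleepAsync(suspendInterval)
  task.resumeTick = Moment.now()

proc slowImport(task: MiniTask) {.async.} =
  for i in 0 ..< 100:
    await task.throttle()
    # ... one unit of import work per iteration ...

when isMainModule:
  let task = MiniTask(
    isAllowed: proc(): bool = true, resumeTick: Moment.now())
  waitFor task.slowImport()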
@@ -45,6 +93,29 @@ proc updateExistingState(
     doAssert verifyFinalization notin dag.updateFlags
   ok

+template asyncUpdateExistingState(
+    dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsiParam: BlockSlotId,
+    save: bool, cache: var StateCache, periodicBody: untyped): bool =
+  ## Wrapper around `updateExistingState` for states expected to exist.
+  block:
+    # Ideally, an async API for updating state would be available.
+    # As a stopgap solution, update to the target state in multiple steps.
+    let
+      currentSlot = getStateField(state, slot)
+      bsi = bsiParam
+    if currentSlot > bsi.slot or bsi.slot - currentSlot > 2 * SLOTS_PER_EPOCH:
+      let
+        targetEpoch = bsi.slot.epoch
+        startEpoch = targetEpoch - (targetEpoch mod EPOCHS_PER_STATE_SNAPSHOT)
+      for epoch in startEpoch ..< targetEpoch:
+        periodicBody
+        let stepBsi = dag.atSlot(bsi.bid, epoch.start_slot).valueOr:
+          continue
+        discard dag.updateState(state, stepBsi, false, cache)
+      periodicBody
+
+    dag.updateExistingState(state, bsi, save, cache)
+
 template withUpdatedExistingState(
     dag: ChainDAGRef, stateParam: var ForkedHashedBeaconState,
     bsiParam: BlockSlotId, okBody: untyped, failureBody: untyped): untyped =
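The stepping logic above walks from the last state snapshot up to the target epoch, so no single `updateState` call has to replay more than `EPOCHS_PER_STATE_SNAPSHOT` epochs without running the periodic body in between. The alignment arithmetic in isolation, with plain integers and an illustrative target epoch:

const EPOCHS_PER_STATE_SNAPSHOT = 32'u64

func snapshotStartEpoch(targetEpoch: uint64): uint64 =
  # Align down to the snapshot epoch that the replay starts from.
  targetEpoch - (targetEpoch mod EPOCHS_PER_STATE_SNAPSHOT)

when isMainModule:
  let targetEpoch = 1000'u64
  doAssert snapshotStartEpoch(targetEpoch) == 992
  # The wrapper then advances epoch by epoch (992, 993, ..., 999), invoking the
  # periodic body between steps, before the final update to the target slot.
  for epoch in snapshotStartEpoch(targetEpoch) ..< targetEpoch:
    discard epoch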
@@ -58,6 +129,22 @@ template withUpdatedExistingState(
       doAssert verifyFinalization notin dag.updateFlags
       failureBody

+template asyncWithUpdatedExistingState(
+    dag: ChainDAGRef, stateParam: var ForkedHashedBeaconState,
+    bsiParam: BlockSlotId, periodicBody: untyped,
+    okBody: untyped, failureBody: untyped): untyped =
+  ## Wrapper around `withUpdatedExistingState` with periodic callback.
+  block:
+    let bsi {.inject.} = bsiParam
+    var cache {.inject.} = StateCache()
+    if asyncUpdateExistingState(
+        dag, stateParam, bsi, false, cache, periodicBody):
+      template bid(): BlockId {.inject, used.} = bsi.bid
+      template state(): ForkedHashedBeaconState {.inject, used.} = stateParam
+      okBody
+    else:
+      failureBody
+
 proc getExistingBlockIdAtSlot(dag: ChainDAGRef, slot: Slot): Opt[BlockSlotId] =
   ## Wrapper around `getBlockIdAtSlot` for blocks expected to exist.
   let bsi = dag.getBlockIdAtSlot(slot)
@@ -147,7 +234,10 @@ proc initLightClientDataStore*(

   lcDataStore

-proc closeLightClientDataStore*(dag: ChainDAGRef) =
+proc closeLightClientDataStore*(dag: ChainDAGRef) {.async.} =
+  if dag.lcDataStore.importFut != nil:
+    await dag.lcDataStore.importFut.cancelAndWait()
+
   dag.lcDataStore.db.close()

 func targetLightClientTailSlot(dag: ChainDAGRef): Slot =
@@ -168,8 +258,8 @@ func handleUnexpectedLightClientError(dag: ChainDAGRef, buggedSlot: Slot) =
     dag.lcDataStore.cache.tailSlot = buggedSlot + 1

 proc initLightClientBootstrapForPeriod(
-    dag: ChainDAGRef,
-    period: SyncCommitteePeriod): Opt[void] =
+    dag: ChainDAGRef, period: SyncCommitteePeriod,
+    task: ThrottledTask): Future[Opt[void]] {.async.} =
   ## Compute and cache `LightClientBootstrap` data for all finalized
   ## epoch boundary blocks within a given sync committee period.
   if not dag.isNextSyncCommitteeFinalized(period):
@@ -177,12 +267,11 @@ proc initLightClientBootstrapForPeriod(
   if dag.lcDataStore.db.isPeriodSealed(period):
     return ok()

-  let startTick = Moment.now()
+  discard task.measure()
   debug "Caching historic LC bootstrap data", period
   defer:
-    let endTick = Moment.now()
     debug "Historic LC bootstrap data cached", period,
-      cacheDur = endTick - startTick
+      cacheDur = task.measure()

   let
     periodStartSlot = period.start_slot
@@ -198,6 +287,7 @@ proc initLightClientBootstrapForPeriod(
     nextBoundarySlot = lowBoundarySlot
   while nextBoundarySlot <= highBoundarySlot:
     defer: nextBoundarySlot += SLOTS_PER_EPOCH
+    await task.throttle()
     let
       bsi = dag.getExistingBlockIdAtSlot(nextBoundarySlot).valueOr:
         dag.handleUnexpectedLightClientError(nextBoundarySlot)
@@ -207,8 +297,11 @@ proc initLightClientBootstrapForPeriod(
       boundarySlot = bid.slot.nextEpochBoundarySlot
     if boundarySlot == nextBoundarySlot and bid.slot >= lowSlot and
         not dag.lcDataStore.db.hasCurrentSyncCommitteeBranch(bid.slot):
-      if not dag.updateExistingState(
-          tmpState[], bid.atSlot, save = false, tmpCache):
+      let ok =
+        dag.asyncUpdateExistingState(
+          tmpState[], bid.atSlot, save = false, tmpCache) do:
+            await task.throttle
+      if not ok:
         dag.handleUnexpectedLightClientError(bid.slot)
         res.err()
         continue
@@ -217,10 +310,12 @@ proc initLightClientBootstrapForPeriod(
             state.data.build_proof(altair.CURRENT_SYNC_COMMITTEE_INDEX).get
           else: raiseAssert "Unreachable"
       dag.lcDataStore.db.putCurrentSyncCommitteeBranch(bid.slot, branch)
-  res
+  await task.throttle()
+  return res

 proc initLightClientUpdateForPeriod(
-    dag: ChainDAGRef, period: SyncCommitteePeriod): Opt[void] =
+    dag: ChainDAGRef, period: SyncCommitteePeriod,
+    task: ThrottledTask): Future[Opt[void]] {.async.} =
   ## Compute and cache the best `LightClientUpdate` within a given
   ## sync committee period up through the finalized head block.
   ## Non-finalized blocks are processed incrementally by other functions.
@@ -229,20 +324,21 @@ proc initLightClientUpdateForPeriod(
   if dag.lcDataStore.db.isPeriodSealed(period):
     return ok()

-  let startTick = Moment.now()
+  discard task.measure()
   debug "Computing best historic LC update", period
-  proc logBest(endTick = Moment.now()) =
+  proc logBest() =
     # Using a helper function reduces code size as the `defer` beneath is
     # replicated on every `return`, and the log statement allocates another
     # copy of the arguments on the stack for each instantiation (~1 MB stack!)
     debug "Best historic LC update computed",
       period, update = dag.lcDataStore.db.getBestUpdate(period),
-      computeDur = endTick - startTick
+      computeDur = task.measure()
   defer: logBest()

   proc maxParticipantsBlock(
-      dag: ChainDAGRef, highBid: BlockId, lowSlot: Slot
-  ): tuple[bid: Opt[BlockId], res: Opt[void]] =
+      dag: ChainDAGRef, highBid: BlockId, lowSlot: Slot,
+      task: ThrottledTask
+  ): Future[tuple[bid: Opt[BlockId], res: Opt[void]]] {.async.} =
     ## Determine the earliest block with most sync committee signatures among
     ## ancestors of `highBid` with at least `lowSlot` as parent block slot.
     ## Return `err` if no block with `MIN_SYNC_COMMITTEE_PARTICIPANTS` exists.
@@ -261,6 +357,7 @@ proc initLightClientUpdateForPeriod(
         break
       if parentBid.slot < lowSlot:
         break
+      await task.throttle()
       let
         bdata = dag.getExistingForkedBlock(bid).valueOr:
           dag.handleUnexpectedLightClientError(bid.slot)
@@ -275,7 +372,7 @@ proc initLightClientUpdateForPeriod(
         maxParticipants = numParticipants
         maxBid.ok bid
       bid = parentBid
-    (bid: maxBid, res: res)
+    return (bid: maxBid, res: res)

   # Determine the block in the period with highest sync committee participation
   let
@@ -288,7 +385,7 @@ proc initLightClientUpdateForPeriod(
       dag.handleUnexpectedLightClientError(highSlot)
       return err()
     highBid = highBsi.bid
-    maxParticipantsRes = dag.maxParticipantsBlock(highBid, lowSlot)
+    maxParticipantsRes = await dag.maxParticipantsBlock(highBid, lowSlot, task)
     maxParticipantsBid = maxParticipantsRes.bid.valueOr:
       const update = default(altair.LightClientUpdate)
       if fullPeriodCovered and maxParticipantsRes.res.isOk: # No block in period
@@ -313,19 +410,22 @@ proc initLightClientUpdateForPeriod(
     else:
       let
         nextLowSlot = signatureBid.slot + 1
-        signatureRes = dag.maxParticipantsBlock(highBid, nextLowSlot)
-      if signatureRes.res.isErr:
+        bidRes = await dag.maxParticipantsBlock(highBid, nextLowSlot, task)
+      if bidRes.res.isErr:
         res.err()
-      signatureBid = signatureRes.bid.valueOr:
+      signatureBid = bidRes.bid.valueOr:
         signatureBid = maxParticipantsBid
         break
+    await task.throttle()
     let
       attestedBid = dag.existingParent(signatureBid).valueOr:
         dag.handleUnexpectedLightClientError(signatureBid.slot)
         res.err()
         continue
       finalizedEpoch = block:
-        dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do:
+        dag.asyncWithUpdatedExistingState(tmpState[], attestedBid.atSlot) do:
+          await task.throttle()
         do:
           withState(state):
             when stateFork >= BeaconStateFork.Altair:
               state.data.finalized_checkpoint.epoch
@@ -350,11 +450,14 @@ proc initLightClientUpdateForPeriod(
       finalizedBid = finalizedBsi.bid # For fallback `break` at start of loop

   # Save best light client data for given period
+  await task.throttle()
   var update {.noinit.}: altair.LightClientUpdate
   let attestedBid = dag.existingParent(signatureBid).valueOr:
     dag.handleUnexpectedLightClientError(signatureBid.slot)
     return err()
-  dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do:
+  dag.asyncWithUpdatedExistingState(tmpState[], attestedBid.atSlot) do:
+    await task.throttle()
   do:
     let bdata = dag.getExistingForkedBlock(bid).valueOr:
       dag.handleUnexpectedLightClientError(bid.slot)
       return err()
@@ -394,22 +497,89 @@ proc initLightClientUpdateForPeriod(
     dag.lcDataStore.db.putBestUpdate(period, update)
   else:
     dag.lcDataStore.db.putUpdateIfBetter(period, update)
-  res
+  await task.throttle()
+  return res

 proc initLightClientDataForPeriod(
-    dag: ChainDAGRef, period: SyncCommitteePeriod): Opt[void] =
+    dag: ChainDAGRef, period: SyncCommitteePeriod,
+    task: ThrottledTask): Future[Opt[void]] {.async.} =
   ## Import light client data for a given sync committee period.
   if dag.lcDataStore.db.isPeriodSealed(period):
     return ok()
   let
     fullPeriodCovered = (dag.finalizedHead.slot >= (period + 1).start_slot)
-    res1 = dag.initLightClientBootstrapForPeriod(period)
-    res2 = dag.initLightClientUpdateForPeriod(period)
+    res1 = await dag.initLightClientBootstrapForPeriod(period, task)
+    res2 = await dag.initLightClientUpdateForPeriod(period, task)
   if res1.isErr or res2.isErr:
     return err()
   if fullPeriodCovered:
     dag.lcDataStore.db.sealPeriod(period)
-  ok()
+  return ok()
+
+proc importHistoricLightClientData(dag: ChainDAGRef): Future[void] {.async.} =
+  ## Import finalized historic light client data, moving `tailSlot` backwards.
+  let task = newThrottledTask(dag.lcDataStore.importTaskAllowed)
+
+  logScope: lightClientDataMaxPeriods = dag.lcDataStore.maxPeriods
+
+  var
+    anyWorkDone = false
+    startTick: Moment
+  while true:
+    # Delay import of historic light client data until finality advanced
+    # to avoid having to deal with potential forking in historic data
+    let
+      finalizedSlot = dag.finalizedHead.slot
+      currentTailSlot = dag.lcDataStore.cache.tailSlot
+    if finalizedSlot < currentTailSlot:
+      return
+
+    # If target tail slot has been reached, work is done
+    let targetTailSlot = dag.targetLightClientTailSlot
+    if currentTailSlot <= targetTailSlot:
+      if anyWorkDone:
+        info "Historic LC data import complete",
+          importDur = Moment.now() - startTick
+      return
+    if not anyWorkDone:
+      anyWorkDone = true
+      startTick = Moment.now()
+      info "Importing historic LC data"
+
+    # Import next period
+    let
+      currentPeriod = currentTailSlot.sync_committee_period
+      period =
+        if currentTailSlot > currentPeriod.start_slot:
+          currentPeriod
+        else:
+          doAssert currentPeriod > 0, "currentTailSlot > targetTailSlot"
+          currentPeriod - 1
+      periodStartSlot = max(period.start_slot, targetTailSlot)
+    let res = await dag.initLightClientDataForPeriod(period, task)
+    if res.isOk and dag.lcDataStore.cache.tailSlot == currentTailSlot:
+      dag.lcDataStore.cache.tailSlot = periodStartSlot
+    else:
+      const retryAfterErrorInterval = chronos.seconds(30)
+      await sleepAsync(retryAfterErrorInterval)
+
+proc continueImportingHistoricLightClientData(dag: ChainDAGRef) {.gcsafe.} =
+  ## Continue importing finalized historic light client data.
+  ## This needs to be called whenever `finalized_checkpoint` changes.
+  if dag.lcDataStore.importMode != LightClientDataImportMode.Full:
+    return
+  if dag.lcDataStore.importFut != nil:
+    return
+
+  dag.lcDataStore.importFut = dag.importHistoricLightClientData()
+  if dag.lcDataStore.importFut.completed:
+    dag.lcDataStore.importFut = nil
+    return
+
+  proc handleFinishedImport(future: pointer) =
+    dag.lcDataStore.importFut = nil
+    dag.continueImportingHistoricLightClientData()
+  dag.lcDataStore.importFut.addCallback(handleFinishedImport)

 proc getLightClientData(
     dag: ChainDAGRef,
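The `importFut`/`addCallback` dance above keeps at most one background future alive and re-checks for further work whenever a run finishes. A stripped-down sketch of that lifecycle, including the matching cancellation on shutdown; the type and names here are placeholders, not the ChainDAG fields:

import chronos

type Importer = ref object
  fut: Future[void]          # background task, nil when idle

proc runImport(imp: Importer) {.async.} =
  # placeholder for the actual historic-data import work
  await sleepAsync(chronos.seconds(1))

proc kick(imp: Importer) =
  ## Called whenever finality advances; starts the task if it is not running.
  if imp.fut != nil:
    return
  imp.fut = imp.runImport()
  if imp.fut.completed:
    imp.fut = nil
    return
  proc onDone(arg: pointer) =
    imp.fut = nil
  imp.fut.addCallback(onDone)

proc close(imp: Importer) {.async.} =
  if imp.fut != nil:
    await imp.fut.cancelAndWait()

when isMainModule:
  let imp = Importer()
  imp.kick()
  waitFor imp.close()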
@@ -605,7 +775,9 @@ proc createLightClientUpdates(
     dag.lcDataStore.onLightClientOptimisticUpdate(latest.toOptimistic)

 proc initLightClientDataCache*(dag: ChainDAGRef) =
-  ## Initialize cached light client data
+  ## Initialize cached light client data with the initial head state.
+  ## Historic data is imported in the background as soon as finality
+  ## advances past the initial head slot.
   if dag.lcDataStore.importMode == LightClientDataImportMode.None:
     return

@@ -613,96 +785,15 @@ proc initLightClientDataCache*(dag: ChainDAGRef) =
   dag.lcDataStore.db.delPeriodsFrom(dag.firstNonFinalizedPeriod)

   # Initialize tail slot
-  let targetTailSlot = dag.targetLightClientTailSlot
-  dag.lcDataStore.cache.tailSlot = max(dag.head.slot, targetTailSlot)
+  let altairStartSlot = dag.cfg.ALTAIR_FORK_EPOCH.start_slot
+  dag.lcDataStore.cache.tailSlot = max(dag.head.slot, altairStartSlot)

   # Import head state
-  if dag.head.slot < dag.lcDataStore.cache.tailSlot:
-    return
-  withState(dag.headState):
-    when stateFork >= BeaconStateFork.Altair:
-      dag.cacheLightClientData(state, dag.head.bid)
-    else: raiseAssert "Unreachable" # `tailSlot` cannot be before Altair
-  if dag.lcDataStore.importMode == LightClientDataImportMode.OnlyNew:
-    return
-
-  # Import light client data for finalized period through finalized head
-  let finalizedSlot = max(dag.finalizedHead.blck.slot, targetTailSlot)
-  if finalizedSlot >= dag.lcDataStore.cache.tailSlot:
-    return
-  dag.lcDataStore.cache.tailSlot = finalizedSlot
-  let finalizedPeriod = finalizedSlot.sync_committee_period
-  var res = dag.initLightClientDataForPeriod(finalizedPeriod)
-
-  let lightClientStartTick = Moment.now()
-  logScope: lightClientDataMaxPeriods = dag.lcDataStore.maxPeriods
-  debug "Initializing cached LC data", res
-
-  # Build list of block to process.
-  # As it is slow to load states in descending order,
-  # build a reverse todo list to then process them in ascending order
-  var
-    blocks = newSeqOfCap[BlockId](dag.head.slot - finalizedSlot + 1)
-    bid = dag.head.bid
-  while bid.slot > finalizedSlot:
-    blocks.add bid
-    bid = dag.existingParent(bid).valueOr:
-      dag.handleUnexpectedLightClientError(bid.slot)
-      res.err()
-      break
-  if bid.slot == finalizedSlot:
-    blocks.add bid
-
-  # Process blocks (reuses `dag.headState`, but restores it to the current head)
-  var
-    tmpState = assignClone(dag.headState)
-    tmpCache, cache: StateCache
-    oldCheckpoint: Checkpoint
-    cpIndex = 0
-  for i in countdown(blocks.high, blocks.low):
-    bid = blocks[i]
-    if not dag.updateExistingState(
-        dag.headState, bid.atSlot, save = false, cache):
-      dag.handleUnexpectedLightClientError(bid.slot)
-      res.err()
-      continue
-    let bdata = dag.getExistingForkedBlock(bid).valueOr:
-      dag.handleUnexpectedLightClientError(bid.slot)
-      res.err()
-      continue
-    withStateAndBlck(dag.headState, bdata):
+  if dag.head.slot >= dag.lcDataStore.cache.tailSlot:
+    withState(dag.headState):
       when stateFork >= BeaconStateFork.Altair:
-        # Cache light client data (non-finalized blocks may refer to this)
-        if i != blocks.low:
-          dag.cacheLightClientData(state, bid) # `dag.head` already cached
-
-        # Create `LightClientUpdate` instances
-        if i < blocks.high:
-          dag.createLightClientUpdates(state, blck, parentBid = blocks[i + 1])
-      else: raiseAssert "Unreachable"
-
-  let lightClientEndTick = Moment.now()
-  debug "Initialized cached LC data",
-    initDur = lightClientEndTick - lightClientStartTick, res
-  if res.isErr:
-    return
-  if dag.lcDataStore.importMode == LightClientDataImportMode.OnDemand:
-    return
-
-  # Import historic data
-  dag.lcDataStore.cache.tailSlot = targetTailSlot
-  let targetTailPeriod = targetTailSlot.sync_committee_period
-  if targetTailPeriod < finalizedPeriod:
-    # `countdown` through 0 fails on distinct `uint64`
-    # https://github.com/nim-lang/Nim/pull/19926
-    var period = finalizedPeriod - 1
-    while period >= targetTailPeriod:
-      if dag.initLightClientDataForPeriod(period).isErr:
-        res.err()
-      if period <= targetTailPeriod:
-        break
-      dec period
-    debug "Historic LC data imported", res
+        dag.cacheLightClientData(state, dag.head.bid)
+      else: raiseAssert "Unreachable" # `tailSlot` cannot be before Altair

 proc processNewBlockForLightClient*(
     dag: ChainDAGRef,
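With this change the cache starts at the head, and only the background task moves `tailSlot` backwards; the `max` with the Altair fork slot merely caps how far back it can ever go. Plugging in mainnet numbers to make the initial value concrete; the constants below are assumptions for illustration, not values taken from this diff:

const SLOTS_PER_EPOCH = 32'u64        # mainnet preset (assumption)
let
  ALTAIR_FORK_EPOCH = 74240'u64       # mainnet Altair fork epoch (assumption)
  altairStartSlot = ALTAIR_FORK_EPOCH * SLOTS_PER_EPOCH   # 2_375_680
  headSlot = 4_500_000'u64            # example head slot
doAssert max(headSlot, altairStartSlot) == headSlot # tail starts at the head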
@@ -824,6 +915,9 @@ proc processFinalizationForLightClient*(
   for key in keysToDelete:
     dag.lcDataStore.cache.pendingBest.del key

+  # Continue importing historic light client data
+  dag.continueImportingHistoricLightClientData()
+
 proc getLightClientBootstrap*(
     dag: ChainDAGRef,
     blockRoot: Eth2Digest): Opt[altair.LightClientBootstrap] =
@@ -843,21 +937,10 @@ proc getLightClientBootstrap*(
     if slot > dag.finalizedHead.blck.slot:
       debug "LC bootstrap unavailable: Not finalized", blockRoot
       return err()
-    var branch = dag.lcDataStore.db.getCurrentSyncCommitteeBranch(slot)
+    let branch = dag.lcDataStore.db.getCurrentSyncCommitteeBranch(slot)
     if branch.isZeroMemory:
-      if dag.lcDataStore.importMode == LightClientDataImportMode.OnDemand:
-        let bsi = ? dag.getExistingBlockIdAtSlot(slot)
-        var tmpState = assignClone(dag.headState)
-        dag.withUpdatedExistingState(tmpState[], bsi) do:
-          branch = withState(state):
-            when stateFork >= BeaconStateFork.Altair:
-              state.data.build_proof(altair.CURRENT_SYNC_COMMITTEE_INDEX).get
-            else: raiseAssert "Unreachable"
-        do: return err()
-        dag.lcDataStore.db.putCurrentSyncCommitteeBranch(slot, branch)
-      else:
-        debug "LC bootstrap unavailable: Data not cached", slot
-        return err()
+      debug "LC bootstrap unavailable: Data not cached", slot
+      return err()

     let period = slot.sync_committee_period
     var tmpState = assignClone(dag.headState)
@@ -879,9 +962,6 @@ proc getLightClientUpdateForPeriod*(
   if not dag.lcDataStore.serve:
     return

-  if dag.lcDataStore.importMode == LightClientDataImportMode.OnDemand:
-    if dag.initLightClientUpdateForPeriod(period).isErr:
-      return
   result = some(dag.lcDataStore.db.getBestUpdate(period))
   let numParticipants = countOnes(result.get.sync_aggregate.sync_committee_bits)
   if numParticipants < MIN_SYNC_COMMITTEE_PARTICIPANTS:
@@ -336,6 +336,12 @@ proc initFullNode(
   dag.setHeadCb(onHeadChanged)
   dag.setReorgCb(onChainReorg)

+  proc importTaskAllowed(): bool =
+    # Suspend background task as important validator actions are approaching
+    node.actionTracker.getNextProposalSlot(node.currentSlot) == FAR_FUTURE_SLOT
+
+  dag.setLightClientImportTaskAllowedCb(importTaskAllowed)
+
   node.dag = dag
   node.quarantine = quarantine
   node.attestationPool = attestationPool
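The predicate is deliberately cheap: each time the background task wants to resume, it only has to answer "is a block proposal scheduled?". Reduced to its decision logic, with plain `uint64` slots and a stand-in constant:

const FAR_FUTURE_SLOT = high(uint64) # stand-in for the real Slot constant

func importTaskAllowed(nextProposalSlot: uint64): bool =
  # Historic import may only run while no block proposal is on the schedule.
  nextProposalSlot == FAR_FUTURE_SLOT

doAssert importTaskAllowed(FAR_FUTURE_SLOT) # no proposal scheduled: allowed
doAssert not importTaskAllowed(123)         # proposal at slot 123: suspend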
@@ -1485,8 +1491,11 @@ proc stop(node: BeaconNode) =
     waitFor node.network.stop()
   except CatchableError as exc:
     warn "Couldn't stop network", msg = exc.msg
+  try:
+    waitFor node.dag.closeLightClientDataStore()
+  except CatchableError as exc:
+    warn "Couldn't close LC data store", msg = exc.msg

-  node.dag.closeLightClientDataStore()
   node.attachedValidators.slashingProtection.close()
   node.db.close()
   notice "Databases closed"
@@ -50,7 +50,7 @@ template readSszBytes*(
     data: openArray[byte], val: var auto, updateRoot: bool) =
   readSszValue(data, val)

-func readSszBytes(T: type, data: openArray[byte], updateRoot = true): T {.
+func readSszBytes*(T: type, data: openArray[byte], updateRoot = true): T {.
     raises: [Defect, MalformedSszError, SszSizeMismatchError].} =
   var res: T
   readSszBytes(data, res, updateRoot)
@@ -879,7 +879,7 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
     --metrics-port="$(( BASE_METRICS_PORT + NUM_NODE ))" \
     --light-client-enable=on \
     --light-client-data-serve=on \
-    --light-client-data-import-mode=only-new \
+    --light-client-data-import-mode=full \
     ${EXTRA_ARGS} \
    &> "${DATA_DIR}/log${NUM_NODE}.txt" &