From 782d5471cf551eb48e1aef55347775676a2386d4 Mon Sep 17 00:00:00 2001
From: Etan Kissling
Date: Thu, 2 Jun 2022 11:40:40 +0200
Subject: [PATCH] online import of historic LC data

Avoid multi-hour blocking when using `--light-client-data-import-mode=full`;
instead, schedule a background task that imports historic data over time.
The background task is suspended while block proposals are scheduled.
---
 Makefile                                      |   2 +-
 beacon_chain/conf.nim                         |   2 +-
 .../block_pools_types.nim                     |   4 +
 .../block_pools_types_light_client.nim        |  14 +-
 .../consensus_object_pools/blockchain_dag.nim |   2 +-
 .../blockchain_dag_light_client.nim           | 344 +++++++++++------
 beacon_chain/nimbus_beacon_node.nim           |  11 +-
 beacon_chain/spec/eth2_ssz_serialization.nim  |   2 +-
 scripts/launch_local_testnet.sh               |   2 +-
 9 files changed, 242 insertions(+), 141 deletions(-)

diff --git a/Makefile b/Makefile
index dd7893930..fd3f6f9ba 100644
--- a/Makefile
+++ b/Makefile
@@ -322,7 +322,7 @@ define CONNECT_TO_NETWORK_IN_DEV_MODE
 	--data-dir=build/data/shared_$(1)_$(NODE_ID) \
 	--light-client-enable=on \
 	--light-client-data-serve=on \
-	--light-client-data-import-mode=only-new \
+	--light-client-data-import-mode=full \
 	--dump $(NODE_PARAMS)
 endef

diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim
index 40ed62425..386a4fc2c 100644
--- a/beacon_chain/conf.nim
+++ b/beacon_chain/conf.nim
@@ -453,7 +453,7 @@ type
     lightClientDataImportMode* {.
       hidden
       desc: "BETA: Which classes of light client data to import. " &
-            "Must be one of: none, only-new, full (slow startup), on-demand (may miss validator duties)"
+            "Must be one of: none, only-new, full"
       name: "light-client-data-import-mode"
     .}: Option[LightClientDataImportMode]

     lightClientDataMaxPeriods* {.

diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim
index df829ed40..34a395b76 100644
--- a/beacon_chain/consensus_object_pools/block_pools_types.nim
+++ b/beacon_chain/consensus_object_pools/block_pools_types.nim
@@ -335,6 +335,10 @@ template setHeadCb*(dag: ChainDAGRef, cb: OnHeadCallback) =
 template setReorgCb*(dag: ChainDAGRef, cb: OnReorgCallback) =
   dag.onReorgHappened = cb

+template setLightClientImportTaskAllowedCb*(
+    dag: ChainDAGRef, cb: LightClientTaskAllowedCallback) =
+  dag.lcDataStore.importTaskAllowed = cb
+
 func shortLog*(v: EpochRef): string =
   # epoch:root when logging epoch, root:slot when logging slot!
   if v.isNil():

diff --git a/beacon_chain/consensus_object_pools/block_pools_types_light_client.nim b/beacon_chain/consensus_object_pools/block_pools_types_light_client.nim
index 0c2d33fbd..55f34452a 100644
--- a/beacon_chain/consensus_object_pools/block_pools_types_light_client.nim
+++ b/beacon_chain/consensus_object_pools/block_pools_types_light_client.nim
@@ -13,6 +13,7 @@ import
   # Status libraries
   stew/bitops2,
+  chronos,
   # Beacon chain internals
   ../spec/datatypes/altair,
   ../light_client_data_db,
@@ -26,15 +27,15 @@ type
     OnlyNew = "only-new"
       ## Import only new light client data.
     Full = "full"
-      ## Import light client data for entire weak subjectivity period.
-    OnDemand = "on-demand"
-      ## Don't precompute historic data. Slow, may miss validator duties.
+      ## Import all light client data, backfilling historic data.
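+      ## Historic data is backfilled by a throttled background task,
+      ## so startup is not blocked while it is being computed.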
OnLightClientFinalityUpdateCallback* = proc(data: altair.LightClientFinalityUpdate) {.gcsafe, raises: [Defect].} OnLightClientOptimisticUpdateCallback* = proc(data: altair.LightClientOptimisticUpdate) {.gcsafe, raises: [Defect].} + LightClientTaskAllowedCallback* = proc(): bool {.gcsafe, raises: [Defect].} + CachedLightClientData* = object ## Cached data from historical non-finalized states to improve speed when ## creating future `LightClientUpdate` and `LightClientBootstrap` instances. @@ -103,3 +104,10 @@ type ## On new `LightClientFinalityUpdate` callback onLightClientOptimisticUpdate*: OnLightClientOptimisticUpdateCallback ## On new `LightClientOptimisticUpdate` callback + + # ----------------------------------- + # Historic data import + importFut*: Future[void] + ## Background task for importing historic light client data + importTaskAllowed*: LightClientTaskAllowedCallback + ## Callback to determine whether LC background task is allowed to run diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 874164e61..9026fd927 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -50,7 +50,7 @@ const # When finality happens, we prune historical states from the database except # for a snapshort every 32 epochs from which replays can happen - there's a # balance here between making long replays and saving on disk space - EPOCHS_PER_STATE_SNAPSHOT = 32 + EPOCHS_PER_STATE_SNAPSHOT* = 32 proc putBlock*( dag: ChainDAGRef, signedBlock: ForkyTrustedSignedBeaconBlock) = diff --git a/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim b/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim index d1a6fdbe8..de62caa21 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim @@ -21,6 +21,54 @@ import logScope: topics = "chaindag" +type ThrottledTask = ref object + isAllowedToResume: LightClientTaskAllowedCallback + resumeTick: Moment + activeDur, suspendedDur: Duration + measureOffset: Duration + +proc newThrottledTask( + isAllowedToResume: LightClientTaskAllowedCallback): ThrottledTask = + ThrottledTask( + isAllowedToResume: isAllowedToResume, + resumeTick: Moment.now()) + +proc throttle(task: ThrottledTask): Future[void] {.async.} = + if task.isAllowedToResume == nil: + return + + const + chunkDurLimit = chronos.milliseconds(50) + suspendInterval = chronos.milliseconds(1750) + + let + tick = Moment.now() + chunkDur = tick - task.resumeTick + if chunkDur < chunkDurLimit: + return + task.activeDur += chunkDur - task.measureOffset + task.measureOffset.reset() + + await chronos.sleepAsync(suspendInterval) # yield at least once + while not task.isAllowedToResume(): + await chronos.sleepAsync(suspendInterval) + + task.resumeTick = Moment.now() + task.suspendedDur += task.resumeTick - tick + +proc measure(task: ThrottledTask): tuple[activeDur, suspendedDur: Duration] = + let + tick = Moment.now() + chunkDur = tick - task.resumeTick + res = ( + activeDur: task.activeDur + chunkDur - task.measureOffset, + suspendedDur: task.suspendedDur + ) + task.activeDur.reset() + task.suspendedDur.reset() + task.measureOffset = chunkDur + res + type HashedBeaconStateWithSyncCommittee = bellatrix.HashedBeaconState | @@ -45,6 +93,29 @@ proc updateExistingState( doAssert verifyFinalization notin dag.updateFlags ok +template asyncUpdateExistingState( 
+ dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsiParam: BlockSlotId, + save: bool, cache: var StateCache, periodicBody: untyped): bool = + ## Wrapper around `updateExistingState` for states expected to exist. + block: + # Ideally, an async API for updating state would be available. + # As a stopgap solution, update to the target state in multiple steps. + let + currentSlot = getStateField(state, slot) + bsi = bsiParam + if currentSlot > bsi.slot or bsi.slot - currentSlot > 2 * SLOTS_PER_EPOCH: + let + targetEpoch = bsi.slot.epoch + startEpoch = targetEpoch - (targetEpoch mod EPOCHS_PER_STATE_SNAPSHOT) + for epoch in startEpoch ..< targetEpoch: + periodicBody + let stepBsi = dag.atSlot(bsi.bid, epoch.start_slot).valueOr: + continue + discard dag.updateState(state, stepBsi, false, cache) + periodicBody + + dag.updateExistingState(state, bsi, save, cache) + template withUpdatedExistingState( dag: ChainDAGRef, stateParam: var ForkedHashedBeaconState, bsiParam: BlockSlotId, okBody: untyped, failureBody: untyped): untyped = @@ -58,6 +129,22 @@ template withUpdatedExistingState( doAssert verifyFinalization notin dag.updateFlags failureBody +template asyncWithUpdatedExistingState( + dag: ChainDAGRef, stateParam: var ForkedHashedBeaconState, + bsiParam: BlockSlotId, periodicBody: untyped, + okBody: untyped, failureBody: untyped): untyped = + ## Wrapper around `withUpdatedExistingState` with periodic callback. + block: + let bsi {.inject.} = bsiParam + var cache {.inject.} = StateCache() + if asyncUpdateExistingState( + dag, stateParam, bsi, false, cache, periodicBody): + template bid(): BlockId {.inject, used.} = bsi.bid + template state(): ForkedHashedBeaconState {.inject, used.} = stateParam + okBody + else: + failureBody + proc getExistingBlockIdAtSlot(dag: ChainDAGRef, slot: Slot): Opt[BlockSlotId] = ## Wrapper around `getBlockIdAtSlot` for blocks expected to exist. let bsi = dag.getBlockIdAtSlot(slot) @@ -147,7 +234,10 @@ proc initLightClientDataStore*( lcDataStore -proc closeLightClientDataStore*(dag: ChainDAGRef) = +proc closeLightClientDataStore*(dag: ChainDAGRef) {.async.} = + if dag.lcDataStore.importFut != nil: + await dag.lcDataStore.importFut.cancelAndWait() + dag.lcDataStore.db.close() func targetLightClientTailSlot(dag: ChainDAGRef): Slot = @@ -168,8 +258,8 @@ func handleUnexpectedLightClientError(dag: ChainDAGRef, buggedSlot: Slot) = dag.lcDataStore.cache.tailSlot = buggedSlot + 1 proc initLightClientBootstrapForPeriod( - dag: ChainDAGRef, - period: SyncCommitteePeriod): Opt[void] = + dag: ChainDAGRef, period: SyncCommitteePeriod, + task: ThrottledTask): Future[Opt[void]] {.async.} = ## Compute and cache `LightClientBootstrap` data for all finalized ## epoch boundary blocks within a given sync committee period. 
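+  ## `task` is throttled between processed epoch boundary blocks so that
+  ## concurrent chores, e.g., validator duties, are not starved.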
if not dag.isNextSyncCommitteeFinalized(period): @@ -177,12 +267,11 @@ proc initLightClientBootstrapForPeriod( if dag.lcDataStore.db.isPeriodSealed(period): return ok() - let startTick = Moment.now() + discard task.measure() debug "Caching historic LC bootstrap data", period defer: - let endTick = Moment.now() debug "Historic LC bootstrap data cached", period, - cacheDur = endTick - startTick + cacheDur = task.measure() let periodStartSlot = period.start_slot @@ -198,6 +287,7 @@ proc initLightClientBootstrapForPeriod( nextBoundarySlot = lowBoundarySlot while nextBoundarySlot <= highBoundarySlot: defer: nextBoundarySlot += SLOTS_PER_EPOCH + await task.throttle() let bsi = dag.getExistingBlockIdAtSlot(nextBoundarySlot).valueOr: dag.handleUnexpectedLightClientError(nextBoundarySlot) @@ -207,8 +297,11 @@ proc initLightClientBootstrapForPeriod( boundarySlot = bid.slot.nextEpochBoundarySlot if boundarySlot == nextBoundarySlot and bid.slot >= lowSlot and not dag.lcDataStore.db.hasCurrentSyncCommitteeBranch(bid.slot): - if not dag.updateExistingState( - tmpState[], bid.atSlot, save = false, tmpCache): + let ok = + dag.asyncUpdateExistingState( + tmpState[], bid.atSlot, save = false, tmpCache) do: + await task.throttle + if not ok: dag.handleUnexpectedLightClientError(bid.slot) res.err() continue @@ -217,10 +310,12 @@ proc initLightClientBootstrapForPeriod( state.data.build_proof(altair.CURRENT_SYNC_COMMITTEE_INDEX).get else: raiseAssert "Unreachable" dag.lcDataStore.db.putCurrentSyncCommitteeBranch(bid.slot, branch) - res + await task.throttle() + return res proc initLightClientUpdateForPeriod( - dag: ChainDAGRef, period: SyncCommitteePeriod): Opt[void] = + dag: ChainDAGRef, period: SyncCommitteePeriod, + task: ThrottledTask): Future[Opt[void]] {.async.} = ## Compute and cache the best `LightClientUpdate` within a given ## sync committee period up through the finalized head block. ## Non-finalized blocks are processed incrementally by other functions. @@ -229,20 +324,21 @@ proc initLightClientUpdateForPeriod( if dag.lcDataStore.db.isPeriodSealed(period): return ok() - let startTick = Moment.now() + discard task.measure() debug "Computing best historic LC update", period - proc logBest(endTick = Moment.now()) = + proc logBest() = # Using a helper function reduces code size as the `defer` beneath is # replicated on every `return`, and the log statement allocates another # copy of the arguments on the stack for each instantiation (~1 MB stack!) debug "Best historic LC update computed", period, update = dag.lcDataStore.db.getBestUpdate(period), - computeDur = endTick - startTick + computeDur = task.measure() defer: logBest() proc maxParticipantsBlock( - dag: ChainDAGRef, highBid: BlockId, lowSlot: Slot - ): tuple[bid: Opt[BlockId], res: Opt[void]] = + dag: ChainDAGRef, highBid: BlockId, lowSlot: Slot, + task: ThrottledTask + ): Future[tuple[bid: Opt[BlockId], res: Opt[void]]] {.async.} = ## Determine the earliest block with most sync committee signatures among ## ancestors of `highBid` with at least `lowSlot` as parent block slot. ## Return `err` if no block with `MIN_SYNC_COMMITTEE_PARTICIPANTS` exists. 
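+    ## `task` is throttled while walking the ancestor chain, which may span
+    ## an entire sync committee period.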
@@ -261,6 +357,7 @@ proc initLightClientUpdateForPeriod( break if parentBid.slot < lowSlot: break + await task.throttle() let bdata = dag.getExistingForkedBlock(bid).valueOr: dag.handleUnexpectedLightClientError(bid.slot) @@ -275,7 +372,7 @@ proc initLightClientUpdateForPeriod( maxParticipants = numParticipants maxBid.ok bid bid = parentBid - (bid: maxBid, res: res) + return (bid: maxBid, res: res) # Determine the block in the period with highest sync committee participation let @@ -288,7 +385,7 @@ proc initLightClientUpdateForPeriod( dag.handleUnexpectedLightClientError(highSlot) return err() highBid = highBsi.bid - maxParticipantsRes = dag.maxParticipantsBlock(highBid, lowSlot) + maxParticipantsRes = await dag.maxParticipantsBlock(highBid, lowSlot, task) maxParticipantsBid = maxParticipantsRes.bid.valueOr: const update = default(altair.LightClientUpdate) if fullPeriodCovered and maxParticipantsRes.res.isOk: # No block in period @@ -313,19 +410,22 @@ proc initLightClientUpdateForPeriod( else: let nextLowSlot = signatureBid.slot + 1 - signatureRes = dag.maxParticipantsBlock(highBid, nextLowSlot) - if signatureRes.res.isErr: + bidRes = await dag.maxParticipantsBlock(highBid, nextLowSlot, task) + if bidRes.res.isErr: res.err() - signatureBid = signatureRes.bid.valueOr: + signatureBid = bidRes.bid.valueOr: signatureBid = maxParticipantsBid break + await task.throttle() let attestedBid = dag.existingParent(signatureBid).valueOr: dag.handleUnexpectedLightClientError(signatureBid.slot) res.err() continue finalizedEpoch = block: - dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do: + dag.asyncWithUpdatedExistingState(tmpState[], attestedBid.atSlot) do: + await task.throttle() + do: withState(state): when stateFork >= BeaconStateFork.Altair: state.data.finalized_checkpoint.epoch @@ -350,11 +450,14 @@ proc initLightClientUpdateForPeriod( finalizedBid = finalizedBsi.bid # For fallback `break` at start of loop # Save best light client data for given period + await task.throttle() var update {.noinit.}: altair.LightClientUpdate let attestedBid = dag.existingParent(signatureBid).valueOr: dag.handleUnexpectedLightClientError(signatureBid.slot) return err() - dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do: + dag.asyncWithUpdatedExistingState(tmpState[], attestedBid.atSlot) do: + await task.throttle() + do: let bdata = dag.getExistingForkedBlock(bid).valueOr: dag.handleUnexpectedLightClientError(bid.slot) return err() @@ -394,22 +497,89 @@ proc initLightClientUpdateForPeriod( dag.lcDataStore.db.putBestUpdate(period, update) else: dag.lcDataStore.db.putUpdateIfBetter(period, update) - res + await task.throttle() + return res proc initLightClientDataForPeriod( - dag: ChainDAGRef, period: SyncCommitteePeriod): Opt[void] = + dag: ChainDAGRef, period: SyncCommitteePeriod, + task: ThrottledTask): Future[Opt[void]] {.async.} = ## Import light client data for a given sync committee period. 
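+  ## This computes both bootstrap and best-update data; the period is sealed
+  ## in the database once it is fully covered by finalized blocks.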
if dag.lcDataStore.db.isPeriodSealed(period): return ok() let fullPeriodCovered = (dag.finalizedHead.slot >= (period + 1).start_slot) - res1 = dag.initLightClientBootstrapForPeriod(period) - res2 = dag.initLightClientUpdateForPeriod(period) + res1 = await dag.initLightClientBootstrapForPeriod(period, task) + res2 = await dag.initLightClientUpdateForPeriod(period, task) if res1.isErr or res2.isErr: return err() if fullPeriodCovered: dag.lcDataStore.db.sealPeriod(period) - ok() + return ok() + +proc importHistoricLightClientData(dag: ChainDAGRef): Future[void] {.async.} = + ## Import finalized historic light client data, moving `tailSlot` backwards. + let task = newThrottledTask(dag.lcDataStore.importTaskAllowed) + + logScope: lightClientDataMaxPeriods = dag.lcDataStore.maxPeriods + + var + anyWorkDone = false + startTick: Moment + while true: + # Delay import of historic light client data until finality advanced + # to avoid having to deal with potential forking in historic data + let + finalizedSlot = dag.finalizedHead.slot + currentTailSlot = dag.lcDataStore.cache.tailSlot + if finalizedSlot < currentTailSlot: + return + + # If target tail slot has been reached, work is done + let targetTailSlot = dag.targetLightClientTailSlot + if currentTailSlot <= targetTailSlot: + if anyWorkDone: + info "Historic LC data import complete", + importDur = Moment.now() - startTick + return + if not anyWorkDone: + anyWorkDone = true + startTick = Moment.now() + info "Importing historic LC data" + + # Import next period + let + currentPeriod = currentTailSlot.sync_committee_period + period = + if currentTailSlot > currentPeriod.start_slot: + currentPeriod + else: + doAssert currentPeriod > 0, "currentTailSlot > targetTailSlot" + currentPeriod - 1 + periodStartSlot = max(period.start_slot, targetTailSlot) + let res = await dag.initLightClientDataForPeriod(period, task) + if res.isOk and dag.lcDataStore.cache.tailSlot == currentTailSlot: + dag.lcDataStore.cache.tailSlot = periodStartSlot + else: + const retryAfterErrorInterval = chronos.seconds(30) + await sleepAsync(retryAfterErrorInterval) + +proc continueImportingHistoricLightClientData(dag: ChainDAGRef) {.gcsafe.} = + ## Continue importing finalized historic light client data. + ## This needs to be called whenever `finalized_checkpoint` changes. + if dag.lcDataStore.importMode != LightClientDataImportMode.Full: + return + if dag.lcDataStore.importFut != nil: + return + + dag.lcDataStore.importFut = dag.importHistoricLightClientData() + if dag.lcDataStore.importFut.completed: + dag.lcDataStore.importFut = nil + return + + proc handleFinishedImport(future: pointer) = + dag.lcDataStore.importFut = nil + dag.continueImportingHistoricLightClientData() + dag.lcDataStore.importFut.addCallback(handleFinishedImport) proc getLightClientData( dag: ChainDAGRef, @@ -605,7 +775,9 @@ proc createLightClientUpdates( dag.lcDataStore.onLightClientOptimisticUpdate(latest.toOptimistic) proc initLightClientDataCache*(dag: ChainDAGRef) = - ## Initialize cached light client data + ## Initialize cached light client data with the initial head state. + ## Historic data is imported in the background as soon as finality + ## advances past the initial head slot. 
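+  ## The background import is kicked off from
+  ## `processFinalizationForLightClient` whenever finality advances.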
if dag.lcDataStore.importMode == LightClientDataImportMode.None: return @@ -613,96 +785,15 @@ proc initLightClientDataCache*(dag: ChainDAGRef) = dag.lcDataStore.db.delPeriodsFrom(dag.firstNonFinalizedPeriod) # Initialize tail slot - let targetTailSlot = dag.targetLightClientTailSlot - dag.lcDataStore.cache.tailSlot = max(dag.head.slot, targetTailSlot) + let altairStartSlot = dag.cfg.ALTAIR_FORK_EPOCH.start_slot + dag.lcDataStore.cache.tailSlot = max(dag.head.slot, altairStartSlot) # Import head state - if dag.head.slot < dag.lcDataStore.cache.tailSlot: - return - withState(dag.headState): - when stateFork >= BeaconStateFork.Altair: - dag.cacheLightClientData(state, dag.head.bid) - else: raiseAssert "Unreachable" # `tailSlot` cannot be before Altair - if dag.lcDataStore.importMode == LightClientDataImportMode.OnlyNew: - return - - # Import light client data for finalized period through finalized head - let finalizedSlot = max(dag.finalizedHead.blck.slot, targetTailSlot) - if finalizedSlot >= dag.lcDataStore.cache.tailSlot: - return - dag.lcDataStore.cache.tailSlot = finalizedSlot - let finalizedPeriod = finalizedSlot.sync_committee_period - var res = dag.initLightClientDataForPeriod(finalizedPeriod) - - let lightClientStartTick = Moment.now() - logScope: lightClientDataMaxPeriods = dag.lcDataStore.maxPeriods - debug "Initializing cached LC data", res - - # Build list of block to process. - # As it is slow to load states in descending order, - # build a reverse todo list to then process them in ascending order - var - blocks = newSeqOfCap[BlockId](dag.head.slot - finalizedSlot + 1) - bid = dag.head.bid - while bid.slot > finalizedSlot: - blocks.add bid - bid = dag.existingParent(bid).valueOr: - dag.handleUnexpectedLightClientError(bid.slot) - res.err() - break - if bid.slot == finalizedSlot: - blocks.add bid - - # Process blocks (reuses `dag.headState`, but restores it to the current head) - var - tmpState = assignClone(dag.headState) - tmpCache, cache: StateCache - oldCheckpoint: Checkpoint - cpIndex = 0 - for i in countdown(blocks.high, blocks.low): - bid = blocks[i] - if not dag.updateExistingState( - dag.headState, bid.atSlot, save = false, cache): - dag.handleUnexpectedLightClientError(bid.slot) - res.err() - continue - let bdata = dag.getExistingForkedBlock(bid).valueOr: - dag.handleUnexpectedLightClientError(bid.slot) - res.err() - continue - withStateAndBlck(dag.headState, bdata): + if dag.head.slot >= dag.lcDataStore.cache.tailSlot: + withState(dag.headState): when stateFork >= BeaconStateFork.Altair: - # Cache light client data (non-finalized blocks may refer to this) - if i != blocks.low: - dag.cacheLightClientData(state, bid) # `dag.head` already cached - - # Create `LightClientUpdate` instances - if i < blocks.high: - dag.createLightClientUpdates(state, blck, parentBid = blocks[i + 1]) - else: raiseAssert "Unreachable" - - let lightClientEndTick = Moment.now() - debug "Initialized cached LC data", - initDur = lightClientEndTick - lightClientStartTick, res - if res.isErr: - return - if dag.lcDataStore.importMode == LightClientDataImportMode.OnDemand: - return - - # Import historic data - dag.lcDataStore.cache.tailSlot = targetTailSlot - let targetTailPeriod = targetTailSlot.sync_committee_period - if targetTailPeriod < finalizedPeriod: - # `countdown` through 0 fails on distinct `uint64` - # https://github.com/nim-lang/Nim/pull/19926 - var period = finalizedPeriod - 1 - while period >= targetTailPeriod: - if dag.initLightClientDataForPeriod(period).isErr: - res.err() - if period 
<= targetTailPeriod: - break - dec period - debug "Historic LC data imported", res + dag.cacheLightClientData(state, dag.head.bid) + else: raiseAssert "Unreachable" # `tailSlot` cannot be before Altair proc processNewBlockForLightClient*( dag: ChainDAGRef, @@ -824,6 +915,9 @@ proc processFinalizationForLightClient*( for key in keysToDelete: dag.lcDataStore.cache.pendingBest.del key + # Continue importing historic light client data + dag.continueImportingHistoricLightClientData() + proc getLightClientBootstrap*( dag: ChainDAGRef, blockRoot: Eth2Digest): Opt[altair.LightClientBootstrap] = @@ -843,21 +937,10 @@ proc getLightClientBootstrap*( if slot > dag.finalizedHead.blck.slot: debug "LC bootstrap unavailable: Not finalized", blockRoot return err() - var branch = dag.lcDataStore.db.getCurrentSyncCommitteeBranch(slot) + let branch = dag.lcDataStore.db.getCurrentSyncCommitteeBranch(slot) if branch.isZeroMemory: - if dag.lcDataStore.importMode == LightClientDataImportMode.OnDemand: - let bsi = ? dag.getExistingBlockIdAtSlot(slot) - var tmpState = assignClone(dag.headState) - dag.withUpdatedExistingState(tmpState[], bsi) do: - branch = withState(state): - when stateFork >= BeaconStateFork.Altair: - state.data.build_proof(altair.CURRENT_SYNC_COMMITTEE_INDEX).get - else: raiseAssert "Unreachable" - do: return err() - dag.lcDataStore.db.putCurrentSyncCommitteeBranch(slot, branch) - else: - debug "LC bootstrap unavailable: Data not cached", slot - return err() + debug "LC bootstrap unavailable: Data not cached", slot + return err() let period = slot.sync_committee_period var tmpState = assignClone(dag.headState) @@ -879,9 +962,6 @@ proc getLightClientUpdateForPeriod*( if not dag.lcDataStore.serve: return - if dag.lcDataStore.importMode == LightClientDataImportMode.OnDemand: - if dag.initLightClientUpdateForPeriod(period).isErr: - return result = some(dag.lcDataStore.db.getBestUpdate(period)) let numParticipants = countOnes(result.get.sync_aggregate.sync_committee_bits) if numParticipants < MIN_SYNC_COMMITTEE_PARTICIPANTS: diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index d1aa04dbd..9e4c35dee 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -336,6 +336,12 @@ proc initFullNode( dag.setHeadCb(onHeadChanged) dag.setReorgCb(onChainReorg) + proc importTaskAllowed(): bool = + # Suspend background task as important validator actions are approaching + node.actionTracker.getNextProposalSlot(node.currentSlot) == FAR_FUTURE_SLOT + + dag.setLightClientImportTaskAllowedCb(importTaskAllowed) + node.dag = dag node.quarantine = quarantine node.attestationPool = attestationPool @@ -1485,8 +1491,11 @@ proc stop(node: BeaconNode) = waitFor node.network.stop() except CatchableError as exc: warn "Couldn't stop network", msg = exc.msg + try: + waitFor node.dag.closeLightClientDataStore() + except CatchableError as exc: + warn "Couldn't close LC data store", msg = exc.msg - node.dag.closeLightClientDataStore() node.attachedValidators.slashingProtection.close() node.db.close() notice "Databases closed" diff --git a/beacon_chain/spec/eth2_ssz_serialization.nim b/beacon_chain/spec/eth2_ssz_serialization.nim index c68cbd956..7ae0ae73f 100644 --- a/beacon_chain/spec/eth2_ssz_serialization.nim +++ b/beacon_chain/spec/eth2_ssz_serialization.nim @@ -50,7 +50,7 @@ template readSszBytes*( data: openArray[byte], val: var auto, updateRoot: bool) = readSszValue(data, val) -func readSszBytes(T: type, data: openArray[byte], updateRoot = true): T {. 
+func readSszBytes*(T: type, data: openArray[byte], updateRoot = true): T {. raises: [Defect, MalformedSszError, SszSizeMismatchError].} = var res: T readSszBytes(data, res, updateRoot) diff --git a/scripts/launch_local_testnet.sh b/scripts/launch_local_testnet.sh index 52b602667..c891393a3 100755 --- a/scripts/launch_local_testnet.sh +++ b/scripts/launch_local_testnet.sh @@ -879,7 +879,7 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do --metrics-port="$(( BASE_METRICS_PORT + NUM_NODE ))" \ --light-client-enable=on \ --light-client-data-serve=on \ - --light-client-data-import-mode=only-new \ + --light-client-data-import-mode=full \ ${EXTRA_ARGS} \ &> "${DATA_DIR}/log${NUM_NODE}.txt" &
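
Reviewer note: below is a minimal, self-contained sketch (not part of the
patch) of the cooperative throttling pattern that `ThrottledTask` implements
in `blockchain_dag_light_client.nim`. Work proceeds in short chunks; once a
chunk exceeds its budget, the task sleeps and stays suspended while the
permission callback returns false. It assumes only `chronos`.
`proposalPending` and `backgroundWork` are hypothetical stand-ins for the
node's `importTaskAllowed` callback and the historic-import loop; the real
implementation additionally tracks active/suspended durations for logging.

import chronos

type ThrottledTask = ref object
  isAllowedToResume: proc(): bool {.gcsafe, raises: [Defect].}
  resumeTick: Moment  # start of the current work chunk

proc throttle(task: ThrottledTask) {.async.} =
  const
    chunkDurLimit = chronos.milliseconds(50)      # work budget per chunk
    suspendInterval = chronos.milliseconds(1750)  # sleep between chunks
  if Moment.now() - task.resumeTick < chunkDurLimit:
    return  # chunk budget not yet exhausted, keep working
  await sleepAsync(suspendInterval)  # yield at least once
  while not task.isAllowedToResume():
    await sleepAsync(suspendInterval)  # stay suspended while duties pend
  task.resumeTick = Moment.now()

var proposalPending = false  # hypothetical "validator duty imminent" flag

proc backgroundWork(task: ThrottledTask) {.async.} =
  for _ in 0 ..< 100:
    # ... one small unit of historic import work goes here ...
    await task.throttle()  # cooperatively yield between units

when isMainModule:
  let task = ThrottledTask(
    isAllowedToResume: proc(): bool = not proposalPending,
    resumeTick: Moment.now())
  waitFor task.backgroundWork()

In the patch itself, the callback is wired up in `initFullNode` via
`dag.setLightClientImportTaskAllowedCb(importTaskAllowed)` and returns true
only while `getNextProposalSlot` reports no upcoming block proposal
(`FAR_FUTURE_SLOT`).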