From 91d543440a281425c6ee259f7aabcd0a49fb217e Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Mon, 27 Jun 2022 13:24:38 +0200 Subject: [PATCH] add option to configure max historic LC data periods (#3799) Adds a `--light-client-data-max-periods` option to override the number of sync committee periods for which light client data is retained. Raising it above the default enables archive nodes to serve the full data range; lowering it below the default speeds up import times (light client data is still not persisted to disk). --- beacon_chain/conf.nim | 5 + .../block_pools_types_light_client.nim | 4 +- .../consensus_object_pools/blockchain_dag.nim | 2 + .../blockchain_dag_light_client.nim | 726 +++++++++--------- beacon_chain/nimbus_beacon_node.nim | 1 + beacon_chain/spec/presets.nim | 9 + 6 files changed, 379 insertions(+), 368 deletions(-) diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 7a31eb98d..66b022beb 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -456,6 +456,11 @@ type "Must be one of: none, only-new, full (slow startup), on-demand (may miss validator duties)" name: "light-client-data-import-mode" .}: Option[LightClientDataImportMode] + lightClientDataMaxPeriods* {. + hidden + desc: "BETA: Maximum number of sync committee periods to retain light client data" + name: "light-client-data-max-periods" .}: Option[uint64] + inProcessValidators* {. desc: "Disable the push model (the beacon node tells a signing process with the private keys of the validators what to sign and when) and load the validators in the beacon node itself" defaultValue: true # the use of the nimbus_signing_process binary by default will be delayed until async I/O over stdin/stdout is developed for the child process. diff --git a/beacon_chain/consensus_object_pools/block_pools_types_light_client.nim b/beacon_chain/consensus_object_pools/block_pools_types_light_client.nim index 02b647b2d..3421aafb3 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types_light_client.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types_light_client.nim @@ -77,7 +77,7 @@ type ## Tracks light client data for the latest slot that was signed by ## at least `MIN_SYNC_COMMITTEE_PARTICIPANTS`. May be older than head. - importTailSlot*: Slot + tailSlot*: Slot ## The earliest slot for which light client data is imported.
LightClientDataStore* = object @@ -94,6 +94,8 @@ type ## Whether to make local light client data available or not importMode*: LightClientDataImportMode ## Which classes of light client data to import + maxPeriods*: uint64 + ## Maximum number of sync committee periods to retain light client data # ----------------------------------- # Callbacks diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 45eed8570..2b71ed4e2 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -689,6 +689,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, onLCOptimisticUpdateCb: OnLightClientOptimisticUpdateCallback = nil, lightClientDataServe = false, lightClientDataImportMode = LightClientDataImportMode.None, + lightClientDataMaxPeriods = none(uint64), vanityLogs = default(VanityLogs)): ChainDAGRef = cfg.checkForkConsistency() @@ -728,6 +729,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, lcDataStore: initLightClientDataStore( serve = lightClientDataServe, importMode = lightClientDataImportMode, + maxPeriods = lightClientDataMaxPeriods.get(cfg.defaultLCDataMaxPeriods), onLCFinalityUpdateCb = onLCFinalityUpdateCb, onLCOptimisticUpdateCb = onLCOptimisticUpdateCb), diff --git a/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim b/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim index d04aa9c48..5e3db38ae 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim @@ -34,28 +34,6 @@ template nextEpochBoundarySlot(slot: Slot): Slot = ## referring to a block at given slot. (slot + (SLOTS_PER_EPOCH - 1)).epoch.start_slot -func computeEarliestLightClientSlot(dag: ChainDAGRef): Slot = - ## Compute the earliest slot for which light client data is retained. - let - minSupportedSlot = max( - dag.cfg.ALTAIR_FORK_EPOCH.start_slot, - dag.lcDataStore.cache.importTailSlot) - currentSlot = getStateField(dag.headState, slot) - if currentSlot < minSupportedSlot: - return minSupportedSlot - - let - MIN_EPOCHS_FOR_BLOCK_REQUESTS = - dag.cfg.MIN_VALIDATOR_WITHDRAWABILITY_DELAY + - dag.cfg.CHURN_LIMIT_QUOTIENT div 2 - MIN_SLOTS_FOR_BLOCK_REQUESTS = - MIN_EPOCHS_FOR_BLOCK_REQUESTS * SLOTS_PER_EPOCH - if currentSlot - minSupportedSlot < MIN_SLOTS_FOR_BLOCK_REQUESTS: - return minSupportedSlot - - let earliestSlot = currentSlot - MIN_SLOTS_FOR_BLOCK_REQUESTS - max(earliestSlot.sync_committee_period.start_slot, minSupportedSlot) - proc updateExistingState( dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsi: BlockSlotId, save: bool, cache: var StateCache): bool = @@ -145,6 +123,245 @@ proc syncCommitteeRootForPeriod( else: raiseAssert "Unreachable" do: err() +func initLightClientDataStore*( + serve: bool, importMode: LightClientDataImportMode, maxPeriods: uint64, + onLCFinalityUpdateCb: OnLightClientFinalityUpdateCallback = nil, + onLCOptimisticUpdateCb: OnLightClientOptimisticUpdateCallback = nil +): LightClientDataStore = + ## Initialize light client data store. + LightClientDataStore( + serve: serve, + importMode: importMode, + maxPeriods: maxPeriods, + onLightClientFinalityUpdate: onLCFinalityUpdateCb, + onLightClientOptimisticUpdate: onLCOptimisticUpdateCb) + +func targetLightClientTailSlot(dag: ChainDAGRef): Slot = + ## Earliest slot for which light client data is retained. 
+ let + maxPeriods = dag.lcDataStore.maxPeriods + headPeriod = dag.head.slot.sync_committee_period + lowSlot = max(dag.tail.slot, dag.cfg.ALTAIR_FORK_EPOCH.start_slot) + tail = max(headPeriod + 1, maxPeriods.SyncCommitteePeriod) - maxPeriods + max(tail.start_slot, lowSlot) + +func handleUnexpectedLightClientError(dag: ChainDAGRef, buggedSlot: Slot) = + ## If there is an unexpected error, adjust `tailSlot` to keep track of the + ## section for which complete light client data is available, and to avoid + ## failed lookups of cached light client data. + doAssert verifyFinalization notin dag.updateFlags + if buggedSlot >= dag.lcDataStore.cache.tailSlot: + dag.lcDataStore.cache.tailSlot = buggedSlot + 1 + +proc initLightClientBootstrapForPeriod( + dag: ChainDAGRef, + period: SyncCommitteePeriod) = + ## Compute and cache `LightClientBootstrap` data for all finalized + ## epoch boundary blocks within a given sync committee period. + if not dag.isNextSyncCommitteeFinalized(period): + return + + let startTick = Moment.now() + debug "Caching historic LC bootstrap data", period + defer: + let endTick = Moment.now() + debug "Historic LC bootstrap data cached", period, + cacheDur = endTick - startTick + + let + periodStartSlot = period.start_slot + periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1 + lowSlot = max(periodStartSlot, dag.targetLightClientTailSlot) + highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot) + lowBoundarySlot = lowSlot.nextEpochBoundarySlot + highBoundarySlot = highSlot.nextEpochBoundarySlot + var + tmpState = assignClone(dag.headState) + tmpCache: StateCache + nextBoundarySlot = lowBoundarySlot + while nextBoundarySlot <= highBoundarySlot: + defer: nextBoundarySlot += SLOTS_PER_EPOCH + let + bsi = dag.getExistingBlockIdAtSlot(nextBoundarySlot).valueOr: + dag.handleUnexpectedLightClientError(nextBoundarySlot) + continue + bid = bsi.bid + boundarySlot = bid.slot.nextEpochBoundarySlot + if boundarySlot == nextBoundarySlot and bid.slot >= lowSlot and + not dag.lcDataStore.cache.bootstrap.hasKey(bid.slot): + if not dag.updateExistingState( + tmpState[], bid.atSlot, save = false, tmpCache): + dag.handleUnexpectedLightClientError(bid.slot) + continue + var cachedBootstrap {.noinit.}: CachedLightClientBootstrap + cachedBootstrap.current_sync_committee_branch = withState(tmpState[]): + when stateFork >= BeaconStateFork.Altair: + state.data.build_proof(altair.CURRENT_SYNC_COMMITTEE_INDEX).get + else: raiseAssert "Unreachable" + dag.lcDataStore.cache.bootstrap[bid.slot] = cachedBootstrap + +proc initLightClientUpdateForPeriod( + dag: ChainDAGRef, period: SyncCommitteePeriod) = + ## Compute and cache the best `LightClientUpdate` within a given + ## sync committee period up through the finalized head block. + ## Non-finalized blocks are processed incrementally. + if not dag.isNextSyncCommitteeFinalized(period): + return + if dag.lcDataStore.cache.best.hasKey(period): + return + + let startTick = Moment.now() + debug "Computing best historic LC update", period + proc logBest(endTick = Moment.now()) = + # Using a helper function reduces code size as the `defer` beneath is + # replicated on every `return`, and the log statement allocates another + # copy of the arguments on the stack for each instantiation (~1 MB stack!) 
+ debug "Best historic LC update computed", + period, update = dag.lcDataStore.cache.best.getOrDefault(period), + computeDur = endTick - startTick + defer: logBest() + + proc maxParticipantsBlock( + dag: ChainDAGRef, highBid: BlockId, lowSlot: Slot + ): tuple[bid: Opt[BlockId], ok: bool] = + ## Determine the earliest block with most sync committee signatures among + ## ancestors of `highBid` with at least `lowSlot` as parent block slot. + ## Return `err` if no block with `MIN_SYNC_COMMITTEE_PARTICIPANTS` exists. + ## `bool` in result indicates whether no unexpected errors occurred. + var + maxParticipants = MIN_SYNC_COMMITTEE_PARTICIPANTS + maxBid: Opt[BlockId] + allOk = true + bid = highBid + while true: + if bid.slot <= lowSlot: + break + let parentBid = dag.existingParent(bid).valueOr: + dag.handleUnexpectedLightClientError(bid.slot) + allOk = false + break + if parentBid.slot < lowSlot: + break + let + bdata = dag.getExistingForkedBlock(bid).valueOr: + dag.handleUnexpectedLightClientError(bid.slot) + allOk = false + break + numParticipants = + withBlck(bdata): + when stateFork >= BeaconStateFork.Altair: + countOnes(blck.message.body.sync_aggregate.sync_committee_bits) + else: raiseAssert "Unreachable" + if numParticipants >= maxParticipants: + maxParticipants = numParticipants + maxBid.ok bid + bid = parentBid + (bid: maxBid, ok: allOk) + + # Determine the block in the period with highest sync committee participation + let + periodStartSlot = period.start_slot + periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1 + lowSlot = max(periodStartSlot, dag.targetLightClientTailSlot) + highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot) + highBsi = dag.getExistingBlockIdAtSlot(highSlot).valueOr: + dag.handleUnexpectedLightClientError(highSlot) + return + highBid = highBsi.bid + maxParticipantsRes = dag.maxParticipantsBlock(highBid, lowSlot) + maxParticipantsBid = maxParticipantsRes.bid.valueOr: + if maxParticipantsRes.ok: # No single valid block exists in the period + dag.lcDataStore.cache.best[period] = default(altair.LightClientUpdate) + return + + # The block with highest participation may refer to a `finalized_checkpoint` + # in a different sync committee period. 
If that is the case, search for a + # later block with a `finalized_checkpoint` within the given sync committee + # period, despite it having a lower sync committee participation + var + tmpState = assignClone(dag.headState) + signatureBid {.noinit.}, finalizedBid {.noinit.}: BlockId + signatureBid.slot = FAR_FUTURE_SLOT + finalizedBid.slot = FAR_FUTURE_SLOT + while true: + if signatureBid.slot == FAR_FUTURE_SLOT: + signatureBid = maxParticipantsBid + else: + let + nextLowSlot = signatureBid.slot + 1 + signatureRes = dag.maxParticipantsBlock(highBid, nextLowSlot) + signatureBid = signatureRes.bid.valueOr: + signatureBid = maxParticipantsBid + break + let + attestedBid = dag.existingParent(signatureBid).valueOr: + dag.handleUnexpectedLightClientError(signatureBid.slot) + continue + finalizedEpoch = block: + dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do: + withState(state): + when stateFork >= BeaconStateFork.Altair: + state.data.finalized_checkpoint.epoch + else: raiseAssert "Unreachable" + do: + dag.handleUnexpectedLightClientError(attestedBid.slot) + continue + finalizedSlot = finalizedEpoch.start_slot + finalizedBsi = + if finalizedSlot >= dag.tail.slot: + dag.getExistingBlockIdAtSlot(finalizedSlot).valueOr: + dag.handleUnexpectedLightClientError(finalizedSlot) + continue + else: + continue + if finalizedBid.slot >= lowSlot: + finalizedBid = finalizedBsi.bid + break + if signatureBid == maxParticipantsBid: + finalizedBid = finalizedBsi.bid # For fallback `break` at start of loop + + # Save best light client data for given period + var update {.noinit.}: altair.LightClientUpdate + let attestedBid = dag.existingParent(signatureBid).valueOr: + dag.handleUnexpectedLightClientError(signatureBid.slot) + return + dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do: + let bdata = dag.getExistingForkedBlock(bid).valueOr: + dag.handleUnexpectedLightClientError(bid.slot) + return + withStateAndBlck(state, bdata): + when stateFork >= BeaconStateFork.Altair: + update.attested_header = blck.toBeaconBlockHeader() + update.next_sync_committee = state.data.next_sync_committee + update.next_sync_committee_branch = + state.data.build_proof(altair.NEXT_SYNC_COMMITTEE_INDEX).get + if finalizedBid.slot == FAR_FUTURE_SLOT: + update.finality_branch.reset() + else: + update.finality_branch = + state.data.build_proof(altair.FINALIZED_ROOT_INDEX).get + else: raiseAssert "Unreachable" + do: + dag.handleUnexpectedLightClientError(attestedBid.slot) + return + if finalizedBid.slot == FAR_FUTURE_SLOT or finalizedBid.slot == GENESIS_SLOT: + update.finalized_header.reset() + else: + let bdata = dag.getExistingForkedBlock(finalizedBid).valueOr: + dag.handleUnexpectedLightClientError(finalizedBid.slot) + return + withBlck(bdata): + update.finalized_header = blck.toBeaconBlockHeader() + let bdata = dag.getExistingForkedBlock(signatureBid).valueOr: + dag.handleUnexpectedLightClientError(signatureBid.slot) + return + withBlck(bdata): + when stateFork >= BeaconStateFork.Altair: + update.sync_aggregate = blck.asSigned().message.body.sync_aggregate + else: raiseAssert "Unreachable" + update.signature_slot = signatureBid.slot + dag.lcDataStore.cache.best[period] = update + proc getLightClientData( dag: ChainDAGRef, bid: BlockId): CachedLightClientData = @@ -178,14 +395,6 @@ proc deleteLightClientData*(dag: ChainDAGRef, bid: BlockId) = dag.lcDataStore.cache.data.del bid -func handleUnexpectedLightClientError(dag: ChainDAGRef, buggedSlot: Slot) = - ## If there is an unexpected error, adjust 
`importTailSlot` to keep track of - ## section for which complete light client data is available, and to avoid - ## failed lookups of cached light client data. - doAssert verifyFinalization notin dag.updateFlags - if buggedSlot >= dag.lcDataStore.cache.importTailSlot: - dag.lcDataStore.cache.importTailSlot = buggedSlot + 1 - template lazy_header(name: untyped): untyped {.dirty.} = ## `createLightClientUpdates` helper to lazily load a known block header. var @@ -246,10 +455,8 @@ proc createLightClientUpdates( # Verify attested block (parent) is recent enough and that state is available template attested_bid(): auto = parent_bid - let - earliest_slot = dag.computeEarliestLightClientSlot - attested_slot = attested_bid.slot - if attested_slot < earliest_slot: + let attested_slot = attested_bid.slot + if attested_slot < dag.lcDataStore.cache.tailSlot: return # Lazy variables to hold historic data @@ -337,19 +544,115 @@ proc createLightClientUpdates( if isCommitteeFinalized: dag.lcDataStore.cache.best[attested_period] = best - debug "Best LC update for period improved", - period = attested_period, update = best + debug "Best LC update improved", period = attested_period, update = best else: let key = (attested_period, state.syncCommitteeRoot) dag.lcDataStore.cache.pendingBest[key] = best - debug "Best LC update for period improved", - period = key, update = best + debug "Best LC update improved", period = key, update = best if newFinality and dag.lcDataStore.onLightClientFinalityUpdate != nil: dag.lcDataStore.onLightClientFinalityUpdate(latest) if newOptimistic and dag.lcDataStore.onLightClientOptimisticUpdate != nil: dag.lcDataStore.onLightClientOptimisticUpdate(latest.toOptimistic) +proc initLightClientDataCache*(dag: ChainDAGRef) = + ## Initialize cached light client data + if dag.lcDataStore.importMode == LightClientDataImportMode.None: + return + + # Initialize tail slot + let targetTailSlot = dag.targetLightClientTailSlot + dag.lcDataStore.cache.tailSlot = max(dag.head.slot, targetTailSlot) + + # Import head state + if dag.head.slot < dag.lcDataStore.cache.tailSlot: + return + withState(dag.headState): + when stateFork >= BeaconStateFork.Altair: + dag.cacheLightClientData(state, dag.head.bid) + else: raiseAssert "Unreachable" # `tailSlot` cannot be before Altair + if dag.lcDataStore.importMode == LightClientDataImportMode.OnlyNew: + return + + # Import light client data for finalized period through finalized head + let finalizedSlot = max(dag.finalizedHead.blck.slot, targetTailSlot) + if finalizedSlot >= dag.lcDataStore.cache.tailSlot: + return + dag.lcDataStore.cache.tailSlot = finalizedSlot + let finalizedPeriod = finalizedSlot.sync_committee_period + dag.initLightClientBootstrapForPeriod(finalizedPeriod) + dag.initLightClientUpdateForPeriod(finalizedPeriod) + + let lightClientStartTick = Moment.now() + logScope: lightClientDataMaxPeriods = dag.lcDataStore.maxPeriods + debug "Initializing cached LC data" + + # Build list of block to process. 
+ # As it is slow to load states in descending order, + # build a reverse todo list to then process them in ascending order + var + blocks = newSeqOfCap[BlockId](dag.head.slot - finalizedSlot + 1) + bid = dag.head.bid + while bid.slot > finalizedSlot: + blocks.add bid + bid = dag.existingParent(bid).valueOr: + dag.handleUnexpectedLightClientError(bid.slot) + break + if bid.slot == finalizedSlot: + blocks.add bid + + # Process blocks (reuses `dag.headState`, but restores it to the current head) + var + tmpState = assignClone(dag.headState) + tmpCache, cache: StateCache + oldCheckpoint: Checkpoint + cpIndex = 0 + for i in countdown(blocks.high, blocks.low): + bid = blocks[i] + if not dag.updateExistingState( + dag.headState, bid.atSlot, save = false, cache): + dag.handleUnexpectedLightClientError(bid.slot) + continue + let bdata = dag.getExistingForkedBlock(bid).valueOr: + dag.handleUnexpectedLightClientError(bid.slot) + continue + withStateAndBlck(dag.headState, bdata): + when stateFork >= BeaconStateFork.Altair: + # Cache light client data (non-finalized blocks may refer to this) + if i != blocks.low: + dag.cacheLightClientData(state, bid) # `dag.head` already cached + + # Create `LightClientUpdate` instances + if i < blocks.high: + dag.createLightClientUpdates(state, blck, parentBid = blocks[i + 1]) + else: raiseAssert "Unreachable" + + let lightClientEndTick = Moment.now() + debug "Initialized cached LC data", + initDur = lightClientEndTick - lightClientStartTick + if dag.lcDataStore.importMode == LightClientDataImportMode.OnDemand: + return + + # Import historic data + dag.lcDataStore.cache.tailSlot = targetTailSlot + let targetTailPeriod = targetTailSlot.sync_committee_period + if targetTailPeriod < finalizedPeriod: + # `countdown` through 0 fails on distinct `uint64` + # https://github.com/nim-lang/Nim/pull/19926 + var period = finalizedPeriod - 1 + while period >= targetTailPeriod: + dag.initLightClientBootstrapForPeriod(period) + dag.initLightClientUpdateForPeriod(period) + if period <= targetTailPeriod: + break + dec period + if dag.lcDataStore.cache.tailSlot == targetTailSlot: + debug "Historic LC data imported" + else: + # `handleUnexpectedLightClientError` updates `tailSlot` + warn "Error while importing historic LC data", + errorSlot = dag.lcDataStore.cache.tailSlot - 1, targetTailSlot + proc processNewBlockForLightClient*( dag: ChainDAGRef, state: ForkedHashedBeaconState, @@ -358,7 +661,7 @@ proc processNewBlockForLightClient*( ## Update light client data with information from a new block. if dag.lcDataStore.importMode == LightClientDataImportMode.None: return - if signedBlock.message.slot < dag.computeEarliestLightClientSlot: + if signedBlock.message.slot < dag.lcDataStore.cache.tailSlot: return when signedBlock is bellatrix.TrustedSignedBeaconBlock: @@ -368,7 +671,7 @@ proc processNewBlockForLightClient*( dag.cacheLightClientData(state.altairData, signedBlock.toBlockId()) dag.createLightClientUpdates(state.altairData, signedBlock, parentBid) elif signedBlock is phase0.TrustedSignedBeaconBlock: - raiseAssert "Unreachable" # `earliestSlot` cannot be before Altair + raiseAssert "Unreachable" # `tailSlot` cannot be before Altair else: {.error: "Unreachable".} @@ -377,8 +680,7 @@ proc processHeadChangeForLightClient*(dag: ChainDAGRef) = ## Note that `dag.finalizedHead` is not yet updated when this is called. 
if dag.lcDataStore.importMode == LightClientDataImportMode.None: return - let earliestSlot = dag.computeEarliestLightClientSlot - if dag.head.slot < earliestSlot: + if dag.head.slot < dag.lcDataStore.cache.tailSlot: return # Update `best` from `pendingBest` to ensure light client data @@ -386,8 +688,8 @@ proc processHeadChangeForLightClient*(dag: ChainDAGRef) = let headPeriod = dag.head.slot.sync_committee_period if not dag.isNextSyncCommitteeFinalized(headPeriod): let - earliestPeriod = earliestSlot.sync_committee_period - lowPeriod = max(dag.firstNonFinalizedPeriod, earliestPeriod) + tailPeriod = dag.lcDataStore.cache.tailSlot.sync_committee_period + lowPeriod = max(dag.firstNonFinalizedPeriod, tailPeriod) if headPeriod > lowPeriod: var tmpState = assignClone(dag.headState) for period in lowPeriod ..< headPeriod: @@ -404,7 +706,7 @@ proc processHeadChangeForLightClient*(dag: ChainDAGRef) = let key = (headPeriod, state.syncCommitteeRoot) dag.lcDataStore.cache.best[headPeriod] = dag.lcDataStore.cache.pendingBest.getOrDefault(key) - else: raiseAssert "Unreachable" + else: raiseAssert "Unreachable" # `tailSlot` cannot be before Altair proc processFinalizationForLightClient*( dag: ChainDAGRef, oldFinalizedHead: BlockSlot) = @@ -413,14 +715,14 @@ proc processFinalizationForLightClient*( ## This needs to be called whenever `finalized_checkpoint` changes. if dag.lcDataStore.importMode == LightClientDataImportMode.None: return - let - earliestSlot = dag.computeEarliestLightClientSlot - finalizedSlot = dag.finalizedHead.slot - if finalizedSlot < earliestSlot: + let finalizedSlot = dag.finalizedHead.slot + if finalizedSlot < dag.lcDataStore.cache.tailSlot: return # Cache `LightClientBootstrap` for newly finalized epoch boundary blocks - let lowSlot = max(oldFinalizedHead.slot + 1, earliestSlot) + let + firstNewSlot = oldFinalizedHead.slot + 1 + lowSlot = max(firstNewSlot, dag.lcDataStore.cache.tailSlot) var boundarySlot = finalizedSlot while boundarySlot >= lowSlot: let @@ -447,19 +749,22 @@ proc processFinalizationForLightClient*( for bid in bidsToDelete: dag.lcDataStore.cache.data.del bid + let + targetTailSlot = dag.targetLightClientTailSlot + targetTailPeriod = targetTailSlot.sync_committee_period + # Prune bootstrap data that is no longer relevant var slotsToDelete: seq[Slot] for slot in dag.lcDataStore.cache.bootstrap.keys: - if slot < earliestSlot: + if slot < targetTailSlot: slotsToDelete.add slot for slot in slotsToDelete: dag.lcDataStore.cache.bootstrap.del slot # Prune best `LightClientUpdate` that are no longer relevant - let earliestPeriod = earliestSlot.sync_committee_period var periodsToDelete: seq[SyncCommitteePeriod] for period in dag.lcDataStore.cache.best.keys: - if period < earliestPeriod: + if period < targetTailPeriod: periodsToDelete.add period for period in periodsToDelete: dag.lcDataStore.cache.best.del period @@ -474,318 +779,6 @@ proc processFinalizationForLightClient*( for key in keysToDelete: dag.lcDataStore.cache.pendingBest.del key -func initLightClientDataStore*( - serve: bool, importMode: LightClientDataImportMode, - onLCFinalityUpdateCb: OnLightClientFinalityUpdateCallback = nil, - onLCOptimisticUpdateCb: OnLightClientOptimisticUpdateCallback = nil -): LightClientDataStore = - ## Initialize light client data collector. 
- LightClientDataStore( - serve: serve, - importMode: importMode, - onLightClientFinalityUpdate: onLCFinalityUpdateCb, - onLightClientOptimisticUpdate: onLCOptimisticUpdateCb) - -proc initLightClientBootstrapForPeriod( - dag: ChainDAGRef, - period: SyncCommitteePeriod) = - ## Compute and cache `LightClientBootstrap` data for all finalized - ## epoch boundary blocks within a given sync committee period. - if not dag.isNextSyncCommitteeFinalized(period): - return - let - earliestSlot = dag.computeEarliestLightClientSlot - periodStartSlot = period.start_slot - periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1 - if periodEndSlot < earliestSlot: - return - - let startTick = Moment.now() - debug "Caching LC bootstrap data for period", period - defer: - let endTick = Moment.now() - debug "LC bootstrap data for period cached", period, - cacheDur = endTick - startTick - - let - lowSlot = max(periodStartSlot, earliestSlot) - highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot) - lowBoundarySlot = lowSlot.nextEpochBoundarySlot - highBoundarySlot = highSlot.nextEpochBoundarySlot - var - tmpState = assignClone(dag.headState) - tmpCache: StateCache - nextBoundarySlot = lowBoundarySlot - while nextBoundarySlot <= highBoundarySlot: - defer: nextBoundarySlot += SLOTS_PER_EPOCH - let - bsi = dag.getExistingBlockIdAtSlot(nextBoundarySlot).valueOr: - dag.handleUnexpectedLightClientError(nextBoundarySlot) - continue - bid = bsi.bid - boundarySlot = bid.slot.nextEpochBoundarySlot - if boundarySlot == nextBoundarySlot and bid.slot >= lowSlot and - not dag.lcDataStore.cache.bootstrap.hasKey(bid.slot): - if not dag.updateExistingState( - tmpState[], bid.atSlot, save = false, tmpCache): - dag.handleUnexpectedLightClientError(bid.slot) - continue - var cachedBootstrap {.noinit.}: CachedLightClientBootstrap - cachedBootstrap.current_sync_committee_branch = withState(tmpState[]): - when stateFork >= BeaconStateFork.Altair: - state.data.build_proof(altair.CURRENT_SYNC_COMMITTEE_INDEX).get - else: raiseAssert "Unreachable" - dag.lcDataStore.cache.bootstrap[bid.slot] = cachedBootstrap - -proc initLightClientUpdateForPeriod( - dag: ChainDAGRef, period: SyncCommitteePeriod) = - ## Compute and cache the best `LightClientUpdate` within a given - ## sync committee period up through the finalized head block. - ## Non-finalized blocks are processed incrementally. - if not dag.isNextSyncCommitteeFinalized(period): - return - let - earliestSlot = dag.computeEarliestLightClientSlot - periodStartSlot = period.start_slot - periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1 - if periodEndSlot < earliestSlot: - return - if dag.lcDataStore.cache.best.hasKey(period): - return - - let startTick = Moment.now() - debug "Computing best LC update for period", period - proc logBest(endTick = Moment.now()) = - # Using a helper function reduces code size as the `defer` beneath is - # replicated on every `return`, and the log statement allocates another - # copy of the arguments on the stack for each instantiation (~1 MB stack!) - debug "Best LC update for period computed", - period, update = dag.lcDataStore.cache.best.getOrDefault(period), - computeDur = endTick - startTick - defer: logBest() - - proc maxParticipantsBlock( - dag: ChainDAGRef, highBid: BlockId, lowSlot: Slot - ): tuple[bid: Opt[BlockId], ok: bool] = - ## Determine the earliest block with most sync committee signatures among - ## ancestors of `highBid` with at least `lowSlot` as parent block slot. 
- ## Return `err` if no block with `MIN_SYNC_COMMITTEE_PARTICIPANTS` exists. - ## `bool` in result indicates whether no unexpected errors occurred. - var - maxParticipants = MIN_SYNC_COMMITTEE_PARTICIPANTS - maxBid: Opt[BlockId] - allOk = true - bid = highBid - while true: - if bid.slot <= lowSlot: - break - let parentBid = dag.existingParent(bid).valueOr: - dag.handleUnexpectedLightClientError(bid.slot) - allOk = false - break - if parentBid.slot < lowSlot: - break - let - bdata = dag.getExistingForkedBlock(bid).valueOr: - dag.handleUnexpectedLightClientError(bid.slot) - allOk = false - break - numParticipants = - withBlck(bdata): - when stateFork >= BeaconStateFork.Altair: - countOnes(blck.message.body.sync_aggregate.sync_committee_bits) - else: raiseAssert "Unreachable" - if numParticipants >= maxParticipants: - maxParticipants = numParticipants - maxBid.ok bid - bid = parentBid - (bid: maxBid, ok: allOk) - - # Determine the block in the period with highest sync committee participation - let - lowSlot = max(periodStartSlot, earliestSlot) - highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot) - highBsi = dag.getExistingBlockIdAtSlot(highSlot).valueOr: - dag.handleUnexpectedLightClientError(highSlot) - return - highBid = highBsi.bid - maxParticipantsRes = dag.maxParticipantsBlock(highBid, lowSlot) - maxParticipantsBid = maxParticipantsRes.bid.valueOr: - if maxParticipantsRes.ok: # No single valid block exists in the period - dag.lcDataStore.cache.best[period] = default(altair.LightClientUpdate) - return - - # The block with highest participation may refer to a `finalized_checkpoint` - # in a different sync committee period. If that is the case, search for a - # later block with a `finalized_checkpoint` within the given sync committee - # period, despite it having a lower sync committee participation - var - tmpState = assignClone(dag.headState) - signatureBid {.noinit.}, finalizedBid {.noinit.}: BlockId - signatureBid.slot = FAR_FUTURE_SLOT - finalizedBid.slot = FAR_FUTURE_SLOT - while true: - if signatureBid.slot == FAR_FUTURE_SLOT: - signatureBid = maxParticipantsBid - else: - let - nextLowSlot = signatureBid.slot + 1 - signatureRes = dag.maxParticipantsBlock(highBid, nextLowSlot) - signatureBid = signatureRes.bid.valueOr: - signatureBid = maxParticipantsBid - break - let - attestedBid = dag.existingParent(signatureBid).valueOr: - dag.handleUnexpectedLightClientError(signatureBid.slot) - continue - finalizedEpoch = block: - dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do: - withState(state): - when stateFork >= BeaconStateFork.Altair: - state.data.finalized_checkpoint.epoch - else: raiseAssert "Unreachable" - do: - dag.handleUnexpectedLightClientError(attestedBid.slot) - continue - finalizedSlot = finalizedEpoch.start_slot - finalizedBsi = - if finalizedSlot >= dag.tail.slot: - dag.getExistingBlockIdAtSlot(finalizedSlot).valueOr: - dag.handleUnexpectedLightClientError(finalizedSlot) - continue - else: - continue - if finalizedBid.slot >= lowSlot: - finalizedBid = finalizedBsi.bid - break - if signatureBid == maxParticipantsBid: - finalizedBid = finalizedBsi.bid # For fallback `break` at start of loop - - # Save best light client data for given period - var update {.noinit.}: altair.LightClientUpdate - let attestedBid = dag.existingParent(signatureBid).valueOr: - dag.handleUnexpectedLightClientError(signatureBid.slot) - return - dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do: - let bdata = dag.getExistingForkedBlock(bid).valueOr: - 
dag.handleUnexpectedLightClientError(bid.slot) - return - withStateAndBlck(state, bdata): - when stateFork >= BeaconStateFork.Altair: - update.attested_header = blck.toBeaconBlockHeader() - update.next_sync_committee = state.data.next_sync_committee - update.next_sync_committee_branch = - state.data.build_proof(altair.NEXT_SYNC_COMMITTEE_INDEX).get - if finalizedBid.slot == FAR_FUTURE_SLOT: - update.finality_branch.reset() - else: - update.finality_branch = - state.data.build_proof(altair.FINALIZED_ROOT_INDEX).get - else: raiseAssert "Unreachable" - do: - dag.handleUnexpectedLightClientError(attestedBid.slot) - return - if finalizedBid.slot == FAR_FUTURE_SLOT or finalizedBid.slot == GENESIS_SLOT: - update.finalized_header.reset() - else: - let bdata = dag.getExistingForkedBlock(finalizedBid).valueOr: - dag.handleUnexpectedLightClientError(finalizedBid.slot) - return - withBlck(bdata): - update.finalized_header = blck.toBeaconBlockHeader() - let bdata = dag.getExistingForkedBlock(signatureBid).valueOr: - dag.handleUnexpectedLightClientError(signatureBid.slot) - return - withBlck(bdata): - when stateFork >= BeaconStateFork.Altair: - update.sync_aggregate = blck.asSigned().message.body.sync_aggregate - else: raiseAssert "Unreachable" - update.signature_slot = signatureBid.slot - dag.lcDataStore.cache.best[period] = update - -proc initLightClientDataCache*(dag: ChainDAGRef) = - ## Initialize cached light client data - if dag.lcDataStore.importMode == LightClientDataImportMode.None: - return - dag.lcDataStore.cache.importTailSlot = dag.tail.slot - if dag.lcDataStore.importMode == LightClientDataImportMode.OnlyNew: - dag.lcDataStore.cache.importTailSlot = dag.head.slot - var earliestSlot = dag.computeEarliestLightClientSlot - if dag.head.slot < earliestSlot: - return - - # Import light client data for finalized period through finalized head - let - finalizedSlot = dag.finalizedHead.slot - finalizedPeriod = finalizedSlot.sync_committee_period - dag.initLightClientBootstrapForPeriod(finalizedPeriod) - dag.initLightClientUpdateForPeriod(finalizedPeriod) - - let lightClientStartTick = Moment.now() - debug "Initializing cached light client data" - - template handleUnexpectedError(buggedBid: BlockId) = - # Light client data is expected to be available from `earliestSlot` onward. - # If there is an error, adjust `importTailSlot` to avoid failed lookups of - # cached light client data. For new blocks / states, the caches can always - # be updated incrementally, because those blocks / states are passed in - # directly. It is only historical blocks (or sync committees) that depend - # on a potentially corrupted database. - doAssert buggedBid.slot > dag.lcDataStore.cache.importTailSlot - dag.handleUnexpectedLightClientError(buggedBid.slot) - earliestSlot = dag.computeEarliestLightClientSlot - - # Build list of block to process. 
- # As it is slow to load states in descending order, - # build a reverse todo list to then process them in ascending order - let lowSlot = max(finalizedSlot, earliestSlot) - var - blocks = newSeqOfCap[BlockId](dag.head.slot - lowSlot + 1) - bid = dag.head.bid - while bid.slot > lowSlot: - blocks.add bid - bid = dag.existingParent(bid).valueOr: - handleUnexpectedError(bid) - break - if bid.slot == lowSlot: - blocks.add bid - - # Process blocks (reuses `dag.headState`, but restores it to the current head) - var - tmpState = assignClone(dag.headState) - tmpCache, cache: StateCache - oldCheckpoint: Checkpoint - cpIndex = 0 - for i in countdown(blocks.high, blocks.low): - bid = blocks[i] - if not dag.updateExistingState( - dag.headState, bid.atSlot, save = false, cache): - handleUnexpectedError(bid) - continue - let bdata = dag.getExistingForkedBlock(bid).valueOr: - handleUnexpectedError(bid) - continue - withStateAndBlck(dag.headState, bdata): - when stateFork >= BeaconStateFork.Altair: - # Cache light client data (non-finalized blocks may refer to this) - dag.cacheLightClientData(state, blck.toBlockId()) - - # Create `LightClientUpdate` instances - if bid.slot != lowSlot: - dag.createLightClientUpdates(state, blck, parentBid = blocks[i + 1]) - else: raiseAssert "Unreachable" - - let lightClientEndTick = Moment.now() - debug "Initialized cached light client data", - initDur = lightClientEndTick - lightClientStartTick - - # Import historic data - if dag.lcDataStore.importMode == LightClientDataImportMode.Full: - let earliestPeriod = earliestSlot.sync_committee_period - for period in earliestPeriod ..< finalizedPeriod: - dag.initLightClientBootstrapForPeriod(period) - dag.initLightClientUpdateForPeriod(period) - proc getLightClientBootstrap*( dag: ChainDAGRef, blockRoot: Eth2Digest): Opt[altair.LightClientBootstrap] = @@ -799,8 +792,7 @@ proc getLightClientBootstrap*( withBlck(bdata): let slot = blck.message.slot when stateFork >= BeaconStateFork.Altair: - let earliestSlot = dag.computeEarliestLightClientSlot - if slot < earliestSlot: + if slot < dag.lcDataStore.cache.tailSlot: debug "LC bootstrap unavailable: Block too old", slot return err() if slot > dag.finalizedHead.blck.slot: diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 339ffef0e..4d5f7a6c6 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -194,6 +194,7 @@ proc loadChainDag( onLCOptimisticUpdateCb = onLightClientOptimisticUpdateCb, lightClientDataServe = config.lightClientDataServe.get, lightClientDataImportMode = config.lightClientDataImportMode.get, + lightClientDataMaxPeriods = config.lightClientDataMaxPeriods, vanityLogs = getPandas(detectTTY(config.logStdout))) let diff --git a/beacon_chain/spec/presets.nim b/beacon_chain/spec/presets.nim index a17e9e0db..1b09d3f73 100644 --- a/beacon_chain/spec/presets.nim +++ b/beacon_chain/spec/presets.nim @@ -479,3 +479,12 @@ template name*(cfg: RuntimeConfig): string = cfg.CONFIG_NAME else: const_preset + +# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/p2p-interface.md#configuration +func MIN_EPOCHS_FOR_BLOCK_REQUESTS*(cfg: RuntimeConfig): uint64 = + cfg.MIN_VALIDATOR_WITHDRAWABILITY_DELAY + cfg.CHURN_LIMIT_QUOTIENT div 2 + +func defaultLCDataMaxPeriods*(cfg: RuntimeConfig): uint64 = + const epochsPerPeriod = EPOCHS_PER_SYNC_COMMITTEE_PERIOD + let maxEpochs = cfg.MIN_EPOCHS_FOR_BLOCK_REQUESTS + (maxEpochs + epochsPerPeriod - 1) div epochsPerPeriod
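Note (standalone sketch, separate from the patch): the new `--light-client-data-max-periods` option is declared `hidden`, so it is not advertised in the regular help output; it is forwarded from `loadChainDag` into the DAG and, when unset, falls back to `cfg.defaultLCDataMaxPeriods` as shown in the blockchain_dag.nim hunk. The Nim snippet below only illustrates that default calculation, assuming mainnet preset values (MIN_VALIDATOR_WITHDRAWABILITY_DELAY = 256 epochs, CHURN_LIMIT_QUOTIENT = 65536, EPOCHS_PER_SYNC_COMMITTEE_PERIOD = 256); the helper names are hypothetical stand-ins for the `RuntimeConfig`-based functions added to presets.nim above.

# Standalone illustration of the default retention computed by the patch.
# The constants below assume the mainnet preset and are not taken from
# RuntimeConfig; names of the helpers are hypothetical.
const
  MIN_VALIDATOR_WITHDRAWABILITY_DELAY = 256'u64  # mainnet preset, in epochs
  CHURN_LIMIT_QUOTIENT = 65536'u64               # mainnet preset
  EPOCHS_PER_SYNC_COMMITTEE_PERIOD = 256'u64     # mainnet preset

func minEpochsForBlockRequests(): uint64 =
  ## Same formula as the `MIN_EPOCHS_FOR_BLOCK_REQUESTS` helper in the patch.
  MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT div 2

func defaultMaxPeriods(): uint64 =
  ## Ceiling division: how many sync committee periods cover that epoch span.
  let maxEpochs = minEpochsForBlockRequests()
  (maxEpochs + EPOCHS_PER_SYNC_COMMITTEE_PERIOD - 1) div
    EPOCHS_PER_SYNC_COMMITTEE_PERIOD

when isMainModule:
  echo minEpochsForBlockRequests()  # 33024 epochs (~5 months of blocks)
  echo defaultMaxPeriods()          # 129 periods retained by default

Under those assumptions the node retains light client data for roughly 129 sync committee periods (about 33024 epochs, matching the spec's MIN_EPOCHS_FOR_BLOCK_REQUESTS) unless the operator passes something like `--light-client-data-max-periods=<N>` to raise or lower that bound.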