diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 7a31eb98d..66b022beb 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -456,6 +456,11 @@ type "Must be one of: none, only-new, full (slow startup), on-demand (may miss validator duties)" name: "light-client-data-import-mode" .}: Option[LightClientDataImportMode] + lightClientDataMaxPeriods* {. + hidden + desc: "BETA: Maximum number of sync committee periods to retain light client data" + name: "light-client-data-max-periods" .}: Option[uint64] + inProcessValidators* {. desc: "Disable the push model (the beacon node tells a signing process with the private keys of the validators what to sign and when) and load the validators in the beacon node itself" defaultValue: true # the use of the nimbus_signing_process binary by default will be delayed until async I/O over stdin/stdout is developed for the child process. diff --git a/beacon_chain/consensus_object_pools/block_pools_types_light_client.nim b/beacon_chain/consensus_object_pools/block_pools_types_light_client.nim index 02b647b2d..3421aafb3 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types_light_client.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types_light_client.nim @@ -77,7 +77,7 @@ type ## Tracks light client data for the latest slot that was signed by ## at least `MIN_SYNC_COMMITTEE_PARTICIPANTS`. May be older than head. - importTailSlot*: Slot + tailSlot*: Slot ## The earliest slot for which light client data is imported. LightClientDataStore* = object @@ -94,6 +94,8 @@ type ## Whether to make local light client data available or not importMode*: LightClientDataImportMode ## Which classes of light client data to import + maxPeriods*: uint64 + ## Maximum number of sync committee periods to retain light client data # ----------------------------------- # Callbacks diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 45eed8570..2b71ed4e2 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -689,6 +689,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, onLCOptimisticUpdateCb: OnLightClientOptimisticUpdateCallback = nil, lightClientDataServe = false, lightClientDataImportMode = LightClientDataImportMode.None, + lightClientDataMaxPeriods = none(uint64), vanityLogs = default(VanityLogs)): ChainDAGRef = cfg.checkForkConsistency() @@ -728,6 +729,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, lcDataStore: initLightClientDataStore( serve = lightClientDataServe, importMode = lightClientDataImportMode, + maxPeriods = lightClientDataMaxPeriods.get(cfg.defaultLCDataMaxPeriods), onLCFinalityUpdateCb = onLCFinalityUpdateCb, onLCOptimisticUpdateCb = onLCOptimisticUpdateCb), diff --git a/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim b/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim index d04aa9c48..5e3db38ae 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag_light_client.nim @@ -34,28 +34,6 @@ template nextEpochBoundarySlot(slot: Slot): Slot = ## referring to a block at given slot. (slot + (SLOTS_PER_EPOCH - 1)).epoch.start_slot -func computeEarliestLightClientSlot(dag: ChainDAGRef): Slot = - ## Compute the earliest slot for which light client data is retained. - let - minSupportedSlot = max( - dag.cfg.ALTAIR_FORK_EPOCH.start_slot, - dag.lcDataStore.cache.importTailSlot) - currentSlot = getStateField(dag.headState, slot) - if currentSlot < minSupportedSlot: - return minSupportedSlot - - let - MIN_EPOCHS_FOR_BLOCK_REQUESTS = - dag.cfg.MIN_VALIDATOR_WITHDRAWABILITY_DELAY + - dag.cfg.CHURN_LIMIT_QUOTIENT div 2 - MIN_SLOTS_FOR_BLOCK_REQUESTS = - MIN_EPOCHS_FOR_BLOCK_REQUESTS * SLOTS_PER_EPOCH - if currentSlot - minSupportedSlot < MIN_SLOTS_FOR_BLOCK_REQUESTS: - return minSupportedSlot - - let earliestSlot = currentSlot - MIN_SLOTS_FOR_BLOCK_REQUESTS - max(earliestSlot.sync_committee_period.start_slot, minSupportedSlot) - proc updateExistingState( dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsi: BlockSlotId, save: bool, cache: var StateCache): bool = @@ -145,6 +123,245 @@ proc syncCommitteeRootForPeriod( else: raiseAssert "Unreachable" do: err() +func initLightClientDataStore*( + serve: bool, importMode: LightClientDataImportMode, maxPeriods: uint64, + onLCFinalityUpdateCb: OnLightClientFinalityUpdateCallback = nil, + onLCOptimisticUpdateCb: OnLightClientOptimisticUpdateCallback = nil +): LightClientDataStore = + ## Initialize light client data store. + LightClientDataStore( + serve: serve, + importMode: importMode, + maxPeriods: maxPeriods, + onLightClientFinalityUpdate: onLCFinalityUpdateCb, + onLightClientOptimisticUpdate: onLCOptimisticUpdateCb) + +func targetLightClientTailSlot(dag: ChainDAGRef): Slot = + ## Earliest slot for which light client data is retained. + let + maxPeriods = dag.lcDataStore.maxPeriods + headPeriod = dag.head.slot.sync_committee_period + lowSlot = max(dag.tail.slot, dag.cfg.ALTAIR_FORK_EPOCH.start_slot) + tail = max(headPeriod + 1, maxPeriods.SyncCommitteePeriod) - maxPeriods + max(tail.start_slot, lowSlot) + +func handleUnexpectedLightClientError(dag: ChainDAGRef, buggedSlot: Slot) = + ## If there is an unexpected error, adjust `tailSlot` to keep track of the + ## section for which complete light client data is available, and to avoid + ## failed lookups of cached light client data. + doAssert verifyFinalization notin dag.updateFlags + if buggedSlot >= dag.lcDataStore.cache.tailSlot: + dag.lcDataStore.cache.tailSlot = buggedSlot + 1 + +proc initLightClientBootstrapForPeriod( + dag: ChainDAGRef, + period: SyncCommitteePeriod) = + ## Compute and cache `LightClientBootstrap` data for all finalized + ## epoch boundary blocks within a given sync committee period. + if not dag.isNextSyncCommitteeFinalized(period): + return + + let startTick = Moment.now() + debug "Caching historic LC bootstrap data", period + defer: + let endTick = Moment.now() + debug "Historic LC bootstrap data cached", period, + cacheDur = endTick - startTick + + let + periodStartSlot = period.start_slot + periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1 + lowSlot = max(periodStartSlot, dag.targetLightClientTailSlot) + highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot) + lowBoundarySlot = lowSlot.nextEpochBoundarySlot + highBoundarySlot = highSlot.nextEpochBoundarySlot + var + tmpState = assignClone(dag.headState) + tmpCache: StateCache + nextBoundarySlot = lowBoundarySlot + while nextBoundarySlot <= highBoundarySlot: + defer: nextBoundarySlot += SLOTS_PER_EPOCH + let + bsi = dag.getExistingBlockIdAtSlot(nextBoundarySlot).valueOr: + dag.handleUnexpectedLightClientError(nextBoundarySlot) + continue + bid = bsi.bid + boundarySlot = bid.slot.nextEpochBoundarySlot + if boundarySlot == nextBoundarySlot and bid.slot >= lowSlot and + not dag.lcDataStore.cache.bootstrap.hasKey(bid.slot): + if not dag.updateExistingState( + tmpState[], bid.atSlot, save = false, tmpCache): + dag.handleUnexpectedLightClientError(bid.slot) + continue + var cachedBootstrap {.noinit.}: CachedLightClientBootstrap + cachedBootstrap.current_sync_committee_branch = withState(tmpState[]): + when stateFork >= BeaconStateFork.Altair: + state.data.build_proof(altair.CURRENT_SYNC_COMMITTEE_INDEX).get + else: raiseAssert "Unreachable" + dag.lcDataStore.cache.bootstrap[bid.slot] = cachedBootstrap + +proc initLightClientUpdateForPeriod( + dag: ChainDAGRef, period: SyncCommitteePeriod) = + ## Compute and cache the best `LightClientUpdate` within a given + ## sync committee period up through the finalized head block. + ## Non-finalized blocks are processed incrementally. + if not dag.isNextSyncCommitteeFinalized(period): + return + if dag.lcDataStore.cache.best.hasKey(period): + return + + let startTick = Moment.now() + debug "Computing best historic LC update", period + proc logBest(endTick = Moment.now()) = + # Using a helper function reduces code size as the `defer` beneath is + # replicated on every `return`, and the log statement allocates another + # copy of the arguments on the stack for each instantiation (~1 MB stack!) + debug "Best historic LC update computed", + period, update = dag.lcDataStore.cache.best.getOrDefault(period), + computeDur = endTick - startTick + defer: logBest() + + proc maxParticipantsBlock( + dag: ChainDAGRef, highBid: BlockId, lowSlot: Slot + ): tuple[bid: Opt[BlockId], ok: bool] = + ## Determine the earliest block with most sync committee signatures among + ## ancestors of `highBid` with at least `lowSlot` as parent block slot. + ## Return `err` if no block with `MIN_SYNC_COMMITTEE_PARTICIPANTS` exists. + ## `bool` in result indicates whether no unexpected errors occurred. + var + maxParticipants = MIN_SYNC_COMMITTEE_PARTICIPANTS + maxBid: Opt[BlockId] + allOk = true + bid = highBid + while true: + if bid.slot <= lowSlot: + break + let parentBid = dag.existingParent(bid).valueOr: + dag.handleUnexpectedLightClientError(bid.slot) + allOk = false + break + if parentBid.slot < lowSlot: + break + let + bdata = dag.getExistingForkedBlock(bid).valueOr: + dag.handleUnexpectedLightClientError(bid.slot) + allOk = false + break + numParticipants = + withBlck(bdata): + when stateFork >= BeaconStateFork.Altair: + countOnes(blck.message.body.sync_aggregate.sync_committee_bits) + else: raiseAssert "Unreachable" + if numParticipants >= maxParticipants: + maxParticipants = numParticipants + maxBid.ok bid + bid = parentBid + (bid: maxBid, ok: allOk) + + # Determine the block in the period with highest sync committee participation + let + periodStartSlot = period.start_slot + periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1 + lowSlot = max(periodStartSlot, dag.targetLightClientTailSlot) + highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot) + highBsi = dag.getExistingBlockIdAtSlot(highSlot).valueOr: + dag.handleUnexpectedLightClientError(highSlot) + return + highBid = highBsi.bid + maxParticipantsRes = dag.maxParticipantsBlock(highBid, lowSlot) + maxParticipantsBid = maxParticipantsRes.bid.valueOr: + if maxParticipantsRes.ok: # No single valid block exists in the period + dag.lcDataStore.cache.best[period] = default(altair.LightClientUpdate) + return + + # The block with highest participation may refer to a `finalized_checkpoint` + # in a different sync committee period. If that is the case, search for a + # later block with a `finalized_checkpoint` within the given sync committee + # period, despite it having a lower sync committee participation + var + tmpState = assignClone(dag.headState) + signatureBid {.noinit.}, finalizedBid {.noinit.}: BlockId + signatureBid.slot = FAR_FUTURE_SLOT + finalizedBid.slot = FAR_FUTURE_SLOT + while true: + if signatureBid.slot == FAR_FUTURE_SLOT: + signatureBid = maxParticipantsBid + else: + let + nextLowSlot = signatureBid.slot + 1 + signatureRes = dag.maxParticipantsBlock(highBid, nextLowSlot) + signatureBid = signatureRes.bid.valueOr: + signatureBid = maxParticipantsBid + break + let + attestedBid = dag.existingParent(signatureBid).valueOr: + dag.handleUnexpectedLightClientError(signatureBid.slot) + continue + finalizedEpoch = block: + dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do: + withState(state): + when stateFork >= BeaconStateFork.Altair: + state.data.finalized_checkpoint.epoch + else: raiseAssert "Unreachable" + do: + dag.handleUnexpectedLightClientError(attestedBid.slot) + continue + finalizedSlot = finalizedEpoch.start_slot + finalizedBsi = + if finalizedSlot >= dag.tail.slot: + dag.getExistingBlockIdAtSlot(finalizedSlot).valueOr: + dag.handleUnexpectedLightClientError(finalizedSlot) + continue + else: + continue + if finalizedBid.slot >= lowSlot: + finalizedBid = finalizedBsi.bid + break + if signatureBid == maxParticipantsBid: + finalizedBid = finalizedBsi.bid # For fallback `break` at start of loop + + # Save best light client data for given period + var update {.noinit.}: altair.LightClientUpdate + let attestedBid = dag.existingParent(signatureBid).valueOr: + dag.handleUnexpectedLightClientError(signatureBid.slot) + return + dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do: + let bdata = dag.getExistingForkedBlock(bid).valueOr: + dag.handleUnexpectedLightClientError(bid.slot) + return + withStateAndBlck(state, bdata): + when stateFork >= BeaconStateFork.Altair: + update.attested_header = blck.toBeaconBlockHeader() + update.next_sync_committee = state.data.next_sync_committee + update.next_sync_committee_branch = + state.data.build_proof(altair.NEXT_SYNC_COMMITTEE_INDEX).get + if finalizedBid.slot == FAR_FUTURE_SLOT: + update.finality_branch.reset() + else: + update.finality_branch = + state.data.build_proof(altair.FINALIZED_ROOT_INDEX).get + else: raiseAssert "Unreachable" + do: + dag.handleUnexpectedLightClientError(attestedBid.slot) + return + if finalizedBid.slot == FAR_FUTURE_SLOT or finalizedBid.slot == GENESIS_SLOT: + update.finalized_header.reset() + else: + let bdata = dag.getExistingForkedBlock(finalizedBid).valueOr: + dag.handleUnexpectedLightClientError(finalizedBid.slot) + return + withBlck(bdata): + update.finalized_header = blck.toBeaconBlockHeader() + let bdata = dag.getExistingForkedBlock(signatureBid).valueOr: + dag.handleUnexpectedLightClientError(signatureBid.slot) + return + withBlck(bdata): + when stateFork >= BeaconStateFork.Altair: + update.sync_aggregate = blck.asSigned().message.body.sync_aggregate + else: raiseAssert "Unreachable" + update.signature_slot = signatureBid.slot + dag.lcDataStore.cache.best[period] = update + proc getLightClientData( dag: ChainDAGRef, bid: BlockId): CachedLightClientData = @@ -178,14 +395,6 @@ proc deleteLightClientData*(dag: ChainDAGRef, bid: BlockId) = dag.lcDataStore.cache.data.del bid -func handleUnexpectedLightClientError(dag: ChainDAGRef, buggedSlot: Slot) = - ## If there is an unexpected error, adjust `importTailSlot` to keep track of - ## section for which complete light client data is available, and to avoid - ## failed lookups of cached light client data. - doAssert verifyFinalization notin dag.updateFlags - if buggedSlot >= dag.lcDataStore.cache.importTailSlot: - dag.lcDataStore.cache.importTailSlot = buggedSlot + 1 - template lazy_header(name: untyped): untyped {.dirty.} = ## `createLightClientUpdates` helper to lazily load a known block header. var @@ -246,10 +455,8 @@ proc createLightClientUpdates( # Verify attested block (parent) is recent enough and that state is available template attested_bid(): auto = parent_bid - let - earliest_slot = dag.computeEarliestLightClientSlot - attested_slot = attested_bid.slot - if attested_slot < earliest_slot: + let attested_slot = attested_bid.slot + if attested_slot < dag.lcDataStore.cache.tailSlot: return # Lazy variables to hold historic data @@ -337,19 +544,115 @@ proc createLightClientUpdates( if isCommitteeFinalized: dag.lcDataStore.cache.best[attested_period] = best - debug "Best LC update for period improved", - period = attested_period, update = best + debug "Best LC update improved", period = attested_period, update = best else: let key = (attested_period, state.syncCommitteeRoot) dag.lcDataStore.cache.pendingBest[key] = best - debug "Best LC update for period improved", - period = key, update = best + debug "Best LC update improved", period = key, update = best if newFinality and dag.lcDataStore.onLightClientFinalityUpdate != nil: dag.lcDataStore.onLightClientFinalityUpdate(latest) if newOptimistic and dag.lcDataStore.onLightClientOptimisticUpdate != nil: dag.lcDataStore.onLightClientOptimisticUpdate(latest.toOptimistic) +proc initLightClientDataCache*(dag: ChainDAGRef) = + ## Initialize cached light client data + if dag.lcDataStore.importMode == LightClientDataImportMode.None: + return + + # Initialize tail slot + let targetTailSlot = dag.targetLightClientTailSlot + dag.lcDataStore.cache.tailSlot = max(dag.head.slot, targetTailSlot) + + # Import head state + if dag.head.slot < dag.lcDataStore.cache.tailSlot: + return + withState(dag.headState): + when stateFork >= BeaconStateFork.Altair: + dag.cacheLightClientData(state, dag.head.bid) + else: raiseAssert "Unreachable" # `tailSlot` cannot be before Altair + if dag.lcDataStore.importMode == LightClientDataImportMode.OnlyNew: + return + + # Import light client data for finalized period through finalized head + let finalizedSlot = max(dag.finalizedHead.blck.slot, targetTailSlot) + if finalizedSlot >= dag.lcDataStore.cache.tailSlot: + return + dag.lcDataStore.cache.tailSlot = finalizedSlot + let finalizedPeriod = finalizedSlot.sync_committee_period + dag.initLightClientBootstrapForPeriod(finalizedPeriod) + dag.initLightClientUpdateForPeriod(finalizedPeriod) + + let lightClientStartTick = Moment.now() + logScope: lightClientDataMaxPeriods = dag.lcDataStore.maxPeriods + debug "Initializing cached LC data" + + # Build list of block to process. + # As it is slow to load states in descending order, + # build a reverse todo list to then process them in ascending order + var + blocks = newSeqOfCap[BlockId](dag.head.slot - finalizedSlot + 1) + bid = dag.head.bid + while bid.slot > finalizedSlot: + blocks.add bid + bid = dag.existingParent(bid).valueOr: + dag.handleUnexpectedLightClientError(bid.slot) + break + if bid.slot == finalizedSlot: + blocks.add bid + + # Process blocks (reuses `dag.headState`, but restores it to the current head) + var + tmpState = assignClone(dag.headState) + tmpCache, cache: StateCache + oldCheckpoint: Checkpoint + cpIndex = 0 + for i in countdown(blocks.high, blocks.low): + bid = blocks[i] + if not dag.updateExistingState( + dag.headState, bid.atSlot, save = false, cache): + dag.handleUnexpectedLightClientError(bid.slot) + continue + let bdata = dag.getExistingForkedBlock(bid).valueOr: + dag.handleUnexpectedLightClientError(bid.slot) + continue + withStateAndBlck(dag.headState, bdata): + when stateFork >= BeaconStateFork.Altair: + # Cache light client data (non-finalized blocks may refer to this) + if i != blocks.low: + dag.cacheLightClientData(state, bid) # `dag.head` already cached + + # Create `LightClientUpdate` instances + if i < blocks.high: + dag.createLightClientUpdates(state, blck, parentBid = blocks[i + 1]) + else: raiseAssert "Unreachable" + + let lightClientEndTick = Moment.now() + debug "Initialized cached LC data", + initDur = lightClientEndTick - lightClientStartTick + if dag.lcDataStore.importMode == LightClientDataImportMode.OnDemand: + return + + # Import historic data + dag.lcDataStore.cache.tailSlot = targetTailSlot + let targetTailPeriod = targetTailSlot.sync_committee_period + if targetTailPeriod < finalizedPeriod: + # `countdown` through 0 fails on distinct `uint64` + # https://github.com/nim-lang/Nim/pull/19926 + var period = finalizedPeriod - 1 + while period >= targetTailPeriod: + dag.initLightClientBootstrapForPeriod(period) + dag.initLightClientUpdateForPeriod(period) + if period <= targetTailPeriod: + break + dec period + if dag.lcDataStore.cache.tailSlot == targetTailSlot: + debug "Historic LC data imported" + else: + # `handleUnexpectedLightClientError` updates `tailSlot` + warn "Error while importing historic LC data", + errorSlot = dag.lcDataStore.cache.tailSlot - 1, targetTailSlot + proc processNewBlockForLightClient*( dag: ChainDAGRef, state: ForkedHashedBeaconState, @@ -358,7 +661,7 @@ proc processNewBlockForLightClient*( ## Update light client data with information from a new block. if dag.lcDataStore.importMode == LightClientDataImportMode.None: return - if signedBlock.message.slot < dag.computeEarliestLightClientSlot: + if signedBlock.message.slot < dag.lcDataStore.cache.tailSlot: return when signedBlock is bellatrix.TrustedSignedBeaconBlock: @@ -368,7 +671,7 @@ proc processNewBlockForLightClient*( dag.cacheLightClientData(state.altairData, signedBlock.toBlockId()) dag.createLightClientUpdates(state.altairData, signedBlock, parentBid) elif signedBlock is phase0.TrustedSignedBeaconBlock: - raiseAssert "Unreachable" # `earliestSlot` cannot be before Altair + raiseAssert "Unreachable" # `tailSlot` cannot be before Altair else: {.error: "Unreachable".} @@ -377,8 +680,7 @@ proc processHeadChangeForLightClient*(dag: ChainDAGRef) = ## Note that `dag.finalizedHead` is not yet updated when this is called. if dag.lcDataStore.importMode == LightClientDataImportMode.None: return - let earliestSlot = dag.computeEarliestLightClientSlot - if dag.head.slot < earliestSlot: + if dag.head.slot < dag.lcDataStore.cache.tailSlot: return # Update `best` from `pendingBest` to ensure light client data @@ -386,8 +688,8 @@ proc processHeadChangeForLightClient*(dag: ChainDAGRef) = let headPeriod = dag.head.slot.sync_committee_period if not dag.isNextSyncCommitteeFinalized(headPeriod): let - earliestPeriod = earliestSlot.sync_committee_period - lowPeriod = max(dag.firstNonFinalizedPeriod, earliestPeriod) + tailPeriod = dag.lcDataStore.cache.tailSlot.sync_committee_period + lowPeriod = max(dag.firstNonFinalizedPeriod, tailPeriod) if headPeriod > lowPeriod: var tmpState = assignClone(dag.headState) for period in lowPeriod ..< headPeriod: @@ -404,7 +706,7 @@ proc processHeadChangeForLightClient*(dag: ChainDAGRef) = let key = (headPeriod, state.syncCommitteeRoot) dag.lcDataStore.cache.best[headPeriod] = dag.lcDataStore.cache.pendingBest.getOrDefault(key) - else: raiseAssert "Unreachable" + else: raiseAssert "Unreachable" # `tailSlot` cannot be before Altair proc processFinalizationForLightClient*( dag: ChainDAGRef, oldFinalizedHead: BlockSlot) = @@ -413,14 +715,14 @@ proc processFinalizationForLightClient*( ## This needs to be called whenever `finalized_checkpoint` changes. if dag.lcDataStore.importMode == LightClientDataImportMode.None: return - let - earliestSlot = dag.computeEarliestLightClientSlot - finalizedSlot = dag.finalizedHead.slot - if finalizedSlot < earliestSlot: + let finalizedSlot = dag.finalizedHead.slot + if finalizedSlot < dag.lcDataStore.cache.tailSlot: return # Cache `LightClientBootstrap` for newly finalized epoch boundary blocks - let lowSlot = max(oldFinalizedHead.slot + 1, earliestSlot) + let + firstNewSlot = oldFinalizedHead.slot + 1 + lowSlot = max(firstNewSlot, dag.lcDataStore.cache.tailSlot) var boundarySlot = finalizedSlot while boundarySlot >= lowSlot: let @@ -447,19 +749,22 @@ proc processFinalizationForLightClient*( for bid in bidsToDelete: dag.lcDataStore.cache.data.del bid + let + targetTailSlot = dag.targetLightClientTailSlot + targetTailPeriod = targetTailSlot.sync_committee_period + # Prune bootstrap data that is no longer relevant var slotsToDelete: seq[Slot] for slot in dag.lcDataStore.cache.bootstrap.keys: - if slot < earliestSlot: + if slot < targetTailSlot: slotsToDelete.add slot for slot in slotsToDelete: dag.lcDataStore.cache.bootstrap.del slot # Prune best `LightClientUpdate` that are no longer relevant - let earliestPeriod = earliestSlot.sync_committee_period var periodsToDelete: seq[SyncCommitteePeriod] for period in dag.lcDataStore.cache.best.keys: - if period < earliestPeriod: + if period < targetTailPeriod: periodsToDelete.add period for period in periodsToDelete: dag.lcDataStore.cache.best.del period @@ -474,318 +779,6 @@ proc processFinalizationForLightClient*( for key in keysToDelete: dag.lcDataStore.cache.pendingBest.del key -func initLightClientDataStore*( - serve: bool, importMode: LightClientDataImportMode, - onLCFinalityUpdateCb: OnLightClientFinalityUpdateCallback = nil, - onLCOptimisticUpdateCb: OnLightClientOptimisticUpdateCallback = nil -): LightClientDataStore = - ## Initialize light client data collector. - LightClientDataStore( - serve: serve, - importMode: importMode, - onLightClientFinalityUpdate: onLCFinalityUpdateCb, - onLightClientOptimisticUpdate: onLCOptimisticUpdateCb) - -proc initLightClientBootstrapForPeriod( - dag: ChainDAGRef, - period: SyncCommitteePeriod) = - ## Compute and cache `LightClientBootstrap` data for all finalized - ## epoch boundary blocks within a given sync committee period. - if not dag.isNextSyncCommitteeFinalized(period): - return - let - earliestSlot = dag.computeEarliestLightClientSlot - periodStartSlot = period.start_slot - periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1 - if periodEndSlot < earliestSlot: - return - - let startTick = Moment.now() - debug "Caching LC bootstrap data for period", period - defer: - let endTick = Moment.now() - debug "LC bootstrap data for period cached", period, - cacheDur = endTick - startTick - - let - lowSlot = max(periodStartSlot, earliestSlot) - highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot) - lowBoundarySlot = lowSlot.nextEpochBoundarySlot - highBoundarySlot = highSlot.nextEpochBoundarySlot - var - tmpState = assignClone(dag.headState) - tmpCache: StateCache - nextBoundarySlot = lowBoundarySlot - while nextBoundarySlot <= highBoundarySlot: - defer: nextBoundarySlot += SLOTS_PER_EPOCH - let - bsi = dag.getExistingBlockIdAtSlot(nextBoundarySlot).valueOr: - dag.handleUnexpectedLightClientError(nextBoundarySlot) - continue - bid = bsi.bid - boundarySlot = bid.slot.nextEpochBoundarySlot - if boundarySlot == nextBoundarySlot and bid.slot >= lowSlot and - not dag.lcDataStore.cache.bootstrap.hasKey(bid.slot): - if not dag.updateExistingState( - tmpState[], bid.atSlot, save = false, tmpCache): - dag.handleUnexpectedLightClientError(bid.slot) - continue - var cachedBootstrap {.noinit.}: CachedLightClientBootstrap - cachedBootstrap.current_sync_committee_branch = withState(tmpState[]): - when stateFork >= BeaconStateFork.Altair: - state.data.build_proof(altair.CURRENT_SYNC_COMMITTEE_INDEX).get - else: raiseAssert "Unreachable" - dag.lcDataStore.cache.bootstrap[bid.slot] = cachedBootstrap - -proc initLightClientUpdateForPeriod( - dag: ChainDAGRef, period: SyncCommitteePeriod) = - ## Compute and cache the best `LightClientUpdate` within a given - ## sync committee period up through the finalized head block. - ## Non-finalized blocks are processed incrementally. - if not dag.isNextSyncCommitteeFinalized(period): - return - let - earliestSlot = dag.computeEarliestLightClientSlot - periodStartSlot = period.start_slot - periodEndSlot = periodStartSlot + SLOTS_PER_SYNC_COMMITTEE_PERIOD - 1 - if periodEndSlot < earliestSlot: - return - if dag.lcDataStore.cache.best.hasKey(period): - return - - let startTick = Moment.now() - debug "Computing best LC update for period", period - proc logBest(endTick = Moment.now()) = - # Using a helper function reduces code size as the `defer` beneath is - # replicated on every `return`, and the log statement allocates another - # copy of the arguments on the stack for each instantiation (~1 MB stack!) - debug "Best LC update for period computed", - period, update = dag.lcDataStore.cache.best.getOrDefault(period), - computeDur = endTick - startTick - defer: logBest() - - proc maxParticipantsBlock( - dag: ChainDAGRef, highBid: BlockId, lowSlot: Slot - ): tuple[bid: Opt[BlockId], ok: bool] = - ## Determine the earliest block with most sync committee signatures among - ## ancestors of `highBid` with at least `lowSlot` as parent block slot. - ## Return `err` if no block with `MIN_SYNC_COMMITTEE_PARTICIPANTS` exists. - ## `bool` in result indicates whether no unexpected errors occurred. - var - maxParticipants = MIN_SYNC_COMMITTEE_PARTICIPANTS - maxBid: Opt[BlockId] - allOk = true - bid = highBid - while true: - if bid.slot <= lowSlot: - break - let parentBid = dag.existingParent(bid).valueOr: - dag.handleUnexpectedLightClientError(bid.slot) - allOk = false - break - if parentBid.slot < lowSlot: - break - let - bdata = dag.getExistingForkedBlock(bid).valueOr: - dag.handleUnexpectedLightClientError(bid.slot) - allOk = false - break - numParticipants = - withBlck(bdata): - when stateFork >= BeaconStateFork.Altair: - countOnes(blck.message.body.sync_aggregate.sync_committee_bits) - else: raiseAssert "Unreachable" - if numParticipants >= maxParticipants: - maxParticipants = numParticipants - maxBid.ok bid - bid = parentBid - (bid: maxBid, ok: allOk) - - # Determine the block in the period with highest sync committee participation - let - lowSlot = max(periodStartSlot, earliestSlot) - highSlot = min(periodEndSlot, dag.finalizedHead.blck.slot) - highBsi = dag.getExistingBlockIdAtSlot(highSlot).valueOr: - dag.handleUnexpectedLightClientError(highSlot) - return - highBid = highBsi.bid - maxParticipantsRes = dag.maxParticipantsBlock(highBid, lowSlot) - maxParticipantsBid = maxParticipantsRes.bid.valueOr: - if maxParticipantsRes.ok: # No single valid block exists in the period - dag.lcDataStore.cache.best[period] = default(altair.LightClientUpdate) - return - - # The block with highest participation may refer to a `finalized_checkpoint` - # in a different sync committee period. If that is the case, search for a - # later block with a `finalized_checkpoint` within the given sync committee - # period, despite it having a lower sync committee participation - var - tmpState = assignClone(dag.headState) - signatureBid {.noinit.}, finalizedBid {.noinit.}: BlockId - signatureBid.slot = FAR_FUTURE_SLOT - finalizedBid.slot = FAR_FUTURE_SLOT - while true: - if signatureBid.slot == FAR_FUTURE_SLOT: - signatureBid = maxParticipantsBid - else: - let - nextLowSlot = signatureBid.slot + 1 - signatureRes = dag.maxParticipantsBlock(highBid, nextLowSlot) - signatureBid = signatureRes.bid.valueOr: - signatureBid = maxParticipantsBid - break - let - attestedBid = dag.existingParent(signatureBid).valueOr: - dag.handleUnexpectedLightClientError(signatureBid.slot) - continue - finalizedEpoch = block: - dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do: - withState(state): - when stateFork >= BeaconStateFork.Altair: - state.data.finalized_checkpoint.epoch - else: raiseAssert "Unreachable" - do: - dag.handleUnexpectedLightClientError(attestedBid.slot) - continue - finalizedSlot = finalizedEpoch.start_slot - finalizedBsi = - if finalizedSlot >= dag.tail.slot: - dag.getExistingBlockIdAtSlot(finalizedSlot).valueOr: - dag.handleUnexpectedLightClientError(finalizedSlot) - continue - else: - continue - if finalizedBid.slot >= lowSlot: - finalizedBid = finalizedBsi.bid - break - if signatureBid == maxParticipantsBid: - finalizedBid = finalizedBsi.bid # For fallback `break` at start of loop - - # Save best light client data for given period - var update {.noinit.}: altair.LightClientUpdate - let attestedBid = dag.existingParent(signatureBid).valueOr: - dag.handleUnexpectedLightClientError(signatureBid.slot) - return - dag.withUpdatedExistingState(tmpState[], attestedBid.atSlot) do: - let bdata = dag.getExistingForkedBlock(bid).valueOr: - dag.handleUnexpectedLightClientError(bid.slot) - return - withStateAndBlck(state, bdata): - when stateFork >= BeaconStateFork.Altair: - update.attested_header = blck.toBeaconBlockHeader() - update.next_sync_committee = state.data.next_sync_committee - update.next_sync_committee_branch = - state.data.build_proof(altair.NEXT_SYNC_COMMITTEE_INDEX).get - if finalizedBid.slot == FAR_FUTURE_SLOT: - update.finality_branch.reset() - else: - update.finality_branch = - state.data.build_proof(altair.FINALIZED_ROOT_INDEX).get - else: raiseAssert "Unreachable" - do: - dag.handleUnexpectedLightClientError(attestedBid.slot) - return - if finalizedBid.slot == FAR_FUTURE_SLOT or finalizedBid.slot == GENESIS_SLOT: - update.finalized_header.reset() - else: - let bdata = dag.getExistingForkedBlock(finalizedBid).valueOr: - dag.handleUnexpectedLightClientError(finalizedBid.slot) - return - withBlck(bdata): - update.finalized_header = blck.toBeaconBlockHeader() - let bdata = dag.getExistingForkedBlock(signatureBid).valueOr: - dag.handleUnexpectedLightClientError(signatureBid.slot) - return - withBlck(bdata): - when stateFork >= BeaconStateFork.Altair: - update.sync_aggregate = blck.asSigned().message.body.sync_aggregate - else: raiseAssert "Unreachable" - update.signature_slot = signatureBid.slot - dag.lcDataStore.cache.best[period] = update - -proc initLightClientDataCache*(dag: ChainDAGRef) = - ## Initialize cached light client data - if dag.lcDataStore.importMode == LightClientDataImportMode.None: - return - dag.lcDataStore.cache.importTailSlot = dag.tail.slot - if dag.lcDataStore.importMode == LightClientDataImportMode.OnlyNew: - dag.lcDataStore.cache.importTailSlot = dag.head.slot - var earliestSlot = dag.computeEarliestLightClientSlot - if dag.head.slot < earliestSlot: - return - - # Import light client data for finalized period through finalized head - let - finalizedSlot = dag.finalizedHead.slot - finalizedPeriod = finalizedSlot.sync_committee_period - dag.initLightClientBootstrapForPeriod(finalizedPeriod) - dag.initLightClientUpdateForPeriod(finalizedPeriod) - - let lightClientStartTick = Moment.now() - debug "Initializing cached light client data" - - template handleUnexpectedError(buggedBid: BlockId) = - # Light client data is expected to be available from `earliestSlot` onward. - # If there is an error, adjust `importTailSlot` to avoid failed lookups of - # cached light client data. For new blocks / states, the caches can always - # be updated incrementally, because those blocks / states are passed in - # directly. It is only historical blocks (or sync committees) that depend - # on a potentially corrupted database. - doAssert buggedBid.slot > dag.lcDataStore.cache.importTailSlot - dag.handleUnexpectedLightClientError(buggedBid.slot) - earliestSlot = dag.computeEarliestLightClientSlot - - # Build list of block to process. - # As it is slow to load states in descending order, - # build a reverse todo list to then process them in ascending order - let lowSlot = max(finalizedSlot, earliestSlot) - var - blocks = newSeqOfCap[BlockId](dag.head.slot - lowSlot + 1) - bid = dag.head.bid - while bid.slot > lowSlot: - blocks.add bid - bid = dag.existingParent(bid).valueOr: - handleUnexpectedError(bid) - break - if bid.slot == lowSlot: - blocks.add bid - - # Process blocks (reuses `dag.headState`, but restores it to the current head) - var - tmpState = assignClone(dag.headState) - tmpCache, cache: StateCache - oldCheckpoint: Checkpoint - cpIndex = 0 - for i in countdown(blocks.high, blocks.low): - bid = blocks[i] - if not dag.updateExistingState( - dag.headState, bid.atSlot, save = false, cache): - handleUnexpectedError(bid) - continue - let bdata = dag.getExistingForkedBlock(bid).valueOr: - handleUnexpectedError(bid) - continue - withStateAndBlck(dag.headState, bdata): - when stateFork >= BeaconStateFork.Altair: - # Cache light client data (non-finalized blocks may refer to this) - dag.cacheLightClientData(state, blck.toBlockId()) - - # Create `LightClientUpdate` instances - if bid.slot != lowSlot: - dag.createLightClientUpdates(state, blck, parentBid = blocks[i + 1]) - else: raiseAssert "Unreachable" - - let lightClientEndTick = Moment.now() - debug "Initialized cached light client data", - initDur = lightClientEndTick - lightClientStartTick - - # Import historic data - if dag.lcDataStore.importMode == LightClientDataImportMode.Full: - let earliestPeriod = earliestSlot.sync_committee_period - for period in earliestPeriod ..< finalizedPeriod: - dag.initLightClientBootstrapForPeriod(period) - dag.initLightClientUpdateForPeriod(period) - proc getLightClientBootstrap*( dag: ChainDAGRef, blockRoot: Eth2Digest): Opt[altair.LightClientBootstrap] = @@ -799,8 +792,7 @@ proc getLightClientBootstrap*( withBlck(bdata): let slot = blck.message.slot when stateFork >= BeaconStateFork.Altair: - let earliestSlot = dag.computeEarliestLightClientSlot - if slot < earliestSlot: + if slot < dag.lcDataStore.cache.tailSlot: debug "LC bootstrap unavailable: Block too old", slot return err() if slot > dag.finalizedHead.blck.slot: diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 339ffef0e..4d5f7a6c6 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -194,6 +194,7 @@ proc loadChainDag( onLCOptimisticUpdateCb = onLightClientOptimisticUpdateCb, lightClientDataServe = config.lightClientDataServe.get, lightClientDataImportMode = config.lightClientDataImportMode.get, + lightClientDataMaxPeriods = config.lightClientDataMaxPeriods, vanityLogs = getPandas(detectTTY(config.logStdout))) let diff --git a/beacon_chain/spec/presets.nim b/beacon_chain/spec/presets.nim index a17e9e0db..1b09d3f73 100644 --- a/beacon_chain/spec/presets.nim +++ b/beacon_chain/spec/presets.nim @@ -479,3 +479,12 @@ template name*(cfg: RuntimeConfig): string = cfg.CONFIG_NAME else: const_preset + +# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/p2p-interface.md#configuration +func MIN_EPOCHS_FOR_BLOCK_REQUESTS*(cfg: RuntimeConfig): uint64 = + cfg.MIN_VALIDATOR_WITHDRAWABILITY_DELAY + cfg.CHURN_LIMIT_QUOTIENT div 2 + +func defaultLCDataMaxPeriods*(cfg: RuntimeConfig): uint64 = + const epochsPerPeriod = EPOCHS_PER_SYNC_COMMITTEE_PERIOD + let maxEpochs = cfg.MIN_EPOCHS_FOR_BLOCK_REQUESTS + (maxEpochs + epochsPerPeriod - 1) div epochsPerPeriod