# beacon_chain # Copyright (c) 2018-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. {.push raises: [].} import std/[algorithm, sequtils, tables, sets], stew/[arrayops, assign2, byteutils], chronos, metrics, results, snappy, chronicles, ../spec/[beaconstate, eth2_merkleization, eth2_ssz_serialization, helpers, state_transition, validator], ../spec/forks, ../spec/datatypes/[phase0, altair, bellatrix, capella], ".."/[beacon_chain_db, beacon_clock, era_db], "."/[block_pools_types, block_quarantine] from ../spec/datatypes/deneb import shortLog export eth2_merkleization, eth2_ssz_serialization, block_pools_types, results, beacon_chain_db logScope: topics = "chaindag" # https://github.com/ethereum/beacon-metrics/blob/master/metrics.md#interop-metrics declareGauge beacon_head_root, "Root of the head block of the beacon chain" declareGauge beacon_head_slot, "Slot of the head block of the beacon chain" # https://github.com/ethereum/beacon-metrics/blob/master/metrics.md#interop-metrics declareGauge beacon_finalized_epoch, "Current finalized epoch" # On epoch transition declareGauge beacon_finalized_root, "Current finalized root" # On epoch transition declareGauge beacon_current_justified_epoch, "Current justified epoch" # On epoch transition declareGauge beacon_current_justified_root, "Current justified root" # On epoch transition declareGauge beacon_previous_justified_epoch, "Current previously justified epoch" # On epoch transition declareGauge beacon_previous_justified_root, "Current previously justified root" # On epoch transition declareGauge beacon_reorgs_total_total, "Total occurrences of reorganizations of the chain" # On fork choice; backwards-compat name (used to be a counter) declareGauge beacon_reorgs_total, "Total occurrences of reorganizations of the chain" # Interop copy declareCounter beacon_state_data_cache_hits, "EpochRef hits" declareCounter beacon_state_data_cache_misses, "EpochRef misses" declareCounter beacon_state_rewinds, "State database rewinds" declareGauge beacon_active_validators, "Number of validators in the active validator set" declareGauge beacon_current_active_validators, "Number of validators in the active validator set" # Interop copy declareGauge beacon_pending_deposits, "Number of pending deposits (state.eth1_data.deposit_count - state.eth1_deposit_index)" # On block declareGauge beacon_processed_deposits_total, "Number of total deposits included on chain" # On block declareCounter beacon_dag_state_replay_seconds, "Time spent replaying states" const EPOCHS_PER_STATE_SNAPSHOT* = 32 ## When finality happens, we prune historical states from the database except ## for a snapshot every 32 epochs from which replays can happen - there's a ## balance here between making long replays and saving on disk space MAX_SLOTS_PER_PRUNE* = SLOTS_PER_EPOCH ## We prune the database incrementally so as not to introduce long ## processing breaks - this number is the maximum number of blocks we allow ## to be pruned every time the prune call is made (once per slot typically) ## unless head is moving faster (ie during sync) proc putBlock*( dag: ChainDAGRef, signedBlock: ForkyTrustedSignedBeaconBlock) = dag.db.putBlock(signedBlock) proc updateState*( dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsi: BlockSlotId, save: bool, cache: var StateCache): bool {.gcsafe.} template withUpdatedState*( dag: ChainDAGRef, stateParam: var ForkedHashedBeaconState, bsiParam: BlockSlotId, okBody: untyped, failureBody: untyped): untyped = ## Helper template that updates stateData to a particular BlockSlot - usage of ## stateData is unsafe outside of block, or across `await` boundaries block: let bsi {.inject.} = bsiParam var cache {.inject.} = StateCache() if updateState(dag, stateParam, bsi, false, cache): template bid(): BlockId {.inject, used.} = bsi.bid template updatedState(): ForkedHashedBeaconState {.inject, used.} = stateParam okBody else: failureBody func get_effective_balances( validators: openArray[Validator], epoch: Epoch): seq[Gwei] = ## Get the balances from a state as counted for fork choice result.newSeq(validators.len) # zero-init for i in 0 ..< result.len: # All non-active validators have a 0 balance let validator = unsafeAddr validators[i] if validator[].is_active_validator(epoch) and not validator[].slashed: result[i] = validator[].effective_balance proc updateValidatorKeys*(dag: ChainDAGRef, validators: openArray[Validator]) = # Update validator key cache - must be called every time a valid block is # applied to the state - this is important to ensure that when we sync blocks # without storing a state (non-epoch blocks essentially), the deposits from # those blocks are persisted to the in-database cache of immutable validator # data (but no earlier than that the whole block as been validated) dag.db.updateImmutableValidators(validators) proc updateFinalizedBlocks*(db: BeaconChainDB, newFinalized: openArray[BlockId]) = if db.db.readOnly: return # TODO abstraction leak - where to put this? db.withManyWrites: for bid in newFinalized: db.finalizedBlocks.insert(bid.slot, bid.root) proc updateFrontfillBlocks*(dag: ChainDAGRef) = # When backfilling is done and manages to reach the frontfill point, we can # write the frontfill index knowing that the block information in the # era files match the chain if dag.db.db.readOnly: return # TODO abstraction leak - where to put this? if dag.frontfillBlocks.len == 0 or dag.backfill.slot > GENESIS_SLOT: return info "Writing frontfill index", slots = dag.frontfillBlocks.len dag.db.withManyWrites: let low = dag.db.finalizedBlocks.low.expect( "wrote at least tailRef during init") let blocks = min(low.int, dag.frontfillBlocks.len - 1) var parent: Eth2Digest for i in 0..blocks: let root = dag.frontfillBlocks[i] if not isZero(root): dag.db.finalizedBlocks.insert(Slot(i), root) dag.db.putBeaconBlockSummary( root, BeaconBlockSummary(slot: Slot(i), parent_root: parent)) parent = root reset(dag.frontfillBlocks) func validatorKey*( dag: ChainDAGRef, index: ValidatorIndex or uint64): Opt[CookedPubKey] = ## Returns the validator pubkey for the index, assuming it's been observed ## at any point in time - this function may return pubkeys for indicies that ## are not (yet) part of the head state (if the key has been observed on a ## non-head branch)! dag.db.immutableValidators.load(index) template is_merge_transition_complete*( stateParam: ForkedHashedBeaconState): bool = withState(stateParam): when consensusFork >= ConsensusFork.Bellatrix: is_merge_transition_complete(forkyState.data) else: false func effective_balances*(epochRef: EpochRef): seq[Gwei] = try: SSZ.decode(snappy.decode(epochRef.effective_balances_bytes, uint32.high), List[Gwei, Limit VALIDATOR_REGISTRY_LIMIT]).toSeq() except CatchableError as exc: raiseAssert exc.msg func getBlockRef*(dag: ChainDAGRef, root: Eth2Digest): Opt[BlockRef] = ## Retrieve a resolved block reference, if available - this function does ## not return historical finalized blocks, see `getBlockIdAtSlot` for a ## function that covers the entire known history let key = KeyedBlockRef.asLookupKey(root) # HashSet lacks the api to do check-and-get in one lookup - `[]` will return # the copy of the instance in the set which has more fields than `root` set! if key in dag.forkBlocks: try: ok(dag.forkBlocks[key].blockRef()) except KeyError: raiseAssert "contains" else: err() func getBlockIdAtSlot*( state: ForkyHashedBeaconState, slot: Slot): Opt[BlockSlotId] = ## Use given state to attempt to find a historical `BlockSlotId`. if slot > state.data.slot: return Opt.none(BlockSlotId) # State does not know about requested slot if state.data.slot > slot + SLOTS_PER_HISTORICAL_ROOT: return Opt.none(BlockSlotId) # Cache has expired var idx = slot mod SLOTS_PER_HISTORICAL_ROOT let root = if slot == state.data.slot: state.latest_block_root else: state.data.block_roots[idx] var bid = BlockId(slot: slot, root: root) let availableSlots = min(slot.uint64, slot + SLOTS_PER_HISTORICAL_ROOT - state.data.slot) for i in 0 ..< availableSlots: if idx == 0: idx = SLOTS_PER_HISTORICAL_ROOT dec idx if state.data.block_roots[idx] != root: return Opt.some BlockSlotId.init(bid, slot) dec bid.slot if bid.slot == GENESIS_SLOT: return Opt.some BlockSlotId.init(bid, slot) Opt.none(BlockSlotId) # Unknown if there are more empty slots before func getBlockIdAtSlot*(dag: ChainDAGRef, slot: Slot): Opt[BlockSlotId] = ## Retrieve the canonical block at the given slot, or the last block that ## comes before - similar to atSlot, but without the linear scan - may hit ## the database to look up early indices. if slot > dag.finalizedHead.slot: return dag.head.atSlot(slot).toBlockSlotId() # iterate to the given slot if dag.finalizedHead.blck == nil: # Not initialized yet (in init) return Opt.none(BlockSlotId) if slot >= dag.finalizedHead.blck.slot: # finalized head is still in memory return dag.finalizedHead.blck.atSlot(slot).toBlockSlotId() # Load from memory, if the block ID is sufficiently recent. # For checkpoint sync, this is the only available of historical block IDs # until sufficient blocks have been backfilled. template tryWithState(state: ForkedHashedBeaconState) = block: withState(state): # State must be a descendent of the finalized chain to be viable let finBsi = forkyState.getBlockIdAtSlot(dag.finalizedHead.slot) if finBsi.isSome and # DAG finalized bid slot wrong if CP not @ epoch finBsi.unsafeGet.bid.root == dag.finalizedHead.blck.bid.root: let bsi = forkyState.getBlockIdAtSlot(slot) if bsi.isSome: return bsi tryWithState dag.headState tryWithState dag.epochRefState tryWithState dag.clearanceState # Fallback to database, this only works for backfilled blocks let finlow = dag.db.finalizedBlocks.low.expect("at least tailRef written") if slot >= finlow: var pos = slot while true: let root = dag.db.finalizedBlocks.get(pos) if root.isSome(): return ok BlockSlotId.init( BlockId(root: root.get(), slot: pos), slot) doAssert pos > finlow, "We should have returned the finlow" pos = pos - 1 if slot == GENESIS_SLOT and dag.genesis.isSome(): return ok dag.genesis.get().atSlot() err() # not backfilled yet proc containsBlock( cfg: RuntimeConfig, db: BeaconChainDB, slot: Slot, root: Eth2Digest): bool = db.containsBlock(root, cfg.consensusForkAtEpoch(slot.epoch)) proc containsBlock*(dag: ChainDAGRef, bid: BlockId): bool = dag.cfg.containsBlock(dag.db, bid.slot, bid.root) proc getForkedBlock*(db: BeaconChainDB, root: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] = # When we only have a digest, we don't know which fork it's from so we try # them one by one - this should be used sparingly static: doAssert high(ConsensusFork) == ConsensusFork.Electra if (let blck = db.getBlock(root, electra.TrustedSignedBeaconBlock); blck.isSome()): ok(ForkedTrustedSignedBeaconBlock.init(blck.get())) elif (let blck = db.getBlock(root, deneb.TrustedSignedBeaconBlock); blck.isSome()): ok(ForkedTrustedSignedBeaconBlock.init(blck.get())) elif (let blck = db.getBlock(root, capella.TrustedSignedBeaconBlock); blck.isSome()): ok(ForkedTrustedSignedBeaconBlock.init(blck.get())) elif (let blck = db.getBlock(root, bellatrix.TrustedSignedBeaconBlock); blck.isSome()): ok(ForkedTrustedSignedBeaconBlock.init(blck.get())) elif (let blck = db.getBlock(root, altair.TrustedSignedBeaconBlock); blck.isSome()): ok(ForkedTrustedSignedBeaconBlock.init(blck.get())) elif (let blck = db.getBlock(root, phase0.TrustedSignedBeaconBlock); blck.isSome()): ok(ForkedTrustedSignedBeaconBlock.init(blck.get())) else: err() proc getBlock*( dag: ChainDAGRef, bid: BlockId, T: type ForkyTrustedSignedBeaconBlock): Opt[T] = dag.db.getBlock(bid.root, T) or getBlock( dag.era, getStateField(dag.headState, historical_roots).asSeq, dag.headState.historical_summaries().asSeq, bid.slot, Opt[Eth2Digest].ok(bid.root), T) proc getBlockSSZ*(dag: ChainDAGRef, bid: BlockId, bytes: var seq[byte]): bool = # Load the SSZ-encoded data of a block into `bytes`, overwriting the existing # content let fork = dag.cfg.consensusForkAtEpoch(bid.slot.epoch) dag.db.getBlockSSZ(bid.root, bytes, fork) or (bid.slot <= dag.finalizedHead.slot and getBlockSSZ( dag.era, getStateField(dag.headState, historical_roots).asSeq, dag.headState.historical_summaries().asSeq, bid.slot, bytes).isOk() and bytes.len > 0) proc getBlockSZ*(dag: ChainDAGRef, bid: BlockId, bytes: var seq[byte]): bool = # Load the snappy-frame-compressed ("SZ") SSZ-encoded data of a block into # `bytes`, overwriting the existing content # careful: there are two snappy encodings in use, with and without framing! # Returns true if the block is found, false if not let fork = dag.cfg.consensusForkAtEpoch(bid.slot.epoch) dag.db.getBlockSZ(bid.root, bytes, fork) or (bid.slot <= dag.finalizedHead.slot and getBlockSZ( dag.era, getStateField(dag.headState, historical_roots).asSeq, dag.headState.historical_summaries().asSeq, bid.slot, bytes).isOk and bytes.len > 0) proc getForkedBlock*( dag: ChainDAGRef, bid: BlockId): Opt[ForkedTrustedSignedBeaconBlock] = let fork = dag.cfg.consensusForkAtEpoch(bid.slot.epoch) result.ok(ForkedTrustedSignedBeaconBlock(kind: fork)) withBlck(result.get()): type T = type(forkyBlck) forkyBlck = getBlock(dag, bid, T).valueOr: getBlock( dag.era, getStateField(dag.headState, historical_roots).asSeq, dag.headState.historical_summaries().asSeq, bid.slot, Opt[Eth2Digest].ok(bid.root), T).valueOr: result.err() return proc getBlockId*(db: BeaconChainDB, root: Eth2Digest): Opt[BlockId] = block: # We might have a summary in the database let summary = db.getBeaconBlockSummary(root) if summary.isOk(): return ok(BlockId(root: root, slot: summary.get().slot)) block: # We might have a block without having written a summary - this can happen # if there was a crash between writing the block and writing the summary, # specially in databases written by older nimbus versions let forked = db.getForkedBlock(root) if forked.isSome(): # Shouldn't happen too often but.. let blck = forked.get() summary = withBlck(blck): forkyBlck.message.toBeaconBlockSummary() debug "Writing summary", blck = shortLog(blck) db.putBeaconBlockSummary(root, summary) return ok(BlockId(root: root, slot: summary.slot)) err() proc getBlockId*(dag: ChainDAGRef, root: Eth2Digest): Opt[BlockId] = ## Look up block id by root in history - useful for turning a root into a ## slot - may hit the database, may return blocks that have since become ## unviable - use `getBlockIdAtSlot` to check that the block is still viable ## if used in a sensitive context block: # If we have a BlockRef, this is the fastest way to get a block id let blck = dag.getBlockRef(root) if blck.isOk(): return ok(blck.get().bid) dag.db.getBlockId(root) proc getForkedBlock*( dag: ChainDAGRef, root: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] = let bid = dag.getBlockId(root) if bid.isSome(): dag.getForkedBlock(bid.get()) else: # In case we didn't have a summary - should be rare, but .. dag.db.getForkedBlock(root) func isCanonical*(dag: ChainDAGRef, bid: BlockId): bool = ## Returns `true` if the given `bid` is part of the history selected by ## `dag.head`. let current = dag.getBlockIdAtSlot(bid.slot).valueOr: return false # We don't know, so .. return current.bid == bid func isFinalized*(dag: ChainDAGRef, bid: BlockId): bool = ## Returns `true` if the given `bid` is part of the finalized history ## selected by `dag.finalizedHead`. dag.isCanonical(bid) and (bid.slot <= dag.finalizedHead.slot) func parent*(dag: ChainDAGRef, bid: BlockId): Opt[BlockId] = if bid.slot == 0: return err() if bid.slot > dag.finalizedHead.slot: # Make sure we follow the correct history as there may be forks let blck = ? dag.getBlockRef(bid.root) doAssert not isNil(blck.parent), "should reach finalized head" return ok blck.parent.bid let bids = ? dag.getBlockIdAtSlot(bid.slot - 1) ok(bids.bid) func parentOrSlot*(dag: ChainDAGRef, bsi: BlockSlotId): Opt[BlockSlotId] = if bsi.slot == 0: return err() if bsi.isProposed: let parent = ? dag.parent(bsi.bid) ok BlockSlotId.init(parent, bsi.slot) else: ok BlockSlotId.init(bsi.bid, bsi.slot - 1) func atSlot*(dag: ChainDAGRef, bid: BlockId, slot: Slot): Opt[BlockSlotId] = if bid.slot > dag.finalizedHead.slot: let blck = ? dag.getBlockRef(bid.root) if slot > dag.finalizedHead.slot: return blck.atSlot(slot).toBlockSlotId() else: # Check if the given `bid` is still part of history - it might hail from an # orphaned fork let existing = ? dag.getBlockIdAtSlot(bid.slot) if existing.bid != bid: return err() # Not part of known / relevant history if existing.slot == slot: # and bid.slot == slot return ok existing if bid.slot <= slot: ok BlockSlotId.init(bid, slot) else: dag.getBlockIdAtSlot(slot) func nextTimestamp[I, T](cache: var LRUCache[I, T]): uint32 = if cache.timestamp == uint32.high: for i in 0 ..< I: template e: untyped = cache.entries[i] if e.lastUsed != 0: e.lastUsed = 1 cache.timestamp = 1 inc cache.timestamp cache.timestamp template peekIt[I, T](cache: var LRUCache[I, T], predicate: untyped): Opt[T] = block: var res: Opt[T] for i in 0 ..< I: template e: untyped = cache.entries[i] template it: untyped {.inject, used.} = e.value if e.lastUsed != 0 and predicate: res.ok it break res template findIt[I, T](cache: var LRUCache[I, T], predicate: untyped): Opt[T] = block: var res: Opt[T] for i in 0 ..< I: template e: untyped = cache.entries[i] template it: untyped {.inject, used.} = e.value if e.lastUsed != 0 and predicate: e.lastUsed = cache.nextTimestamp res.ok it break res template delIt[I, T](cache: var LRUCache[I, T], predicate: untyped) = block: for i in 0 ..< I: template e: untyped = cache.entries[i] template it: untyped {.inject, used.} = e.value if e.lastUsed != 0 and predicate: e.reset() func put[I, T](cache: var LRUCache[I, T], value: T) = var lru = 0 block: var min = uint32.high for i in 0 ..< I: template e: untyped = cache.entries[i] if e.lastUsed < min: min = e.lastUsed lru = i if min == 0: break template e: untyped = cache.entries[lru] e.value = value e.lastUsed = cache.nextTimestamp func epochAncestor(dag: ChainDAGRef, bid: BlockId, epoch: Epoch): Opt[BlockSlotId] = ## The epoch ancestor is the last block that has an effect on the epoch- ## related state data, as updated in `process_epoch` - this block determines ## effective balances, validator addtions and removals etc and serves as a ## base for `EpochRef` construction. if epoch < dag.tail.slot.epoch or bid.slot < dag.tail.slot: # Not enough information in database to meaningfully process pre-tail epochs return Opt.none BlockSlotId let dependentSlot = if epoch == dag.tail.slot.epoch: # Use the tail as "dependent block" - this may be the genesis block, or, # in the case of checkpoint sync, the checkpoint block dag.tail.slot else: epoch.start_slot() - 1 bsi = ? dag.atSlot(bid, dependentSlot) epochSlot = if epoch == dag.tail.slot.epoch: dag.tail.slot else: epoch.start_slot() ok BlockSlotId(bid: bsi.bid, slot: epochSlot) func epochKey(dag: ChainDAGRef, bid: BlockId, epoch: Epoch): Opt[EpochKey] = ## The state transition works by storing information from blocks in a ## "working" area until the epoch transition, then batching work collected ## during the epoch. Thus, last block in the ancestor epochs is the block ## that has an impact on epoch currently considered. ## ## This function returns an epoch key pointing to that epoch boundary, i.e. the ## boundary where the last block has been applied to the state and epoch ## processing has been done. let bsi = dag.epochAncestor(bid, epoch).valueOr: return Opt.none(EpochKey) Opt.some(EpochKey(bid: bsi.bid, epoch: epoch)) func putShufflingRef*(dag: ChainDAGRef, shufflingRef: ShufflingRef) = ## Store shuffling in the cache if shufflingRef.epoch < dag.finalizedHead.slot.epoch(): # Only cache epoch information for unfinalized blocks - earlier states # are seldomly used (ie RPC), so no need to cache return dag.shufflingRefs.put shufflingRef func findShufflingRef*( dag: ChainDAGRef, bid: BlockId, epoch: Epoch): Opt[ShufflingRef] = ## Lookup a shuffling in the cache, returning `none` if it's not present - see ## `getShufflingRef` for a version that creates a new instance if it's missing let dependent_slot = epoch.attester_dependent_slot() dependent_bsi = ? dag.atSlot(bid, dependent_slot) # Check `ShufflingRef` cache let shufflingRef = dag.shufflingRefs.findIt( it.epoch == epoch and it.attester_dependent_root == dependent_bsi.bid.root) if shufflingRef.isOk: return shufflingRef # Check `EpochRef` cache let epochRef = dag.epochRefs.peekIt( it.shufflingRef.epoch == epoch and it.shufflingRef.attester_dependent_root == dependent_bsi.bid.root) if epochRef.isOk: dag.putShufflingRef(epochRef.get.shufflingRef) return ok epochRef.get.shufflingRef err() func findEpochRef*( dag: ChainDAGRef, bid: BlockId, epoch: Epoch): Opt[EpochRef] = ## Lookup an EpochRef in the cache, returning `none` if it's not present - see ## `getEpochRef` for a version that creates a new instance if it's missing let key = ? dag.epochKey(bid, epoch) dag.epochRefs.findIt(it.key == key) func putEpochRef(dag: ChainDAGRef, epochRef: EpochRef) = if epochRef.epoch < dag.finalizedHead.slot.epoch(): # Only cache epoch information for unfinalized blocks - earlier states # are seldomly used (ie RPC), so no need to cache return dag.epochRefs.put epochRef func init*( T: type ShufflingRef, state: ForkedHashedBeaconState, cache: var StateCache, epoch: Epoch): T = let attester_dependent_root = withState(state): forkyState.dependent_root(epoch.get_previous_epoch) ShufflingRef( epoch: epoch, attester_dependent_root: attester_dependent_root, shuffled_active_validator_indices: cache.get_shuffled_active_validator_indices(state, epoch), ) func init*( T: type EpochRef, dag: ChainDAGRef, state: ForkedHashedBeaconState, cache: var StateCache): T = let epoch = state.get_current_epoch() proposer_dependent_root = withState(state): forkyState.proposer_dependent_root shufflingRef = dag.findShufflingRef(state.latest_block_id, epoch).valueOr: let tmp = ShufflingRef.init(state, cache, epoch) dag.putShufflingRef(tmp) tmp total_active_balance = withState(state): get_total_active_balance(forkyState.data, cache) epochRef = EpochRef( key: dag.epochKey(state.latest_block_id, epoch).expect( "Valid epoch ancestor when processing state"), eth1_data: getStateField(state, eth1_data), eth1_deposit_index: getStateField(state, eth1_deposit_index), checkpoints: FinalityCheckpoints( justified: getStateField(state, current_justified_checkpoint), finalized: getStateField(state, finalized_checkpoint)), # beacon_proposers: Separately filled below proposer_dependent_root: proposer_dependent_root, shufflingRef: shufflingRef, total_active_balance: total_active_balance ) epochStart = epoch.start_slot() for i in 0'u64.. 0: load(epoch - 1) if dag.head != nil: # nil during init.. sigh let period = dag.head.slot.sync_committee_period if period == epoch.sync_committee_period and period notin cache.sync_committees and period > dag.cfg.ALTAIR_FORK_EPOCH.sync_committee_period(): # If the block we're aiming for shares ancestry with head, we can reuse # the cached head committee - this accounts for most "live" cases like # syncing and checking blocks since the committees rarely change let periodBsi = dag.atSlot(bid, period.start_slot) if periodBsi.isSome and periodBsi == dag.atSlot(dag.head.bid, period.start_slot): # We often end up sharing sync committees with head during sync / gossip # validation / head updates cache.sync_committees[period] = dag.headSyncCommittees func containsForkBlock*(dag: ChainDAGRef, root: Eth2Digest): bool = ## Checks for blocks at the finalized checkpoint or newer KeyedBlockRef.asLookupKey(root) in dag.forkBlocks func isFinalizedStateSnapshot(slot: Slot): bool = slot.is_epoch and slot.epoch mod EPOCHS_PER_STATE_SNAPSHOT == 0 func isStateCheckpoint(dag: ChainDAGRef, bsi: BlockSlotId): bool = ## State checkpoints are the points in time for which we store full state ## snapshots, which later serve as rewind starting points when replaying state ## transitions from database, for example during reorgs. ## # As a policy, we only store epoch boundary states without the epoch block # (if it exists) applied - the rest can be reconstructed by loading an epoch # boundary state and applying the missing blocks. # We also avoid states that were produced with empty slots only - as such, # there is only a checkpoint for the first epoch after a block. # The tail block also counts as a state checkpoint! (bsi.isProposed and bsi.bid == dag.tail) or (bsi.slot.is_epoch and bsi.slot.epoch == (bsi.bid.slot.epoch + 1)) proc getState( db: BeaconChainDB, cfg: RuntimeConfig, block_root: Eth2Digest, slot: Slot, state: var ForkedHashedBeaconState, rollback: RollbackProc): bool = let state_root = db.getStateRoot(block_root, slot).valueOr: return false db.getState(cfg.consensusForkAtEpoch(slot.epoch), state_root, state, rollback) proc containsState*( db: BeaconChainDB, cfg: RuntimeConfig, block_root: Eth2Digest, slots: Slice[Slot], legacy = true): bool = var slot = slots.b while slot >= slots.a: let state_root = db.getStateRoot(block_root, slot) if state_root.isSome() and db.containsState( cfg.consensusForkAtEpoch(slot.epoch), state_root.get(), legacy): return true if slot == slots.a: # avoid underflow at genesis break slot -= 1 false proc getState*( db: BeaconChainDB, cfg: RuntimeConfig, block_root: Eth2Digest, slots: Slice[Slot], state: var ForkedHashedBeaconState, rollback: RollbackProc): bool = var slot = slots.b while slot >= slots.a: let state_root = db.getStateRoot(block_root, slot) if state_root.isSome() and db.getState( cfg.consensusForkAtEpoch(slot.epoch), state_root.get(), state, rollback): return true if slot == slots.a: # avoid underflow at genesis break slot -= 1 false proc getState( dag: ChainDAGRef, bsi: BlockSlotId, state: var ForkedHashedBeaconState): bool = ## Load a state from the database given a block and a slot - this will first ## lookup the state root in the state root table then load the corresponding ## state, if it exists if not dag.isStateCheckpoint(bsi): return false let rollbackAddr = # Any restore point will do as long as it's not the object being updated if unsafeAddr(state) == unsafeAddr(dag.headState): unsafeAddr dag.clearanceState else: unsafeAddr dag.headState let v = addr state func rollback() = assign(v[], rollbackAddr[]) dag.db.getState(dag.cfg, bsi.bid.root, bsi.slot, state, rollback) proc getStateByParent( dag: ChainDAGRef, bid: BlockId, state: var ForkedHashedBeaconState): bool = ## Try to load the state referenced by the parent of the given `bid` - this ## state can be used to advance to the `bid` state itself. let slot = bid.slot let summary = dag.db.getBeaconBlockSummary(bid.root).valueOr: return false parentMinSlot = dag.db.getBeaconBlockSummary(summary.parent_root). map(proc(x: auto): auto = x.slot).valueOr: # in the cases that we don't have slot information, we'll search for the # state for a few back from the `bid` slot - if there are gaps of empty # slots larger than this, we will not be able to load the state using this # trick if slot.uint64 >= (EPOCHS_PER_STATE_SNAPSHOT * 2) * SLOTS_PER_EPOCH: slot - (EPOCHS_PER_STATE_SNAPSHOT * 2) * SLOTS_PER_EPOCH else: Slot(0) let rollbackAddr = # Any restore point will do as long as it's not the object being updated if unsafeAddr(state) == unsafeAddr(dag.headState): unsafeAddr dag.clearanceState else: unsafeAddr dag.headState let v = addr state func rollback() = assign(v[], rollbackAddr[]) dag.db.getState( dag.cfg, summary.parent_root, parentMinSlot..slot, state, rollback) proc getNearbyState( dag: ChainDAGRef, state: var ForkedHashedBeaconState, bid: BlockId, lowSlot: Slot): Opt[void] = ## Load state from DB that is close to `bid` and has at least slot `lowSlot`. var e = bid.slot.epoch b = bid while true: let stateSlot = e.start_slot if stateSlot < lowSlot: return err() b = (? dag.atSlot(b, max(stateSlot, 1.Slot) - 1)).bid let bsi = BlockSlotId.init(b, stateSlot) if not dag.getState(bsi, state): if e == GENESIS_EPOCH: return err() dec e continue return ok() proc currentSyncCommitteeForPeriod*( dag: ChainDAGRef, tmpState: var ForkedHashedBeaconState, period: SyncCommitteePeriod): Opt[SyncCommittee] = ## Fetch a `SyncCommittee` for a given sync committee period. ## For non-finalized periods, follow the chain as selected by fork choice. let lowSlot = max(dag.tail.slot, dag.cfg.ALTAIR_FORK_EPOCH.start_slot) if period < lowSlot.sync_committee_period: return err() let periodStartSlot = period.start_slot syncCommitteeSlot = max(periodStartSlot, lowSlot) bsi = ? dag.getBlockIdAtSlot(syncCommitteeSlot) dag.withUpdatedState(tmpState, bsi) do: withState(updatedState): when consensusFork >= ConsensusFork.Altair: ok forkyState.data.current_sync_committee else: err() do: err() proc getBlockIdAtSlot*( dag: ChainDAGRef, state: ForkyHashedBeaconState, slot: Slot): Opt[BlockId] = if slot >= state.data.slot: Opt.some state.latest_block_id elif state.data.slot <= slot + SLOTS_PER_HISTORICAL_ROOT: dag.getBlockId(state.data.get_block_root_at_slot(slot)) else: Opt.none(BlockId) proc updateBeaconMetrics( state: ForkedHashedBeaconState, bid: BlockId, cache: var StateCache) = # https://github.com/ethereum/beacon-metrics/blob/master/metrics.md#additional-metrics # both non-negative, so difference can't overflow or underflow int64 beacon_head_root.set(bid.root.toGaugeValue) beacon_head_slot.set(bid.slot.toGaugeValue) withState(state): beacon_pending_deposits.set( (forkyState.data.eth1_data.deposit_count - forkyState.data.eth1_deposit_index).toGaugeValue) beacon_processed_deposits_total.set( forkyState.data.eth1_deposit_index.toGaugeValue) beacon_current_justified_epoch.set( forkyState.data.current_justified_checkpoint.epoch.toGaugeValue) beacon_current_justified_root.set( forkyState.data.current_justified_checkpoint.root.toGaugeValue) beacon_previous_justified_epoch.set( forkyState.data.previous_justified_checkpoint.epoch.toGaugeValue) beacon_previous_justified_root.set( forkyState.data.previous_justified_checkpoint.root.toGaugeValue) beacon_finalized_epoch.set( forkyState.data.finalized_checkpoint.epoch.toGaugeValue) beacon_finalized_root.set( forkyState.data.finalized_checkpoint.root.toGaugeValue) let active_validators = count_active_validators( forkyState.data, forkyState.data.slot.epoch, cache).toGaugeValue beacon_active_validators.set(active_validators) beacon_current_active_validators.set(active_validators) import blockchain_dag_light_client export blockchain_dag_light_client.getLightClientBootstrap, blockchain_dag_light_client.getLightClientUpdateForPeriod, blockchain_dag_light_client.getLightClientFinalityUpdate, blockchain_dag_light_client.getLightClientOptimisticUpdate proc putState(dag: ChainDAGRef, state: ForkedHashedBeaconState, bid: BlockId) = # Store a state and its root let slot = getStateField(state, slot) logScope: blck = shortLog(bid) stateSlot = shortLog(slot) stateRoot = shortLog(getStateRoot(state)) if not dag.isStateCheckpoint(BlockSlotId.init(bid, slot)): return # Don't consider legacy tables here, they are slow to read so we'll want to # rewrite things in the new table anyway. if dag.db.containsState( dag.cfg.consensusForkAtEpoch(slot.epoch), getStateRoot(state), legacy = false): return let startTick = Moment.now() # Ideally we would save the state and the root lookup cache in a single # transaction to prevent database inconsistencies, but the state loading code # is resilient against one or the other going missing withState(state): dag.db.putState(forkyState) debug "Stored state", putStateDur = Moment.now() - startTick proc advanceSlots*( dag: ChainDAGRef, state: var ForkedHashedBeaconState, slot: Slot, save: bool, cache: var StateCache, info: var ForkedEpochInfo) = # Given a state, advance it zero or more slots by applying empty slot # processing - the state must be positioned at or before `slot` doAssert getStateField(state, slot) <= slot let stateBid = state.latest_block_id while getStateField(state, slot) < slot: let preEpoch = getStateField(state, slot).epoch loadStateCache(dag, cache, stateBid, getStateField(state, slot).epoch) process_slots( dag.cfg, state, getStateField(state, slot) + 1, cache, info, dag.updateFlags).expect("process_slots shouldn't fail when state slot is correct") if save: dag.putState(state, stateBid) # The reward information in the state transition is computed for epoch # transitions - when transitioning into epoch N, the activities in epoch # N-2 are translated into balance updates, and this is what we capture # in the monitor. This may be inaccurate during a deep reorg (>1 epoch) # which is an acceptable tradeoff for monitoring. withState(state): let postEpoch = forkyState.data.slot.epoch if preEpoch != postEpoch and postEpoch >= 2: var proposers: array[SLOTS_PER_EPOCH, Opt[ValidatorIndex]] let epochRef = dag.findEpochRef(stateBid, postEpoch - 2) if epochRef.isSome(): proposers = epochRef[][].beacon_proposers dag.validatorMonitor[].registerEpochInfo( forkyState.data, proposers, info) proc applyBlock( dag: ChainDAGRef, state: var ForkedHashedBeaconState, bid: BlockId, cache: var StateCache, info: var ForkedEpochInfo): Result[void, cstring] = loadStateCache(dag, cache, bid, getStateField(state, slot).epoch) discard case dag.cfg.consensusForkAtEpoch(bid.slot.epoch) of ConsensusFork.Phase0: let data = getBlock(dag, bid, phase0.TrustedSignedBeaconBlock).valueOr: return err("Block load failed") ? state_transition( dag.cfg, state, data, cache, info, dag.updateFlags + {slotProcessed}, noRollback) of ConsensusFork.Altair: let data = getBlock(dag, bid, altair.TrustedSignedBeaconBlock).valueOr: return err("Block load failed") ? state_transition( dag.cfg, state, data, cache, info, dag.updateFlags + {slotProcessed}, noRollback) of ConsensusFork.Bellatrix: let data = getBlock(dag, bid, bellatrix.TrustedSignedBeaconBlock).valueOr: return err("Block load failed") ? state_transition( dag.cfg, state, data, cache, info, dag.updateFlags + {slotProcessed}, noRollback) of ConsensusFork.Capella: let data = getBlock(dag, bid, capella.TrustedSignedBeaconBlock).valueOr: return err("Block load failed") ? state_transition( dag.cfg, state, data, cache, info, dag.updateFlags + {slotProcessed}, noRollback) of ConsensusFork.Deneb: let data = getBlock(dag, bid, deneb.TrustedSignedBeaconBlock).valueOr: return err("Block load failed") ? state_transition( dag.cfg, state, data, cache, info, dag.updateFlags + {slotProcessed}, noRollback) of ConsensusFork.Electra: let data = getBlock(dag, bid, electra.TrustedSignedBeaconBlock).valueOr: return err("Block load failed") ? state_transition( dag.cfg, state, data, cache, info, dag.updateFlags + {slotProcessed}, noRollback) ok() proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags, eraPath = ".", onBlockCb: OnBlockCallback = nil, onHeadCb: OnHeadCallback = nil, onReorgCb: OnReorgCallback = nil, onFinCb: OnFinalizedCallback = nil, vanityLogs = default(VanityLogs), lcDataConfig = default(LightClientDataConfig)): ChainDAGRef = cfg.checkForkConsistency() doAssert updateFlags - {strictVerification} == {}, "Other flags not supported in ChainDAG" # TODO we require that the db contains both a head and a tail block - # asserting here doesn't seem like the right way to go about it however.. # Tail is the first block for which we can construct a state - either # genesis or a checkpoint let startTick = Moment.now() genesisRoot = db.getGenesisBlock() tailRoot = db.getTailBlock().expect( "preInit should have initialized the database with a tail block root") tail = db.getBlockId(tailRoot).expect( "tail block summary in database, database corrupt?") headRoot = db.getHeadBlock().expect("head root, database corrupt?") head = db.getBlockId(headRoot).expect("head block id, database corrupt?") # Have to be careful with this instance, it is not yet fully initialized so # as to avoid having to allocate a separate "init" state dag = ChainDAGRef( db: db, validatorMonitor: validatorMonitor, genesis: genesisRoot.map( proc(x: auto): auto = BlockId(root: x, slot: GENESIS_SLOT)), tail: tail, # The only allowed flag right now is strictVerification, as the others all # allow skipping some validation. updateFlags: updateFlags * {strictVerification}, cfg: cfg, vanityLogs: vanityLogs, lcDataStore: initLightClientDataStore( lcDataConfig, cfg, db.getLightClientDataDB()), onBlockAdded: onBlockCb, onHeadChanged: onHeadCb, onReorgHappened: onReorgCb, onFinHappened: onFinCb, ) loadTick = Moment.now() var headRef, curRef: BlockRef # When starting from a checkpoint with an empty block, we'll store the state # "ahead" of the head slot - this slot would be considered finalized slot = max(head.slot, (tail.slot.epoch + 1).start_slot) # To know the finalized checkpoint of the head, we need to recreate its # state - the tail is implicitly finalized, and if we have a finalized block # table, that provides another hint finalizedSlot = db.finalizedBlocks.high.get(tail.slot) cache: StateCache foundHeadState = false headBlocks: seq[BlockRef] # Load head -> finalized, or all summaries in case the finalized block table # hasn't been written yet for blck in db.getAncestorSummaries(head.root): # The execution block root gets filled in as needed. Nonfinalized Bellatrix # and later blocks are loaded as optimistic, which gets adjusted that first # `VALID` fcU from an EL plus markBlockVerified. Pre-merge blocks still get # marked as `VALID`. let newRef = BlockRef.init( blck.root, Opt.none Eth2Digest, executionValid = false, blck.summary.slot) if headRef == nil: headRef = newRef if curRef != nil: link(newRef, curRef) curRef = newRef dag.forkBlocks.incl(KeyedBlockRef.init(curRef)) if not foundHeadState: foundHeadState = db.getState( cfg, blck.root, blck.summary.slot..slot, dag.headState, noRollback) slot = blck.summary.slot if not foundHeadState: # When the database has been written with a pre-fork version of the # software, it may happen that blocks produced using an "unforked" # chain get written to the database - we need to skip such blocks # when loading the database with a fork-compatible version if containsBlock(cfg, db, curRef.slot, curRef.root): headBlocks.add curRef else: if headBlocks.len > 0: fatal "Missing block needed to create head state, database corrupt?", curRef = shortLog(curRef) quit 1 # Without the block data we can't form a state for this root, so # we'll need to move the head back headRef = nil dag.forkBlocks.excl(KeyedBlockRef.init(curRef)) if curRef.slot <= finalizedSlot: # Only non-finalized slots get a `BlockRef` break let summariesTick = Moment.now() if not foundHeadState: if not dag.getStateByParent(curRef.bid, dag.headState): fatal "Could not load head state, database corrupt?", head = shortLog(head), tail = shortLog(dag.tail) quit 1 block: # EpochRef needs an epoch boundary state assign(dag.epochRefState, dag.headState) var info: ForkedEpochInfo while headBlocks.len > 0: dag.applyBlock( dag.headState, headBlocks.pop().bid, cache, info).expect("head blocks should apply") dag.head = headRef dag.heads = @[headRef] withState(dag.headState): when consensusFork >= ConsensusFork.Altair: dag.headSyncCommittees = forkyState.data.get_sync_committee_cache(cache) assign(dag.clearanceState, dag.headState) if dag.headState.latest_block_root == tail.root: # In case we started from a checkpoint with an empty slot finalizedSlot = getStateField(dag.headState, slot) finalizedSlot = max( finalizedSlot, getStateField(dag.headState, finalized_checkpoint).epoch.start_slot) let configFork = case dag.headState.kind of ConsensusFork.Phase0: genesisFork(cfg) of ConsensusFork.Altair: altairFork(cfg) of ConsensusFork.Bellatrix: bellatrixFork(cfg) of ConsensusFork.Capella: capellaFork(cfg) of ConsensusFork.Deneb: denebFork(cfg) of ConsensusFork.Electra: electraFork(cfg) stateFork = getStateField(dag.headState, fork) # Here, we check only the `current_version` field because the spec # mandates that testnets starting directly from a particular fork # should have `previous_version` set to `current_version` while # this doesn't happen to be the case in network that go through # regular hard-fork upgrades. See for example: # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.4/specs/bellatrix/beacon-chain.md#testing if stateFork.current_version != configFork.current_version: error "State from database does not match network, check --network parameter", tail = dag.tail, headRef, stateFork, configFork quit 1 # Need to load state to find genesis validators root, before loading era db dag.era = EraDB.new( cfg, eraPath, getStateField(dag.headState, genesis_validators_root)) # We used an interim finalizedHead while loading the head state above - now # that we have loaded the dag up to the finalized slot, we can also set # finalizedHead to its real value dag.finalizedHead = headRef.atSlot(finalizedSlot) dag.lastPrunePoint = dag.finalizedHead.toBlockSlotId().expect("not nil") doAssert dag.finalizedHead.blck != nil, "The finalized head should exist at the slot" block: # Top up finalized blocks if db.finalizedBlocks.high.isNone or db.finalizedBlocks.high.get() < dag.finalizedHead.blck.slot: # Versions prior to 1.7.0 did not store finalized blocks in the # database, and / or the application might have crashed between the head # and finalized blocks updates. info "Loading finalized blocks", finHigh = db.finalizedBlocks.high, finalizedHead = shortLog(dag.finalizedHead) var newFinalized: seq[BlockId] tmp = dag.finalizedHead.blck while tmp.parent != nil: newFinalized.add(tmp.bid) let p = tmp.parent tmp.parent = nil tmp = p for blck in db.getAncestorSummaries(tmp.root): if db.finalizedBlocks.high.isSome and blck.summary.slot <= db.finalizedBlocks.high.get: break newFinalized.add(BlockId(slot: blck.summary.slot, root: blck.root)) db.updateFinalizedBlocks(newFinalized) doAssert dag.finalizedHead.blck.parent == nil, "The finalized head is the last BlockRef with a parent" block: let finalized = db.finalizedBlocks.get(db.finalizedBlocks.high.get()).expect( "tail at least") if finalized != dag.finalizedHead.blck.root: error "Head does not lead to finalized block, database corrupt?", head = shortLog(head), finalizedHead = shortLog(dag.finalizedHead), tail = shortLog(dag.tail), finalized = shortLog(finalized) quit 1 dag.backfill = block: let backfillSlot = db.finalizedBlocks.low.expect("tail at least") if backfillSlot <= dag.horizon: # Backfill done, no need to load anything BeaconBlockSummary() elif backfillSlot < dag.tail.slot: let backfillRoot = db.finalizedBlocks.get(backfillSlot).expect( "low to be loadable") db.getBeaconBlockSummary(backfillRoot).expect( "Backfill block must have a summary: " & $backfillRoot) elif dag.containsBlock(dag.tail): db.getBeaconBlockSummary(dag.tail.root).expect( "Tail block must have a summary: " & $dag.tail.root) else: # Checkpoint sync, checkpoint block unavailable BeaconBlockSummary( slot: dag.tail.slot + 1, parent_root: dag.tail.root) dag.forkDigests = newClone ForkDigests.init( cfg, getStateField(dag.headState, genesis_validators_root)) withState(dag.headState): dag.validatorMonitor[].registerState(forkyState.data) updateBeaconMetrics(dag.headState, dag.head.bid, cache) let finalizedTick = Moment.now() if dag.backfill.slot > GENESIS_SLOT: # Try frontfill from era files let backfillSlot = dag.backfill.slot - 1 dag.frontfillBlocks = newSeqOfCap[Eth2Digest](backfillSlot.int) let historical_roots = getStateField(dag.headState, historical_roots).asSeq() historical_summaries = dag.headState.historical_summaries.asSeq() var blocks = 0 # Here, we'll build up the slot->root mapping in memory for the range of # blocks from genesis to backfill, if possible. for bid in dag.era.getBlockIds( historical_roots, historical_summaries, Slot(0), Eth2Digest()): # If backfill has not yet started, the backfill slot itself also needs # to be served from era files. Checkpoint sync starts from state only if bid.slot > backfillSlot or (bid.slot == backfillSlot and bid.root != dag.tail.root): # If we end up in here, we failed the root comparison just below in # an earlier iteration fatal "Era summaries don't lead up to backfill, database or era files corrupt?", bid, backfillSlot quit 1 # In BeaconState.block_roots, empty slots are filled with the root of # the previous block - in our data structure, we use a zero hash instead dag.frontfillBlocks.setLen(bid.slot.int + 1) dag.frontfillBlocks[bid.slot.int] = bid.root if bid.root == dag.backfill.parent_root: # We've reached the backfill point, meaning blocks are available # in the sqlite database from here onwards - remember this point in # time so that we can write summaries to the database - it's a lot # faster to load from database than to iterate over era files with # the current naive era file reader. reset(dag.backfill) dag.updateFrontfillBlocks() break blocks += 1 if blocks > 0: info "Front-filled blocks from era files", blocks, backfillSlot let frontfillTick = Moment.now() # Fill validator key cache in case we're loading an old database that doesn't # have a cache dag.updateValidatorKeys(getStateField(dag.headState, validators).asSeq()) # Initialize pruning such that when starting with a database that hasn't been # pruned, we work our way from the tail to the horizon in incremental steps dag.lastHistoryPruneHorizon = dag.horizon() dag.lastHistoryPruneBlockHorizon = block: let boundary = min(dag.tail.slot, dag.horizon()) if boundary.epoch() >= EPOCHS_PER_STATE_SNAPSHOT: start_slot(boundary.epoch() - EPOCHS_PER_STATE_SNAPSHOT) else: Slot(0) info "Block DAG initialized", head = shortLog(dag.head), finalizedHead = shortLog(dag.finalizedHead), tail = shortLog(dag.tail), backfill = shortLog(dag.backfill), loadDur = loadTick - startTick, summariesDur = summariesTick - loadTick, finalizedDur = finalizedTick - summariesTick, frontfillDur = frontfillTick - finalizedTick, keysDur = Moment.now() - frontfillTick dag.initLightClientDataCache() dag template genesis_validators_root*(dag: ChainDAGRef): Eth2Digest = getStateField(dag.headState, genesis_validators_root) proc genesisBlockRoot*(dag: ChainDAGRef): Eth2Digest = dag.db.getGenesisBlock().expect("DB must be initialized with genesis block") func getEpochRef*( dag: ChainDAGRef, state: ForkedHashedBeaconState, cache: var StateCache): EpochRef = ## Get a cached `EpochRef` or construct one based on the given state - always ## returns an EpochRef instance let bid = state.latest_block_id epoch = state.get_current_epoch() dag.findEpochRef(bid, epoch).valueOr: let res = EpochRef.init(dag, state, cache) dag.putEpochRef(res) res proc getEpochRef*( dag: ChainDAGRef, bid: BlockId, epoch: Epoch, preFinalized: bool): Result[EpochRef, cstring] = ## Return a cached EpochRef or construct one from the database, if possible - ## returns `none` on failure. ## ## When `preFinalized` is true, include epochs from before the finalized ## checkpoint in the search - this potentially can result in long processing ## times due to state replays. ## ## Requests for epochs >= dag.finalizedHead.slot.epoch always return an ## instance. One must be careful to avoid race conditions in `async` code ## where the finalized head might change during an `await`. ## ## Requests for epochs < dag.finalizedHead.slot.epoch may fail, either because ## the search was limited by the `preFinalized` flag or because state history ## has been pruned - `none` will be returned in this case. if not preFinalized and epoch < dag.finalizedHead.slot.epoch: return err("Requesting pre-finalized EpochRef") if bid.slot < dag.tail.slot or epoch < dag.tail.slot.epoch: return err("Requesting EpochRef for pruned state") let epochRef = dag.findEpochRef(bid, epoch) if epochRef.isOk(): beacon_state_data_cache_hits.inc return ok epochRef.get() beacon_state_data_cache_misses.inc let ancestor = dag.epochAncestor(bid, epoch).valueOr: # If we got in here, the bid must be unknown or we would have gotten # _some_ ancestor (like the tail) return err("Requesting EpochRef for non-canonical block") var cache: StateCache if not updateState(dag, dag.epochRefState, ancestor, false, cache): return err("Could not load requested state") ok(dag.getEpochRef(dag.epochRefState, cache)) proc getEpochRef*( dag: ChainDAGRef, blck: BlockRef, epoch: Epoch, preFinalized: bool): Result[EpochRef, cstring] = dag.getEpochRef(blck.bid, epoch, preFinalized) proc getFinalizedEpochRef*(dag: ChainDAGRef): EpochRef = dag.getEpochRef( dag.finalizedHead.blck, dag.finalizedHead.slot.epoch, false).expect( "getEpochRef for finalized head should always succeed") proc ancestorSlot*( dag: ChainDAGRef, state: ForkyHashedBeaconState, bid: BlockId, lowSlot: Slot): Opt[Slot] = ## Return common ancestor slot of `bid` and `state`, if at least `lowSlot`. ## Return `none` if no common ancestor is found with slot >= `lowSlot`. if state.data.slot < lowSlot or bid.slot < lowSlot: return Opt.none(Slot) var stateBid = ? dag.getBlockIdAtSlot(state, bid.slot) if stateBid.slot < lowSlot: return Opt.none(Slot) var blockBid = (? dag.atSlot(bid, stateBid.slot)).bid if blockBid.slot < lowSlot: return Opt.none(Slot) while stateBid != blockBid: if stateBid.slot >= blockBid.slot: stateBid = ? dag.getBlockIdAtSlot( state, min(blockBid.slot, stateBid.slot - 1)) if stateBid.slot < lowSlot: return Opt.none(Slot) else: blockBid = ? dag.parent(blockBid) if blockBid.slot < lowSlot: return Opt.none(Slot) Opt.some stateBid.slot proc computeRandaoMix( bdata: ForkedTrustedSignedBeaconBlock): Opt[Eth2Digest] = ## Compute the requested RANDAO mix for `bdata` without `state`, if possible. withBlck(bdata): when consensusFork >= ConsensusFork.Bellatrix: if forkyBlck.message.is_execution_block: var mix = eth2digest(forkyBlck.message.body.randao_reveal.toRaw()) mix.data.mxor forkyBlck.message.body.execution_payload.prev_randao.data return ok mix Opt.none(Eth2Digest) proc computeRandaoMix*( dag: ChainDAGRef, state: ForkyHashedBeaconState, bid: BlockId, lowSlot: Slot): Opt[Eth2Digest] = ## Compute the requested RANDAO mix for `bid` based on `state`. ## Return `none` if `state` and `bid` do not share a common ancestor ## with slot >= `lowSlot`. let ancestorSlot = ? dag.ancestorSlot(state, bid, lowSlot) doAssert ancestorSlot <= state.data.slot doAssert ancestorSlot <= bid.slot # If `blck` is post merge, RANDAO information is immediately available let bdata = ? dag.getForkedBlock(bid) fullMix = computeRandaoMix(bdata) if fullMix.isSome: return fullMix # RANDAO mix has to be recomputed from `bid` and `state` var mix {.noinit.}: Eth2Digest proc mixToAncestor(highBid: BlockId): Opt[void] = ## Mix in/out RANDAO reveals back to `ancestorSlot` var bid = highBid while bid.slot > ancestorSlot: let bdata = ? dag.getForkedBlock(bid) withBlck(bdata): # See `process_randao` / `process_randao_mixes_reset` mix.data.mxor eth2digest( forkyBlck.message.body.randao_reveal.toRaw()).data bid = ? dag.parent(bid) ok() # Mix in RANDAO from `bid` if ancestorSlot < bid.slot: withBlck(bdata): mix = eth2digest(forkyBlck.message.body.randao_reveal.toRaw()) ? mixToAncestor(? dag.parent(bid)) else: mix.reset() # Mix in RANDAO from `state` let ancestorEpoch = ancestorSlot.epoch if ancestorEpoch + EPOCHS_PER_HISTORICAL_VECTOR <= state.data.slot.epoch: return Opt.none(Eth2Digest) let mixRoot = state.dependent_root(ancestorEpoch + 1) if mixRoot.isZero: return Opt.none(Eth2Digest) ? mixToAncestor(? dag.getBlockId(mixRoot)) mix.data.mxor state.data.get_randao_mix(ancestorEpoch).data ok mix proc computeRandaoMixFromMemory*( dag: ChainDAGRef, bid: BlockId, lowSlot: Slot): Opt[Eth2Digest] = ## Compute requested RANDAO mix for `bid` from available states (~5 ms). template tryWithState(state: ForkedHashedBeaconState) = block: withState(state): let mix = dag.computeRandaoMix(forkyState, bid, lowSlot) if mix.isSome: return mix tryWithState dag.headState tryWithState dag.epochRefState tryWithState dag.clearanceState proc computeRandaoMixFromDatabase*( dag: ChainDAGRef, bid: BlockId, lowSlot: Slot): Opt[Eth2Digest] = ## Compute requested RANDAO mix for `bid` using closest DB state (~500 ms). let state = assignClone(dag.headState) ? dag.getNearbyState(state[], bid, lowSlot) withState(state[]): dag.computeRandaoMix(forkyState, bid, lowSlot) proc computeRandaoMix( dag: ChainDAGRef, bid: BlockId, lowSlot: Slot): Opt[Eth2Digest] = # Try to compute from states available in memory let mix = dag.computeRandaoMixFromMemory(bid, lowSlot) if mix.isSome: return mix # If `blck` is post merge, RANDAO information is immediately available let bdata = ? dag.getForkedBlock(bid) fullMix = computeRandaoMix(bdata) if fullMix.isSome: return fullMix # Fall back to database dag.computeRandaoMixFromDatabase(bid, lowSlot) proc computeRandaoMix*(dag: ChainDAGRef, bid: BlockId): Opt[Eth2Digest] = ## Compute requested RANDAO mix for `bid`. const maxSlotDistance = SLOTS_PER_HISTORICAL_ROOT let lowSlot = max(bid.slot, maxSlotDistance.Slot) - maxSlotDistance dag.computeRandaoMix(bid, lowSlot) proc lowSlotForAttesterShuffling*(epoch: Epoch): Slot = ## Return minimum slot that a state must share ancestry with a block history ## so that RANDAO at `epoch.attester_dependent_slot` can be computed. # A state must be somewhat recent so that `get_active_validator_indices` # for the queried `epoch` cannot be affected by any such skipped processing. const numDelayEpochs = compute_activation_exit_epoch(GENESIS_EPOCH).uint64 let lowEpoch = max(epoch, (numDelayEpochs - 1).Epoch) - (numDelayEpochs - 1) lowEpoch.start_slot proc computeShufflingRef*( dag: ChainDAGRef, state: ForkyHashedBeaconState, blck: BlockRef, epoch: Epoch): Opt[ShufflingRef] = ## Compute `ShufflingRef` for `blck@epoch` based on `state`. ## If `state` has unviable `get_active_validator_indices`, return `none`. let dependentBid = (? dag.atSlot(blck.bid, epoch.attester_dependent_slot)).bid lowSlot = epoch.lowSlotForAttesterShuffling mix = ? dag.computeRandaoMix(state, dependentBid, lowSlot) return ok ShufflingRef( epoch: epoch, attester_dependent_root: dependentBid.root, shuffled_active_validator_indices: state.data.get_shuffled_active_validator_indices(epoch, mix)) proc computeShufflingRefFromMemory*( dag: ChainDAGRef, blck: BlockRef, epoch: Epoch): Opt[ShufflingRef] = ## Compute `ShufflingRef` from available states (~5 ms). template tryWithState(state: ForkedHashedBeaconState) = block: withState(state): let shufflingRef = dag.computeShufflingRef(forkyState, blck, epoch) if shufflingRef.isOk: return shufflingRef tryWithState dag.headState tryWithState dag.epochRefState tryWithState dag.clearanceState proc getShufflingRef*( dag: ChainDAGRef, blck: BlockRef, epoch: Epoch, preFinalized: bool): Opt[ShufflingRef] = ## Return the shuffling in the given history and epoch - this potentially is ## faster than returning a full EpochRef because the shuffling is determined ## an epoch in advance and therefore is less sensitive to reorgs var shufflingRef = dag.findShufflingRef(blck.bid, epoch) if shufflingRef.isSome: return shufflingRef # Use existing states to quickly compute the shuffling shufflingRef = dag.computeShufflingRefFromMemory(blck, epoch) if shufflingRef.isSome: dag.putShufflingRef(shufflingRef.get) return shufflingRef # Last resort, this can take several seconds as this may replay states let epochRef = dag.getEpochRef(blck, epoch, preFinalized).valueOr: return Opt.none ShufflingRef dag.putShufflingRef(epochRef.shufflingRef) Opt.some epochRef.shufflingRef func stateCheckpoint*(dag: ChainDAGRef, bsi: BlockSlotId): BlockSlotId = ## The first ancestor BlockSlot that is a state checkpoint var bsi = bsi while not dag.isStateCheckpoint(bsi): if bsi.isProposed: bsi.bid = dag.parent(bsi.bid).valueOr: break else: bsi.slot = bsi.slot - 1 bsi template forkAtEpoch*(dag: ChainDAGRef, epoch: Epoch): Fork = forkAtEpoch(dag.cfg, epoch) proc getBlockRange*( dag: ChainDAGRef, startSlot: Slot, skipStep: uint64, output: var openArray[BlockId]): Natural = ## This function populates an `output` buffer of blocks ## with a slots ranging from `startSlot` up to, but not including, ## `startSlot + skipStep * output.len`, skipping any slots that don't have ## a block. ## ## Blocks will be written to `output` from the end without gaps, even if ## a block is missing in a particular slot. The return value shows how ## many slots were missing blocks - to iterate over the result, start ## at this index. ## ## If there were no blocks in the range, `output.len` will be returned. let requestedCount = output.lenu64 headSlot = dag.head.slot trace "getBlockRange entered", head = shortLog(dag.head.root), requestedCount, startSlot, skipStep, headSlot if startSlot < dag.backfill.slot: debug "Got request for pre-backfill slot", startSlot, backfillSlot = dag.backfill.slot, horizonSlot = dag.horizon return output.len if headSlot <= startSlot or requestedCount == 0: return output.len # Identical to returning an empty set of block as indicated above let runway = uint64(headSlot - startSlot) # This is the number of blocks that will follow the start block extraSlots = min(runway div skipStep, requestedCount - 1) # If `skipStep` is very large, `extraSlots` should be 0 from # the previous line, so `endSlot` will be equal to `startSlot`: endSlot = startSlot + extraSlots * skipStep var curSlot = endSlot o = output.len # Process all blocks that follow the start block (may be zero blocks) while curSlot > startSlot: let bs = dag.getBlockIdAtSlot(curSlot) if bs.isSome and bs.get().isProposed(): o -= 1 output[o] = bs.get().bid curSlot -= skipStep # Handle start slot separately (to avoid underflow when computing curSlot) let bs = dag.getBlockIdAtSlot(startSlot) if bs.isSome and bs.get().isProposed(): o -= 1 output[o] = bs.get().bid o # Return the index of the first non-nil item in the output proc updateState*( dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsi: BlockSlotId, save: bool, cache: var StateCache): bool = ## Rewind or advance state such that it matches the given block and slot - ## this may include replaying from an earlier snapshot if blck is on a ## different branch or has advanced to a higher slot number than slot ## If `bs.slot` is higher than `bs.blck.slot`, `updateState` will fill in ## with empty/non-block slots # First, see if we're already at the requested block. If we are, also check # that the state has not been advanced past the desired block - if it has, # an earlier state must be loaded since there's no way to undo the slot # transitions let startTick = Moment.now() current {.used.} = withState(state): BlockSlotId.init(forkyState.latest_block_id, forkyState.data.slot) var ancestors: seq[BlockId] found = false template exactMatch(state: ForkedHashedBeaconState, bsi: BlockSlotId): bool = # The block is the same and we're at an early enough slot - the state can # be used to arrive at the desired blockslot state.matches_block_slot(bsi.bid.root, bsi.slot) template canAdvance(state: ForkedHashedBeaconState, bsi: BlockSlotId): bool = # The block is the same and we're at an early enough slot - the state can # be used to arrive at the desired blockslot state.can_advance_slots(bsi.bid.root, bsi.slot) # Fast path: check all caches for an exact match - this is faster than # advancing a state where there's epoch processing to do, by a wide margin - # it also avoids `hash_tree_root` for slot processing if exactMatch(state, bsi): found = true elif not save: # When required to save states, we cannot rely on the caches because that # would skip the extra processing that save does - not all information that # goes into the database is cached if exactMatch(dag.headState, bsi): assign(state, dag.headState) found = true elif exactMatch(dag.clearanceState, bsi): assign(state, dag.clearanceState) found = true elif exactMatch(dag.epochRefState, bsi): assign(state, dag.epochRefState) found = true const RewindBlockThreshold = 64 if not found: # No exact match found - see if any in-memory state can be used as a base # onto which we can apply a few blocks - there's a tradeoff here between # loading the state from disk and performing the block applications var cur = bsi while ancestors.len < RewindBlockThreshold: if isZero(cur.bid.root): # tail reached break if canAdvance(state, cur): # Typical case / fast path when there's no reorg found = true break if not save: # see above if canAdvance(dag.headState, cur): assign(state, dag.headState) found = true break if canAdvance(dag.clearanceState, cur): assign(state, dag.clearanceState) found = true break if canAdvance(dag.epochRefState, cur): assign(state, dag.epochRefState) found = true break if cur.isProposed(): # This is not an empty slot, so the block will need to be applied to # eventually reach bs ancestors.add(cur.bid) # Move slot by slot to capture epoch boundary states cur = dag.parentOrSlot(cur).valueOr: break if not found: debug "UpdateStateData cache miss", current = shortLog(current), target = shortLog(bsi) # Either the state is too new or was created by applying a different block. # We'll now resort to loading the state from the database then reapplying # blocks until we reach the desired point in time. var cur = bsi ancestors.setLen(0) # Look for a state in the database and load it - as long as it cannot be # found, keep track of the blocks that are needed to reach it from the # state that eventually will be found. # If we hit the tail, it means that we've reached a point for which we can # no longer recreate history - this happens for example when starting from # a checkpoint block let startEpoch = bsi.slot.epoch while not canAdvance(state, cur) and not dag.db.getState(dag.cfg, cur.bid.root, cur.slot, state, noRollback): # There's no state saved for this particular BlockSlot combination, and # the state we have can't trivially be advanced (in case it was older than # RewindBlockThreshold), keep looking.. if cur.isProposed(): # This is not an empty slot, so the block will need to be applied to # eventually reach bs ancestors.add(cur.bid) if cur.slot == GENESIS_SLOT or (cur.slot < dag.finalizedHead.slot and cur.slot.epoch + uint64(EPOCHS_PER_STATE_SNAPSHOT) * 2 < startEpoch): # We've either walked two full state snapshot lengths or hit the tail # and still can't find a matching state: this can happen when # starting the node from an arbitrary finalized checkpoint and not # backfilling the states notice "Request for pruned historical state", request = shortLog(bsi), tail = shortLog(dag.tail), cur = shortLog(cur), finalized = shortLog(dag.finalizedHead) return false # Move slot by slot to capture epoch boundary states cur = dag.parentOrSlot(cur).valueOr: if not dag.getStateByParent(cur.bid, state): notice "Request for pruned historical state", request = shortLog(bsi), tail = shortLog(dag.tail), cur = shortLog(cur) return false break beacon_state_rewinds.inc() # Starting state has been assigned, either from memory or database let assignTick = Moment.now() ancestor {.used.} = withState(state): BlockSlotId.init(forkyState.latest_block_id, forkyState.data.slot) ancestorRoot {.used.} = getStateRoot(state) var info: ForkedEpochInfo # Time to replay all the blocks between then and now for i in countdown(ancestors.len - 1, 0): # Because the ancestors are in the database, there's no need to persist them # again. Also, because we're applying blocks that were loaded from the # database, we can skip certain checks that have already been performed # before adding the block to the database. if (let res = dag.applyBlock(state, ancestors[i], cache, info); res.isErr): warn "Failed to apply block from database", blck = shortLog(ancestors[i]), state_bid = shortLog(state.latest_block_id), error = res.error() return false # ...and make sure to process empty slots as requested dag.advanceSlots(state, bsi.slot, save, cache, info) # ...and make sure to load the state cache, if it exists loadStateCache(dag, cache, bsi.bid, getStateField(state, slot).epoch) let assignDur = assignTick - startTick replayDur = Moment.now() - assignTick beacon_dag_state_replay_seconds.inc(replayDur.toFloatSeconds) # TODO https://github.com/status-im/nim-chronicles/issues/108 if (assignDur + replayDur) >= MinSignificantProcessingDuration: # This might indicate there's a cache that's not in order or a disk that is # too slow - for now, it's here for investigative purposes and the cutoff # time might need tuning info "State replayed", blocks = ancestors.len, slots = getStateField(state, slot) - ancestor.slot, current = shortLog(current), ancestor = shortLog(ancestor), target = shortLog(bsi), ancestorStateRoot = shortLog(ancestorRoot), targetStateRoot = shortLog(getStateRoot(state)), found, assignDur, replayDur elif ancestors.len > 0: debug "State replayed", blocks = ancestors.len, slots = getStateField(state, slot) - ancestor.slot, current = shortLog(current), ancestor = shortLog(ancestor), target = shortLog(bsi), ancestorStateRoot = shortLog(ancestorRoot), targetStateRoot = shortLog(getStateRoot(state)), found, assignDur, replayDur else: # Normal case! trace "State advanced", blocks = ancestors.len, slots = getStateField(state, slot) - ancestor.slot, current = shortLog(current), ancestor = shortLog(ancestor), target = shortLog(bsi), ancestorStateRoot = shortLog(ancestorRoot), targetStateRoot = shortLog(getStateRoot(state)), found, assignDur, replayDur true proc delState(dag: ChainDAGRef, bsi: BlockSlotId) = # Delete state and mapping for a particular block+slot if not dag.isStateCheckpoint(bsi): return # We only ever save epoch states if (let root = dag.db.getStateRoot(bsi.bid.root, bsi.slot); root.isSome()): dag.db.withManyWrites: dag.db.delStateRoot(bsi.bid.root, bsi.slot) dag.db.delState( dag.cfg.consensusForkAtEpoch(bsi.slot.epoch), root.get()) proc pruneBlockSlot(dag: ChainDAGRef, bs: BlockSlot) = # TODO: should we move that disk I/O to `onSlotEnd` dag.delState(bs.toBlockSlotId().expect("not nil")) if bs.isProposed(): # Update light client data dag.deleteLightClientData(bs.blck.bid) bs.blck.executionValid = true dag.forkBlocks.excl(KeyedBlockRef.init(bs.blck)) discard dag.db.delBlock( dag.cfg.consensusForkAtEpoch(bs.blck.slot.epoch), bs.blck.root) proc pruneBlocksDAG(dag: ChainDAGRef) = ## This prunes the block DAG ## This does NOT prune the cached state checkpoints and EpochRef ## This must be done after a new finalization point is reached ## to invalidate pending blocks or attestations referring ## to a now invalid fork. ## ## This does NOT update the `dag.lastPrunePoint` field. ## as the caches and fork choice can be pruned at a later time. # Clean up block refs, walking block by block let startTick = Moment.now() # Finalization means that we choose a single chain as the canonical one - # it also means we're no longer interested in any branches from that chain # up to the finalization point let hlen = dag.heads.len for i in 0..= ConsensusFork.Altair: let period = sync_committee_period(slot) curPeriod = sync_committee_period(forkyState.data.slot) if period == curPeriod: @(dag.headSyncCommittees.current_sync_committee) elif period == curPeriod + 1: @(dag.headSyncCommittees.next_sync_committee) else: @[] else: @[] func getSubcommitteePositionsAux( dag: ChainDAGRef, syncCommittee: openArray[ValidatorIndex], subcommitteeIdx: SyncSubcommitteeIndex, validatorIdx: uint64): seq[uint64] = var pos = 0'u64 for valIdx in syncCommittee.syncSubcommittee(subcommitteeIdx): if validatorIdx == uint64(valIdx): result.add pos inc pos func getSubcommitteePositions*( dag: ChainDAGRef, slot: Slot, subcommitteeIdx: SyncSubcommitteeIndex, validatorIdx: uint64): seq[uint64] = withState(dag.headState): when consensusFork >= ConsensusFork.Altair: let period = sync_committee_period(slot) curPeriod = sync_committee_period(forkyState.data.slot) template search(syncCommittee: openArray[ValidatorIndex]): seq[uint64] = dag.getSubcommitteePositionsAux( syncCommittee, subcommitteeIdx, validatorIdx) if period == curPeriod: search(dag.headSyncCommittees.current_sync_committee) elif period == curPeriod + 1: search(dag.headSyncCommittees.next_sync_committee) else: @[] else: @[] template syncCommitteeParticipants*( dag: ChainDAGRef, slot: Slot, subcommitteeIdx: SyncSubcommitteeIndex): seq[ValidatorIndex] = toSeq(syncSubcommittee(dag.syncCommitteeParticipants(slot), subcommitteeIdx)) iterator syncCommitteeParticipants*( dag: ChainDAGRef, slot: Slot, subcommitteeIdx: SyncSubcommitteeIndex, aggregationBits: SyncCommitteeAggregationBits): ValidatorIndex = for pos, valIdx in dag.syncCommitteeParticipants(slot, subcommitteeIdx): if pos < aggregationBits.bits and aggregationBits[pos]: yield valIdx func needStateCachesAndForkChoicePruning*(dag: ChainDAGRef): bool = dag.lastPrunePoint != dag.finalizedHead.toBlockSlotId().expect("not nil") proc pruneStateCachesDAG*(dag: ChainDAGRef) = ## This prunes the cached state checkpoints and EpochRef ## This does NOT prune the state associated with invalidated blocks on a fork ## They are pruned via `pruneBlocksDAG` ## ## This updates the `dag.lastPrunePoint` variable doAssert dag.needStateCachesAndForkChoicePruning() let startTick = Moment.now() block: # Remove states, walking slot by slot # We remove all state checkpoints that come _before_ the current finalized # head, as we might frequently be asked to replay states from the # finalized checkpoint and onwards (for example when validating blocks and # attestations) var finPoint = dag.finalizedHead.toBlockSlotId().expect("not nil") cur = dag.parentOrSlot(dag.stateCheckpoint(finPoint)) prev = dag.parentOrSlot(dag.stateCheckpoint(dag.lastPrunePoint)) while cur.isSome and prev.isSome and cur.get() != prev.get(): let bs = cur.get() if not isFinalizedStateSnapshot(bs.slot) and bs.slot != dag.tail.slot: dag.delState(bs) let tmp = cur.get() cur = dag.parentOrSlot(tmp) let statePruneTick = Moment.now() block: # Clean up old EpochRef instances # After finalization, we can clear up the epoch cache and save memory - # it will be recomputed if needed dag.epochRefs.delIt(it.epoch < dag.finalizedHead.slot.epoch) dag.shufflingRefs.delIt(it.epoch < dag.finalizedHead.slot.epoch) let epochRefPruneTick = Moment.now() dag.lastPrunePoint = dag.finalizedHead.toBlockSlotId().expect("not nil") debug "Pruned the state checkpoints and DAG caches.", statePruneDur = statePruneTick - startTick, epochRefPruneDur = epochRefPruneTick - statePruneTick func pruneStep(horizon, lastHorizon, lastBlockHorizon: Slot): tuple[stateHorizon, blockHorizon: Slot] = ## Compute a reasonable incremental pruning step considering the current ## horizon, how far the database has been pruned already and where we want the ## tail to be - the return value shows the first state and block that we ## should _keep_ (inclusive). const SLOTS_PER_STATE_SNAPSHOT = uint64(EPOCHS_PER_STATE_SNAPSHOT * SLOTS_PER_EPOCH) let blockHorizon = block: let # Keep up with horizon if it's moving fast, ie if we're syncing maxSlots = max(horizon - lastHorizon, MAX_SLOTS_PER_PRUNE) # Move the block horizon cap with a lag so that it moves slot-by-slot # instead of a big jump every time we prune a state - assuming we # prune every slot, this makes us prune one slot at a time instead of # a burst of prunes (as computed by maxSlots) around every snapshot # change followed by no pruning for the rest of the period maxBlockHorizon = if horizon + 1 >= SLOTS_PER_STATE_SNAPSHOT: horizon + 1 - SLOTS_PER_STATE_SNAPSHOT else: Slot(0) # `lastBlockHorizon` captures the case where we're incrementally # pruning a database that hasn't been pruned for a while: it's # initialized to a pre-tail value on startup and moves to approach # `maxBlockHorizon`. min(maxBlockHorizon, lastBlockHorizon + maxSlots) # Round up such that we remove state only once blocks have been removed stateHorizon = ((blockHorizon + SLOTS_PER_STATE_SNAPSHOT - 1) div SLOTS_PER_STATE_SNAPSHOT) * SLOTS_PER_STATE_SNAPSHOT (Slot(stateHorizon), blockHorizon) proc pruneHistory*(dag: ChainDAGRef, startup = false) = ## Perform an incremental pruning step of the history if dag.db.db.readOnly: return let horizon = dag.horizon() (stateHorizon, blockHorizon) = pruneStep( horizon, dag.lastHistoryPruneHorizon, dag.lastHistoryPruneBlockHorizon) doAssert blockHorizon <= stateHorizon, "we must never prune blocks while leaving the state" debug "Pruning history", horizon, blockHorizon, stateHorizon, lastHorizon = dag.lastHistoryPruneHorizon, lastBlockHorizon = dag.lastHistoryPruneBlockHorizon, tail = dag.tail, head = dag.head dag.lastHistoryPruneHorizon = horizon dag.lastHistoryPruneBlockHorizon = blockHorizon dag.db.withManyWrites: if stateHorizon > dag.tail.slot: # First, we want to see if it's possible to prune any states - we store one # state every EPOCHS_PER_STATE_SNAPSHOT, so this happens infrequently. var cur = dag.getBlockIdAtSlot(stateHorizon) var first = true while cur.isSome(): let bs = cur.get() # We don't delete legacy states because the legacy database is openend # in read-only and slow to delete from due to its sub-optimal structure if dag.db.containsState( dag.cfg, bs.bid.root, bs.slot..bs.slot, legacy = first): if first: # We leave the state on the prune horizon intact and update the tail # to point to this state, indicating the new point in time from # which we can load states in general. debug "Updating tail", bs dag.db.putTailBlock(bs.bid.root) dag.tail = bs.bid first = false else: debug "Pruning historical state", bs dag.delState(bs) elif not bs.isProposed: trace "Reached already-pruned slot, done pruning states", bs break if bs.isProposed: # We store states either at the same slot at the block (checkpoint) or # by advancing the slot to the nearest epoch start - check both when # pruning cur = dag.parentOrSlot(bs) elif bs.slot.epoch > EPOCHS_PER_STATE_SNAPSHOT: # Jump one snapshot interval at a time, but don't prune genesis cur = dag.getBlockIdAtSlot(start_slot(bs.slot.epoch() - EPOCHS_PER_STATE_SNAPSHOT)) else: break # Prune blocks after sanity-checking that we don't prune post-tail blocks - # this could happen if a state is missing at the expected state horizon and # would indicate a partially inconsistent database since the base # invariant is that there exists a state at the snapshot slot - better not # further mess things up regardless if blockHorizon > GENESIS_SLOT and blockHorizon <= dag.tail.slot: var # Leave the horizon block itself cur = dag.getBlockIdAtSlot(blockHorizon - 1).map(proc(x: auto): auto = x.bid) while cur.isSome: let bid = cur.get() fork = dag.cfg.consensusForkAtEpoch(bid.slot.epoch) if bid.slot == GENESIS_SLOT: # Leave genesis block for nostalgia and the REST API break if not dag.db.delBlock(fork, bid.root): # Stop at the first gap - this is typically the pruning point of the # previous call to pruneHistory. An inconsistent DB might have more # blocks beyond that point but we have no efficient way of detecting # that. break cur = dag.parent(bid) # TODO There have been varied reports of startup pruning causing long # startup times - an incremental approach would be needed here also if false and startup and dag.cfg.consensusForkAtEpoch(blockHorizon.epoch) > ConsensusFork.Phase0: # Once during start, we'll clear all "old fork" data - this ensures we get # rid of any leftover junk in the tables - we do so after linear pruning # so as to "mostly" clean up the phase0 tables as well (which cannot be # pruned easily by fork) - one fork at a time, so as not to take too long let stateFork = dag.cfg.consensusForkAtEpoch(dag.tail.slot.epoch) var clearedStates = false if stateFork > ConsensusFork.Phase0: for fork in ConsensusFork.Phase0.. ConsensusFork.Phase0: for fork in ConsensusFork.Phase0..= ConsensusFork.Bellatrix: Opt.some forkyBlck.message.body.execution_payload.block_hash else: Opt.some ZERO_HASH proc loadExecutionBlockHash*( dag: ChainDAGRef, blck: BlockRef): Opt[Eth2Digest] = if blck.executionBlockHash.isNone: blck.executionBlockHash = dag.loadExecutionBlockHash(blck.bid) blck.executionBlockHash from std/packedsets import PackedSet, incl, items func getValidatorChangeStatuses( state: ForkedHashedBeaconState, vis: openArray[ValidatorIndex]): PackedSet[ValidatorIndex] = var res: PackedSet[ValidatorIndex] withState(state): for vi in vis: if forkyState.data.validators[vi].withdrawal_credentials.data[0] == BLS_WITHDRAWAL_PREFIX: res.incl vi res func checkBlsToExecutionChanges( state: ForkedHashedBeaconState, vis: PackedSet[ValidatorIndex]): bool = # Within each fork, BLS_WITHDRAWAL_PREFIX to ETH1_ADDRESS_WITHDRAWAL_PREFIX # and never ETH1_ADDRESS_WITHDRAWAL_PREFIX to BLS_WITHDRAWAL_PREFIX. Latter # can still happen via reorgs. # Cases: # 1) unchanged (BLS_WITHDRAWAL_PREFIX or ETH1_ADDRESS_WITHDRAWAL_PREFIX) from # old to new head. # 2) ETH1_ADDRESS_WITHDRAWAL_PREFIX to BLS_WITHDRAWAL_PREFIX # 3) BLS_WITHDRAWAL_PREFIX to ETH1_ADDRESS_WITHDRAWAL_PREFIX # # Only report (3), i.e. whether there were validator indices with withdrawal # credentials previously using BLS_WITHDRAWAL_PREFIX now using, instead, the # ETH1_ADDRESS_WITHDRAWAL_PREFIX prefix indicating a BLS to execution change # went through. # # Since it tracks head, it's possible reorgs trigger reporting the same # validator indices multiple times; this is fine. withState(state): anyIt( vis, forkyState.data.validators[it].has_eth1_withdrawal_credential) proc updateHead*( dag: ChainDAGRef, newHead: BlockRef, quarantine: var Quarantine, knownValidators: openArray[ValidatorIndex]) = ## Update what we consider to be the current head, as given by the fork ## choice. ## ## The choice of head affects the choice of finalization point - the order ## of operations naturally becomes important here - after updating the head, ## blocks that were once considered potential candidates for a tree will ## now fall from grace, or no longer be considered resolved. doAssert not newHead.isNil() # Could happen if enough blocks get invalidated and would corrupt database - # When finalized checkpoint is empty, the slot may also be smaller doAssert newHead.slot >= dag.finalizedHead.slot or newHead == dag.finalizedHead.blck let lastHead = dag.head logScope: newHead = shortLog(newHead) lastHead = shortLog(lastHead) if lastHead == newHead: trace "No head block update" return if newHead.parent.isNil: # The new head should always have the finalizedHead as ancestor - thus, # this should not happen except in a race condition where the selected # `BlockRef` had its parent set to nil as happens during finalization - # notably, resetting the head to be the finalizedHead is not allowed error "Cannot update head to block without parent" return let lastHeadStateRoot = getStateRoot(dag.headState) lastHeadMergeComplete = dag.headState.is_merge_transition_complete() lastHeadKind = dag.headState.kind lastKnownValidatorsChangeStatuses = getValidatorChangeStatuses( dag.headState, knownValidators) # Start off by making sure we have the right state - updateState will try # to use existing in-memory states to make this smooth var cache: StateCache if not updateState( dag, dag.headState, newHead.bid.atSlot(), false, cache): # Advancing the head state should never fail, given that the tail is # implicitly finalised, the head is an ancestor of the tail and we always # store the tail state in the database, as well as every epoch slot state in # between fatal "Unable to load head state during head update, database corrupt?", lastHead = shortLog(lastHead) quit 1 dag.head = newHead if dag.headState.is_merge_transition_complete() and not lastHeadMergeComplete and dag.vanityLogs.onMergeTransitionBlock != nil: dag.vanityLogs.onMergeTransitionBlock() if dag.headState.kind > lastHeadKind: case dag.headState.kind of ConsensusFork.Phase0 .. ConsensusFork.Bellatrix: discard of ConsensusFork.Capella: if dag.vanityLogs.onUpgradeToCapella != nil: dag.vanityLogs.onUpgradeToCapella() of ConsensusFork.Deneb: if dag.vanityLogs.onUpgradeToDeneb != nil: dag.vanityLogs.onUpgradeToDeneb() of ConsensusFork.Electra: if dag.vanityLogs.onUpgradeToElectra != nil: dag.vanityLogs.onUpgradeToElectra() if dag.vanityLogs.onKnownBlsToExecutionChange != nil and checkBlsToExecutionChanges( dag.headState, lastKnownValidatorsChangeStatuses): dag.vanityLogs.onKnownBlsToExecutionChange() dag.db.putHeadBlock(newHead.root) updateBeaconMetrics(dag.headState, dag.head.bid, cache) withState(dag.headState): when consensusFork >= ConsensusFork.Altair: dag.headSyncCommittees = forkyState.data.get_sync_committee_cache(cache) let finalized_checkpoint = getStateField(dag.headState, finalized_checkpoint) finalizedSlot = # finalized checkpoint may move back in the head state compared to what # we've seen in other forks - it does not move back in fork choice # however, so we'll use the last-known-finalized in that case max(finalized_checkpoint.epoch.start_slot(), dag.finalizedHead.slot) finalizedHead = newHead.atSlot(finalizedSlot) doAssert (not finalizedHead.blck.isNil), "Block graph should always lead to a finalized block" # Update light client data dag.processHeadChangeForLightClient() let (isAncestor, ancestorDepth) = lastHead.getDepth(newHead) if not(isAncestor): notice "Updated head block with chain reorg", headParent = shortLog(newHead.parent), stateRoot = shortLog(getStateRoot(dag.headState)), justified = shortLog(getStateField( dag.headState, current_justified_checkpoint)), finalized = shortLog(getStateField(dag.headState, finalized_checkpoint)), isOptHead = not newHead.executionValid if not(isNil(dag.onReorgHappened)): let # TODO (cheatfate): Proper implementation required data = ReorgInfoObject.init(dag.head.slot, uint64(ancestorDepth), lastHead.root, newHead.root, lastHeadStateRoot, getStateRoot(dag.headState)) dag.onReorgHappened(data) # A reasonable criterion for "reorganizations of the chain" quarantine.clearAfterReorg() beacon_reorgs_total_total.inc() beacon_reorgs_total.inc() else: debug "Updated head block", stateRoot = shortLog(getStateRoot(dag.headState)), justified = shortLog(getStateField( dag.headState, current_justified_checkpoint)), finalized = shortLog(getStateField(dag.headState, finalized_checkpoint)), isOptHead = not newHead.executionValid if not(isNil(dag.onHeadChanged)): let depRoot = withState(dag.headState): forkyState.proposer_dependent_root prevDepRoot = withState(dag.headState): forkyState.attester_dependent_root epochTransition = (finalizedHead != dag.finalizedHead) # TODO (cheatfate): Proper implementation required data = HeadChangeInfoObject.init(dag.head.slot, dag.head.root, getStateRoot(dag.headState), epochTransition, prevDepRoot, depRoot) dag.onHeadChanged(data) withState(dag.headState): # Every time the head changes, the "canonical" view of balances and other # state-related metrics change - notify the validator monitor. # Doing this update during head update ensures there's a reasonable number # of such updates happening - at most once per valid block. dag.validatorMonitor[].registerState(forkyState.data) if finalizedHead != dag.finalizedHead: debug "Reached new finalization checkpoint", stateRoot = shortLog(getStateRoot(dag.headState)), justified = shortLog(getStateField( dag.headState, current_justified_checkpoint)), finalized = shortLog(getStateField(dag.headState, finalized_checkpoint)) let oldFinalizedHead = dag.finalizedHead block: # Update `dag.finalizedBlocks` with all newly finalized blocks (those # newer than the previous finalized head), then update `dag.finalizedHead` var newFinalized: seq[BlockId] var tmp = finalizedHead.blck while not isNil(tmp) and tmp.slot >= dag.finalizedHead.slot: newFinalized.add(tmp.bid) if tmp != finalizedHead.blck: # The newly finalized block itself should remain in here so that fork # choice still can find it via root dag.forkBlocks.excl(KeyedBlockRef.init(tmp)) let p = tmp.parent tmp.parent = nil # Reset all parent links to release memory tmp = p dag.finalizedHead = finalizedHead dag.db.updateFinalizedBlocks(newFinalized) let oldBlockHash = dag.loadExecutionBlockHash(oldFinalizedHead.blck) if oldBlockHash.isSome and oldBlockHash.unsafeGet.isZero: let newBlockHash = dag.loadExecutionBlockHash(dag.finalizedHead.blck) if newBlockHash.isSome and not newBlockHash.unsafeGet.isZero: if dag.vanityLogs.onFinalizedMergeTransitionBlock != nil: dag.vanityLogs.onFinalizedMergeTransitionBlock() # Pruning the block dag is required every time the finalized head changes # in order to clear out blocks that are no longer viable and should # therefore no longer be considered as part of the chain we're following dag.pruneBlocksDAG() # Update light client data dag.processFinalizationForLightClient(oldFinalizedHead) # Send notification about new finalization point via callback. if not(isNil(dag.onFinHappened)): let stateRoot = if dag.finalizedHead.slot == dag.head.slot: getStateRoot(dag.headState) elif dag.finalizedHead.slot + SLOTS_PER_HISTORICAL_ROOT > dag.head.slot: getStateField(dag.headState, state_roots).data[ int(dag.finalizedHead.slot mod SLOTS_PER_HISTORICAL_ROOT)] else: Eth2Digest() # The thing that finalized was >8192 blocks old? # TODO (cheatfate): Proper implementation required let data = FinalizationInfoObject.init( dag.finalizedHead.blck.root, stateRoot, dag.finalizedHead.slot.epoch) dag.onFinHappened(dag, data) proc isInitialized*(T: type ChainDAGRef, db: BeaconChainDB): Result[void, cstring] = ## Lightweight check to see if it is likely that the given database has been ## initialized let tailBlockRoot = db.getTailBlock() if not tailBlockRoot.isSome(): return err("Tail block root missing") let tailBlock = db.getBlockId(tailBlockRoot.get()) if not tailBlock.isSome(): return err("Tail block information missing") ok() proc preInit*( T: type ChainDAGRef, db: BeaconChainDB, state: ForkedHashedBeaconState) = ## Initialize a database using the given state, which potentially may be a ## non-genesis state. ## ## When used with a non-genesis state, the resulting database will not be ## compatible with pre-22.11 versions. logScope: stateRoot = $getStateRoot(state) stateSlot = getStateField(state, slot) doAssert getStateField(state, slot).is_epoch, "Can only initialize database from epoch states" withState(state): db.putState(forkyState) if forkyState.data.slot == GENESIS_SLOT: let blck = get_initial_beacon_block(forkyState) db.putBlock(blck) db.putGenesisBlock(blck.root) db.putHeadBlock(blck.root) db.putTailBlock(blck.root) notice "Database initialized from genesis", blockRoot = $blck.root else: let blockRoot = forkyState.latest_block_root() # We write a summary but not the block contents - these will have to be # backfilled from the network db.putBeaconBlockSummary(blockRoot, BeaconBlockSummary( slot: forkyState.data.latest_block_header.slot, parent_root: forkyState.data.latest_block_header.parent_root )) db.putHeadBlock(blockRoot) db.putTailBlock(blockRoot) if db.getGenesisBlock().isSome(): notice "Checkpoint written to database", blockRoot = $blockRoot else: notice "Database initialized from checkpoint", blockRoot = $blockRoot proc getProposer*( dag: ChainDAGRef, head: BlockRef, slot: Slot): Opt[ValidatorIndex] = let epochRef = dag.getEpochRef(head.bid, slot.epoch(), false).valueOr: notice "Cannot load EpochRef for given head", head, slot, error return Opt.none(ValidatorIndex) slotInEpoch = slot.since_epoch_start() let proposer = epochRef.beacon_proposers[slotInEpoch] if proposer.isSome(): if proposer.get().uint64 >= dag.db.immutableValidators.lenu64(): # Sanity check - it should never happen that the key cache doesn't contain # a key for the selected proposer - that would mean that we somehow # created validators in the state without updating the cache! warn "Proposer key not found", keys = dag.db.immutableValidators.lenu64(), proposer = proposer.get() return Opt.none(ValidatorIndex) proposer proc getProposalState*( dag: ChainDAGRef, head: BlockRef, slot: Slot, cache: var StateCache): Result[ref ForkedHashedBeaconState, cstring] = ## Return a state suitable for making proposals for the given head and slot - ## in particular, the state can be discarded after use and does not have a ## state root set # Start with the clearance state, since this one typically has been advanced # and thus has a hot hash tree cache let state = assignClone(dag.clearanceState) var info = ForkedEpochInfo() if not state[].can_advance_slots(head.root, slot): # The last state root will be computed as part of block production, so skip # it now if not dag.updateState( state[], head.atSlot(slot - 1).toBlockSlotId().expect("not nil"), false, cache): error "Cannot get proposal state - skipping block production, database corrupt?", head = shortLog(head), slot return err("Cannot create proposal state") else: loadStateCache(dag, cache, head.bid, slot.epoch) if getStateField(state[], slot) < slot: process_slots( dag.cfg, state[], slot, cache, info, {skipLastStateRootCalculation}).expect("advancing 1 slot should not fail") ok state func aggregateAll*( dag: ChainDAGRef, validator_indices: openArray[ValidatorIndex]): Result[CookedPubKey, cstring] = if validator_indices.len == 0: # Aggregation spec requires non-empty collection # - https://tools.ietf.org/html/draft-irtf-cfrg-bls-signature-04 # Consensus specs require at least one attesting index in attestation # - https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.6/specs/phase0/beacon-chain.md#is_valid_indexed_attestation return err("aggregate: no attesting keys") let firstKey = dag.validatorKey(validator_indices[0]).valueOr: return err("aggregate: invalid validator index") var aggregateKey{.noinit.}: AggregatePublicKey aggregateKey.init(firstKey) for i in 1 ..< validator_indices.len: let key = dag.validatorKey(validator_indices[i]).valueOr: return err("aggregate: invalid validator index") aggregateKey.aggregate(key) ok(finish(aggregateKey)) func aggregateAll*( dag: ChainDAGRef, validator_indices: openArray[ValidatorIndex|uint64], bits: BitSeq | BitArray): Result[CookedPubKey, cstring] = if validator_indices.len() != bits.len(): return err("aggregateAll: mismatch in bits length") var aggregateKey{.noinit.}: AggregatePublicKey inited = false for i in 0.. dag.horizon proc rebuildIndex*(dag: ChainDAGRef) = ## After a checkpoint sync, we lack intermediate states to replay from - this ## function rebuilds them so that historical replay can take place again ## TODO the pruning of junk states could be moved to a separate function that ## runs either on startup # First, we check what states we already have in the database - that allows # resuming the operation at any time let roots = dag.db.loadStateRoots() historicalRoots = getStateField(dag.headState, historical_roots).asSeq() historicalSummaries = dag.headState.historical_summaries.asSeq() var canonical = newSeq[Eth2Digest]( (dag.finalizedHead.slot.epoch + EPOCHS_PER_STATE_SNAPSHOT - 1) div EPOCHS_PER_STATE_SNAPSHOT) # `junk` puts in place some infrastructure to prune unnecessary states - it # will be more useful in the future as a base for pruning junk: seq[((Slot, Eth2Digest), Eth2Digest)] for k, v in roots: if k[0] >= dag.finalizedHead.slot: continue # skip newer stuff if k[0] < dag.backfill.slot: continue # skip stuff for which we have no blocks if not isFinalizedStateSnapshot(k[0]): # `tail` will move at the end of the process, so we won't need any # intermediate states junk.add((k, v)) continue # skip non-snapshot slots if k[0] > 0: let bs = dag.getBlockIdAtSlot(k[0] - 1) if bs.isNone or bs.get().bid.root != k[1]: # remove things that are no longer a canonical part of the chain or # cannot be reached via a block junk.add((k, v)) continue if not dag.db.containsState(dag.cfg.consensusForkAtEpoch(k[0].epoch), v): continue # If it's not in the database.. canonical[k[0].epoch div EPOCHS_PER_STATE_SNAPSHOT] = v let state = (ref ForkedHashedBeaconState)() var cache: StateCache info: ForkedEpochInfo tailBid: Opt[BlockId] states: int # `canonical` holds all slots at which a state is expected to appear, using a # zero root whenever a particular state is missing - this way, if there's # partial progress or gaps, they will be dealt with correctly for i, state_root in canonical.mpairs(): let slot = Epoch(i * EPOCHS_PER_STATE_SNAPSHOT).start_slot if slot < dag.backfill.slot: # TODO if we have era files, we could try to load blocks from them at # this point # TODO if we don't do the above, we can of course compute the starting `i` continue if tailBid.isNone(): if state_root.isZero: # If we can find an era file with this state, use it as an alternative # starting point - ignore failures for now if dag.era.getState( historicalRoots, historicalSummaries, slot, state[]).isOk(): state_root = getStateRoot(state[]) withState(state[]): dag.db.putState(forkyState) tailBid = Opt.some state[].latest_block_id() else: if not dag.db.getState( dag.cfg.consensusForkAtEpoch(slot.epoch), state_root, state[], noRollback): fatal "Cannot load state, database corrupt or created for a different network?", state_root, slot quit 1 tailBid = Opt.some state[].latest_block_id() continue if i == 0 or canonical[i - 1].isZero: reset(tailBid) # No unbroken history! continue if not state_root.isZero: states += 1 continue let startSlot = Epoch((i - 1) * EPOCHS_PER_STATE_SNAPSHOT).start_slot info "Recreating state snapshot", slot, startStateRoot = canonical[i - 1], startSlot if getStateRoot(state[]) != canonical[i - 1]: if not dag.db.getState( dag.cfg.consensusForkAtEpoch(startSlot.epoch), canonical[i - 1], state[], noRollback): error "Can't load start state, database corrupt?", startStateRoot = shortLog(canonical[i - 1]), slot = startSlot return for slot in startSlot.. 0: info "Dropping redundant states", states, redundant = junk.len for i in junk: dag.db.delStateRoot(i[0][1], i[0][0]) dag.db.delState(dag.cfg.consensusForkAtEpoch(i[0][0].epoch), i[1])