# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import
  std/algorithm,
  # Status libraries
  metrics,
  chronicles, stew/byteutils,
  # Internal
  ../spec/[
    beaconstate, eth2_merkleization, forks, state_transition_epoch, validator],
  "."/[spec_cache, blockchain_dag, block_quarantine],
  ../fork_choice/fork_choice,
  ../beacon_clock

from std/sequtils import keepItIf, maxIndex

export blockchain_dag, fork_choice

const
  # TODO since deneb, this is looser (whole previous epoch)
  ATTESTATION_LOOKBACK =
    min(24'u64, SLOTS_PER_EPOCH) + MIN_ATTESTATION_INCLUSION_DELAY
    ## The number of slots we'll keep track of in terms of "free" attestations
    ## that potentially could be added to a newly created block

type
  # Callbacks stored on the pool by `init`; one per attestation format.
  OnPhase0AttestationCallback =
    proc(data: phase0.Attestation) {.gcsafe, raises: [].}
  OnElectraAttestationCallback =
    proc(data: electra.Attestation) {.gcsafe, raises: [].}

  Validation[CVBType] = object
    ## Validations collect a set of signatures for a distinct attestation - in
    ## eth2, a single bit is used to keep track of which signatures have been
    ## added to the aggregate meaning that only non-overlapping aggregates may
    ## be further combined.
    aggregation_bits: CVBType
    aggregate_signature: AggregateSignature

  Phase0Validation = Validation[CommitteeValidatorsBits]
  ElectraValidation = Validation[ElectraCommitteeValidatorsBits]

  AttestationEntry[CVBType] = object
    ## Each entry holds the known signatures for a particular, distinct vote
    ## For electra+, the data has been changed to hold the committee index
    data: AttestationData
    committee_len: int
    singles: Table[int, CookedSig] ## \
      ## On the attestation subnets, only attestations with a single vote are
      ## allowed - these can be collected separately to top up aggregates with -
      ## here we collect them by mapping index in committee to a vote
    aggregates: seq[Validation[CVBType]]

  Phase0AttestationEntry = AttestationEntry[CommitteeValidatorsBits]
  ElectraAttestationEntry = AttestationEntry[ElectraCommitteeValidatorsBits]

  AttestationTable[CVBType] = Table[Eth2Digest, AttestationEntry[CVBType]]
    ## Depending on the world view of the various validators, they may have
    ## voted on different states - this map keeps track of each vote keyed by
    ## getAttestationCandidateKey()

  AttestationPool* = object
    ## The attestation pool keeps track of all attestations that potentially
    ## could be added to a block during block production.
    ## These attestations also contribute to the fork choice, which combines
    ## "free" attestations with those found in past blocks - these votes
    ## are tracked separately in the fork choice.

    phase0Candidates: array[ATTESTATION_LOOKBACK.int,
      AttestationTable[CommitteeValidatorsBits]] ## \
      ## We keep one item per slot such that indexing matches slot number
      ## together with startingSlot

    electraCandidates: array[ATTESTATION_LOOKBACK.int,
      AttestationTable[ElectraCommitteeValidatorsBits]] ## \
      ## We keep one item per slot such that indexing matches slot number
      ## together with startingSlot

    startingSlot: Slot ## \
    ## Generally, we keep attestations only until a slot has been finalized -
    ## after that, they may no longer affect fork choice.

    dag*: ChainDAGRef
    quarantine*: ref Quarantine

    forkChoice*: ForkChoice

    nextAttestationEpoch*: seq[tuple[subnet: Epoch, aggregate: Epoch]] ## \
    ## sequence based on validator indices

    onPhase0AttestationAdded: OnPhase0AttestationCallback
    onElectraAttestationAdded: OnElectraAttestationCallback

logScope: topics = "attpool"

declareGauge attestation_pool_block_attestation_packing_time,
  "Time it took to create list of attestations for block"

proc init*(T: type AttestationPool, dag: ChainDAGRef,
           quarantine: ref Quarantine,
           onAttestation: OnPhase0AttestationCallback = nil,
           onElectraAttestation: OnElectraAttestationCallback = nil): T =
  ## Initialize an AttestationPool from the dag `headState`
  ## The `finalized_root` works around the finalized_checkpoint of the genesis block
  ## holding a zero_root.
  ##
  ## A fresh `ForkChoice` is created from the finalized epoch ref and block,
  ## then fed with the unfinalized blocks between `dag.head` and
  ## `dag.finalizedHead.blck`. Both callbacks default to `nil` and are stored
  ## on the returned pool as given.
  let finalizedEpochRef = dag.getFinalizedEpochRef()

  var forkChoice = ForkChoice.init(
    finalizedEpochRef, dag.finalizedHead.blck)

  # Feed fork choice with unfinalized history - during startup, block pool only
  # keeps track of a single history so we just need to follow it
  doAssert dag.heads.len == 1, "Init only supports a single history"

  var blocks: seq[BlockRef]
  var cur = dag.head

  # When the chain is finalizing, the votes between the head block and the
  # finalized checkpoint should be enough for a stable fork choice - when the
  # chain is not finalizing, we want to seed it with as many votes as possible
  # since the whole history of each branch might be significant. It is however
  # a game of diminishing returns, and we have to weigh it against the time
  # it takes to replay that many blocks during startup and thus miss _new_
  # votes.
  const ForkChoiceHorizon = 256
  # Walk parent links from the head back to (excluding) the finalized block,
  # so `blocks` ends up ordered newest-first.
  while cur != dag.finalizedHead.blck:
    blocks.add cur
    cur = cur.parent

  info "Initializing fork choice", unfinalized_blocks = blocks.len

  var epochRef = finalizedEpochRef
  # NOTE(review): the loop below appears truncated in this copy of the file -
  # the text between `0..` and `= ConsensusFork.Altair` (loop bound, the
  # definitions of `blckRef`, `status`, `blck`, `unrealized` and the use of
  # `ForkChoiceHorizon`) looks to have been stripped. Restore it from version
  # control before building; the remaining tokens are kept as found.
  for i in 0..= ConsensusFork.Altair:
                    forkyState.data.compute_unrealized_finality()
                  else:
                    var cache: StateCache
                    forkyState.data.compute_unrealized_finality(cache)
              else:
                default(FinalityCheckpoints)
          withBlck(blck):
            forkChoice.process_block(
              dag, epochRef, blckRef, unrealized, forkyBlck.message,
              blckRef.slot.start_beacon_time)

    doAssert status.isOk(), "Error in preloading the fork choice: " & $status.error

  debugComment "nothing initializes electra callback externally"

  info "Fork choice initialized",
    justified = shortLog(getStateField(
      dag.headState, current_justified_checkpoint)),
    finalized = shortLog(getStateField(dag.headState, finalized_checkpoint))
  T(
    dag: dag,
    quarantine: quarantine,
    forkChoice: forkChoice,
    onPhase0AttestationAdded: onAttestation,
    onElectraAttestationAdded: onElectraAttestation
  )

proc addForkChoiceVotes(
    pool: var AttestationPool, slot: Slot,
    attesting_indices: openArray[ValidatorIndex], block_root: Eth2Digest,
    wallTime: BeaconTime) =
  # Add attestation votes to fork choice
  # A failure is logged and otherwise ignored - nothing is returned to the
  # caller.
  if (let v = pool.forkChoice.on_attestation(
    pool.dag, slot, block_root, attesting_indices, wallTime);
    v.isErr):
      # This indicates that the fork choice and the chain dag are out of sync -
      # this is most likely the result of a bug, but we'll try to keep going -
      # hopefully the fork choice will heal itself over time.
      error "Couldn't add attestation to fork choice, bug?", err = v.error()

func candidateIdx(pool: AttestationPool, slot: Slot): Opt[int] =
  ## Map `slot` to its position in the per-slot candidate arrays.
  ## Returns `slot mod len` when `slot` lies in the half-open window
  ## `[startingSlot, startingSlot + len)`, and `Opt.none` otherwise.
  # Both candidate arrays must have the same length, since the index computed
  # from `phase0Candidates` is also used for `electraCandidates`.
  static: doAssert pool.phase0Candidates.len == pool.electraCandidates.len

  if slot >= pool.startingSlot and
      slot < (pool.startingSlot + pool.phase0Candidates.lenu64):
    Opt.some(int(slot mod pool.phase0Candidates.lenu64))
  else:
    Opt.none(int)

proc updateCurrent(pool: var AttestationPool, wallSlot: Slot) =
  ## Move the candidate window forward so that its last slot is `wallSlot`,
  ## resetting the per-slot tables that fall out of the window.
  ## Does nothing while `wallSlot + 1` is smaller than the window length, and
  ## logs an error without changing state if the window would move backwards.
  if wallSlot + 1 < pool.phase0Candidates.lenu64:
    return # Genesis

  static: doAssert pool.phase0Candidates.len == pool.electraCandidates.len

  let newStartingSlot = wallSlot + 1 - pool.phase0Candidates.lenu64

  if newStartingSlot < pool.startingSlot:
    error "Current slot older than attestation pool view, clock reset?",
      startingSlot = pool.startingSlot, newStartingSlot, wallSlot
    return

  # As time passes we'll clear out any old attestations as they are no longer
  # viable to be included in blocks

  if newStartingSlot - pool.startingSlot >= pool.phase0Candidates.lenu64():
    # In case many slots passed since the last update, avoid iterating over
    # the same indices over and over
    pool.phase0Candidates.reset()
    pool.electraCandidates.reset()
  else:
    # NOTE(review): `a..b` is an inclusive range, so the bucket belonging to
    # `newStartingSlot` itself is reset as well (also when `newStartingSlot`
    # equals the current `startingSlot`), even though `candidateIdx` still
    # treats that slot as inside the window afterwards - confirm this is
    # intended rather than an off-by-one (`..<`).
    for i in pool.startingSlot..newStartingSlot:
      pool.phase0Candidates[i.uint64 mod pool.phase0Candidates.lenu64].reset()
      pool.electraCandidates[i.uint64 mod pool.electraCandidates.lenu64].reset()

  pool.startingSlot = newStartingSlot

func oneIndex(
    bits: CommitteeValidatorsBits | ElectraCommitteeValidatorsBits): Opt[int] =
  # Find the index of the set bit, iff one bit is set
  var res = Opt.none(int)
  # NOTE(review): the source appears truncated from here - everything between
  # `0..` and `maxAttestationSlot` looks to have been stripped. That includes
  # the rest of `oneIndex`, any definitions that followed it, and the header
  # and opening statements of the phase0 `getAttestationsForBlock` whose body
  # continues below. Restore from version control; tokens are kept as found.
  for idx in 0.. maxAttestationSlot: # Around genesis..
      break

    let
      slot = Slot(maxAttestationSlot - i)
      candidateIdx = pool.candidateIdx(slot)

    if candidateIdx.isNone():
      # Passed the collection horizon - shouldn't happen because it's based on
      # ATTESTATION_LOOKBACK
      break

    for _, entry in pool.phase0Candidates[candidateIdx.get()].mpairs():
      entry.updateAggregates()

      # NOTE(review): truncated - the bound of this loop, its body and the
      # header of the following `while candidates.len > ...` loop look to have
      # been stripped between `0..` and `0 and`.
      for j in 0.. 0 and res.lenu64() < MAX_ATTESTATIONS:
    let entryCacheKey = block:
      # Find the candidate with the highest score - slot is used as a
      # tie-breaker so that more recent attestations are added first
      let
        candidate =
          # Fast path for when all remaining candidates fit
          if candidates.lenu64 < MAX_ATTESTATIONS: candidates.len - 1
          else: maxIndex(candidates)
        (_, _, entry, j) = candidates[candidate]

      candidates.del(candidate) # careful, `del` reorders candidates

      res.add(entry[].toAttestation(entry[].aggregates[j]))

      # Update cache so that the new votes are taken into account when updating
      # the score below
      attCache.add(entry[].data, entry[].aggregates[j].aggregation_bits)

      entry[].data.getAttestationCacheKey

    block:
      # Because we added some votes, it's quite possible that some candidates
      # are no longer interesting - update the scores of the existing candidates
      for it in candidates.mitems():
        # Aggregates not on the same (slot, committee) pair don't change scores
        if it.entry[].data.getAttestationCacheKey != entryCacheKey:
          continue

        it.score = attCache.score(
          it.entry[].data,
          it.entry[].aggregates[it.validation].aggregation_bits)

      candidates.keepItIf:
        # Only keep candidates that might add coverage
        it.score > 0

  let
    packingDur = Moment.now() - startPackingTick

  debug "Packed attestations for block",
    newBlockSlot, packingDur, totalCandidates, attestations = res.len()
  # Export the packing duration (in seconds) through the metrics gauge
  attestation_pool_block_attestation_packing_time.set(
    packingDur.toFloatSeconds())

  res

proc getAttestationsForBlock*(pool: var AttestationPool,
                              state: ForkedHashedBeaconState,
                              cache: var StateCache): seq[phase0.Attestation] =
  ## Fork-dispatching wrapper: for states before Electra, forwards to the
  ## fork-specific `getAttestationsForBlock`; from Electra onwards returns an
  ## empty sequence.
  withState(state):
    when consensusFork < ConsensusFork.Electra:
      pool.getAttestationsForBlock(forkyState, cache)
    else:
      default(seq[phase0.Attestation])

proc getElectraAttestationsForBlock*(
    pool: var AttestationPool, state: electra.HashedBeaconState,
    cache: var StateCache): seq[electra.Attestation] =
  ## Select Electra-format attestations from the pool for a block built on
  ## `state`. Returns an empty sequence when the state's slot is smaller than
  ## `MIN_ATTESTATION_INCLUSION_DELAY`. The packing duration is logged and
  ## recorded in `attestation_pool_block_attestation_packing_time`.
  let newBlockSlot = state.data.slot.uint64

  if newBlockSlot < MIN_ATTESTATION_INCLUSION_DELAY:
    return @[] # Too close to genesis

  let
    # Attestations produced in a particular slot are added to the block
    # at the slot where at least MIN_ATTESTATION_INCLUSION_DELAY have passed
    maxAttestationSlot = newBlockSlot - MIN_ATTESTATION_INCLUSION_DELAY
    startPackingTick = Moment.now()

  var
    candidates: seq[tuple[
      score: int, slot: Slot, entry: ptr ElectraAttestationEntry,
      validation: int]]
    attCache = AttestationCache[ElectraCommitteeValidatorsBits].init(state, cache)

  # NOTE(review): truncated - the loop bound and the start of the
  # `if i > maxAttestationSlot` guard look to have been stripped between
  # `0..` and `maxAttestationSlot`. Tokens are kept as found.
  for i in 0.. maxAttestationSlot: # Around genesis..
      break

    let
      slot = Slot(maxAttestationSlot - i)
      candidateIdx = pool.candidateIdx(slot)

    if candidateIdx.isNone():
      # Passed the collection horizon - shouldn't happen because it's based on
      # ATTESTATION_LOOKBACK
      break

    for _, entry in pool.electraCandidates[candidateIdx.get()].mpairs():
      entry.updateAggregates()

      # NOTE(review): truncated - the bound and body of this loop, the
      # declarations of `candidatesPerBlock` and `totalCandidates`, and the
      # header of the following `while` loop look to have been stripped
      # between `0..` and `0 and`.
      for j in 0.. 0 and candidatesPerBlock.lenu64() <
      MAX_ATTESTATIONS_ELECTRA * MAX_COMMITTEES_PER_SLOT:
    let entryCacheKey = block:
      let (_, _, entry, j) =
        # Fast path for when all remaining candidates fit
        if candidates.lenu64 < MAX_ATTESTATIONS_ELECTRA:
          candidates[candidates.len - 1]
        else:
          # Get the candidate with the highest score
          candidates.pop()

      #TODO: Merge candidates per block structure with the candidates one
      # and score possible on-chain attestations while collecting candidates
      # (previous loop) and re-evaluate cache key definition
      let
        key = (entry.data.beacon_block_root, entry.data.slot)
        newAtt = entry[].toElectraAttestation(entry[].aggregates[j])

      # Group the selected attestations by (beacon block root, slot) so they
      # can be consolidated per group further down
      candidatesPerBlock.withValue(key, candidate):
        candidate[].add newAtt
      do:
        candidatesPerBlock[key] = @[newAtt]

      # Update cache so that the new votes are taken into account when updating
      # the score below
      attCache.add(entry[].data, entry[].aggregates[j].aggregation_bits)

      entry[].data.getAttestationCacheKey

    block:
      # Because we added some votes, it's quite possible that some candidates
      # are no longer interesting - update the scores of the existing candidates
      for it in candidates.mitems():
        # Aggregates not on the same (slot, committee) pair don't change scores
        if it.entry[].data.getAttestationCacheKey != entryCacheKey:
          continue

        it.score = attCache.score(
          it.entry[].data,
          it.entry[].aggregates[it.validation].aggregation_bits)

      candidates.keepItIf:
        # Only keep candidates that might add coverage
        it.score > 0

      # Sort candidates by score, using slot as a tie-breaker
      candidates.sort()

  # Consolidate attestation aggregates with disjoint committee bits into single
  # attestation
  var res: seq[electra.Attestation]
  for a in candidatesPerBlock.values():

    if a.len > 1:
      let
        att = compute_on_chain_aggregate(a).valueOr:
          # Groups that cannot be combined are skipped entirely
          continue
      res.add(att)
    #no on chain candidates
    else:
      res.add(a)

    if res.lenu64 == MAX_ATTESTATIONS_ELECTRA:
      break

  let
    packingDur = Moment.now() - startPackingTick

  debug "Packed attestations for block",
    newBlockSlot, packingDur, totalCandidates, attestations = res.len()
  attestation_pool_block_attestation_packing_time.set(
    packingDur.toFloatSeconds())

  res

proc getElectraAttestationsForBlock*(
    pool: var AttestationPool, state: ForkedHashedBeaconState,
    cache: var StateCache): seq[electra.Attestation] =
  ## Fork-dispatching wrapper: for Electra and later states, forwards to the
  ## fork-specific `getElectraAttestationsForBlock`; for earlier forks returns
  ## an empty sequence.
  withState(state):
    when consensusFork >= ConsensusFork.Electra:
      pool.getElectraAttestationsForBlock(forkyState, cache)
    else:
      default(seq[electra.Attestation])

func bestValidation(aggregates: openArray[Phase0Validation]): (int, int) =
  # Look for best validation based on number of votes in the aggregate
  # Returns `(bestIndex, best)`; `aggregates` must not be empty.
  doAssert aggregates.len() > 0,
    "updateAggregates should have created at least one aggregate"
  var
    bestIndex = 0
    best = aggregates[bestIndex].aggregation_bits.countOnes()

  # NOTE(review): truncated - the loop bound, the definition of `count` and
  # the start of the comparison look to have been stripped between `1..` and
  # `best`. Tokens are kept as found.
  for i in 1.. best:
      best = count
      bestIndex = i
  (bestIndex, best)

func getAggregatedAttestation*(
    pool: var AttestationPool, slot: Slot, attestation_data_root: Eth2Digest):
    Opt[phase0.Attestation] =
  ## Look up the phase0 entry stored under `attestation_data_root` for `slot`
  ## and return an attestation built from its best aggregate (as chosen by
  ## `bestValidation`). Returns `Opt.none` when `slot` is outside the pool
  ## window or no entry exists under that root.
  let
    candidateIdx = pool.candidateIdx(slot)
  if candidateIdx.isNone:
    return Opt.none(phase0.Attestation)

  pool.phase0Candidates[candidateIdx.get].withValue(
      attestation_data_root, entry):
    entry[].updateAggregates()

    let (bestIndex, _) = bestValidation(entry[].aggregates)

    # Found the right hash, no need to look further
    return Opt.some(entry[].toAttestation(entry[].aggregates[bestIndex]))

  Opt.none(phase0.Attestation)

func getAggregatedAttestation*(
    pool: var AttestationPool, slot: Slot, index: CommitteeIndex):
    Opt[phase0.Attestation] =
  ## Select the attestation that has the most votes going for it in the given
  ## slot/index
  ## https://github.com/ethereum/consensus-specs/blob/v1.4.0/specs/phase0/validator.md#construct-aggregate
  let candidateIdx = pool.candidateIdx(slot)
  if candidateIdx.isNone:
    return Opt.none(phase0.Attestation)

  var res: Opt[phase0.Attestation]
  for _, entry in pool.phase0Candidates[candidateIdx.get].mpairs():
    doAssert entry.data.slot == slot
    # Only entries voting through the requested committee are considered
    if index != entry.data.index:
      continue

    entry.updateAggregates()

    let (bestIndex, best) = bestValidation(entry.aggregates)

    # Keep the candidate with strictly more votes than the current result
    if res.isNone() or best > res.get().aggregation_bits.countOnes():
      res = Opt.some(entry.toAttestation(entry.aggregates[bestIndex]))

  res

type BeaconHead* = object
  ## Head block together with the "safe" and finalized execution block hashes,
  ## as assembled by `getBeaconHead`.
  blck*: BlockRef
  safeExecutionBlockHash*, finalizedExecutionBlockHash*: Eth2Digest

proc getBeaconHead*(
    pool: AttestationPool, headBlock: BlockRef): BeaconHead =
  ## Build a `BeaconHead` for `headBlock`. The finalized execution hash falls
  ## back to `ZERO_HASH` when it cannot be loaded; the safe execution hash
  ## falls back to the finalized one.
  let
    finalizedExecutionBlockHash =
      pool.dag.loadExecutionBlockHash(pool.dag.finalizedHead.blck)
        .get(ZERO_HASH)

    # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.3/fork_choice/safe-block.md#get_safe_execution_payload_hash
    safeBlockRoot = pool.forkChoice.get_safe_beacon_block_root()
    safeBlock = pool.dag.getBlockRef(safeBlockRoot)
    safeExecutionBlockHash =
      if safeBlock.isErr:
        # Safe block is currently the justified block determined by fork choice.
        # If finality already advanced beyond the current justified checkpoint,
        # e.g., because we have selected a head that did not yet realize the cp,
        # the justified block may end up not having a `BlockRef` anymore.
        # Because we know that a different fork already finalized a later point,
        # let's just report the finalized execution payload hash instead.
        finalizedExecutionBlockHash
      else:
        pool.dag.loadExecutionBlockHash(safeBlock.get)
          .get(finalizedExecutionBlockHash)

  BeaconHead(
    blck: headBlock,
    safeExecutionBlockHash: safeExecutionBlockHash,
    finalizedExecutionBlockHash: finalizedExecutionBlockHash)

proc selectOptimisticHead*(
    pool: var AttestationPool, wallTime: BeaconTime): Opt[BeaconHead] =
  ## Trigger fork choice and return the new head block.
  ## Returns an error value when fork choice fails or when the selected root
  ## has no `BlockRef` in the dag; in the latter case the root is also
  ## registered as missing in the quarantine.
  let newHeadRoot = pool.forkChoice.get_head(pool.dag, wallTime)
  if newHeadRoot.isErr:
    error "Couldn't select head", err = newHeadRoot.error
    return err()

  let headBlock = pool.dag.getBlockRef(newHeadRoot.get()).valueOr:
    # This should normally not happen, but if the chain dag and fork choice
    # get out of sync, we'll need to try to download the selected head - in
    # the meantime, return nil to indicate that no new head was chosen
    warn "Fork choice selected unknown head, trying to sync",
      root = newHeadRoot.get()
    pool.quarantine[].addMissing(newHeadRoot.get())
    return err()

  ok pool.getBeaconHead(headBlock)

proc prune*(pool: var AttestationPool) =
  ## Prune the fork choice; a failure is logged and otherwise ignored.
  if (let v = pool.forkChoice.prune(); v.isErr):
    # If pruning fails, it's likely the result of a bug - this shouldn't happen
    # but we'll keep running hoping that the fork choice will recover eventually
    error "Couldn't prune fork choice, bug?", err = v.error()

proc validatorSeenAtEpoch*(pool: AttestationPool, epoch: Epoch,
                           vindex: ValidatorIndex): bool =
  ## True when the `nextAttestationEpoch` record for `vindex` has a `subnet`
  ## or `aggregate` epoch greater than `epoch`. Validators whose index lies
  ## beyond the end of `nextAttestationEpoch` are reported as not seen.
  if uint64(vindex) < lenu64(pool.nextAttestationEpoch):
    let mark = pool.nextAttestationEpoch[vindex]
    (mark.subnet > epoch) or (mark.aggregate > epoch)
  else:
    false