# beacon_chain # Copyright (c) 2018-2023 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. {.push raises: [].} import # Standard libraries std/[tables, sequtils], # Status libraries metrics, chronicles, stew/[byteutils, results], # Internal ../spec/[ beaconstate, eth2_merkleization, forks, helpers, state_transition_epoch, validator], ../spec/datatypes/[phase0, altair, bellatrix], "."/[spec_cache, blockchain_dag, block_quarantine], ../fork_choice/fork_choice, ../beacon_clock from ../spec/datatypes/capella import HashedBeaconState, shortLog export tables, results, phase0, altair, bellatrix, blockchain_dag, fork_choice const ATTESTATION_LOOKBACK* = min(24'u64, SLOTS_PER_EPOCH) + MIN_ATTESTATION_INCLUSION_DELAY ## The number of slots we'll keep track of in terms of "free" attestations ## that potentially could be added to a newly created block type OnAttestationCallback* = proc(data: Attestation) {.gcsafe, raises: [Defect].} Validation* = object ## Validations collect a set of signatures for a distict attestation - in ## eth2, a single bit is used to keep track of which signatures have been ## added to the aggregate meaning that only non-overlapping aggregates may ## be further combined. aggregation_bits*: CommitteeValidatorsBits aggregate_signature*: AggregateSignature AttestationEntry* = object ## Each entry holds the known signatures for a particular, distinct vote data*: AttestationData committee_len*: int singles*: Table[int, CookedSig] ## \ ## On the attestation subnets, only attestations with a single vote are ## allowed - these can be collected separately to top up aggregates with - ## here we collect them by mapping index in committee to a vote aggregates*: seq[Validation] AttestationTable* = Table[Eth2Digest, AttestationEntry] ## Depending on the world view of the various validators, they may have ## voted on different states - this map keeps track of each vote keyed by ## hash_tree_root(AttestationData) AttestationPool* = object ## The attestation pool keeps track of all attestations that potentially ## could be added to a block during block production. ## These attestations also contribute to the fork choice, which combines ## "free" attestations with those found in past blocks - these votes ## are tracked separately in the fork choice. candidates*: array[ATTESTATION_LOOKBACK, AttestationTable] ## \ ## We keep one item per slot such that indexing matches slot number ## together with startingSlot startingSlot*: Slot ## \ ## Generally, we keep attestations only until a slot has been finalized - ## after that, they may no longer affect fork choice. dag*: ChainDAGRef quarantine*: ref Quarantine forkChoice*: ForkChoice nextAttestationEpoch*: seq[tuple[subnet: Epoch, aggregate: Epoch]] ## \ ## sequence based on validator indices onAttestationAdded*: OnAttestationCallback logScope: topics = "attpool" declareGauge attestation_pool_block_attestation_packing_time, "Time it took to create list of attestations for block" proc init*(T: type AttestationPool, dag: ChainDAGRef, quarantine: ref Quarantine, onAttestation: OnAttestationCallback = nil): T = ## Initialize an AttestationPool from the dag `headState` ## The `finalized_root` works around the finalized_checkpoint of the genesis block ## holding a zero_root. let finalizedEpochRef = dag.getFinalizedEpochRef() var forkChoice = ForkChoice.init( finalizedEpochRef, dag.finalizedHead.blck) # Feed fork choice with unfinalized history - during startup, block pool only # keeps track of a single history so we just need to follow it doAssert dag.heads.len == 1, "Init only supports a single history" var blocks: seq[BlockRef] var cur = dag.head # When the chain is finalizing, the votes between the head block and the # finalized checkpoint should be enough for a stable fork choice - when the # chain is not finalizing, we want to seed it with as many votes as possible # since the whole history of each branch might be significant. It is however # a game of diminishing returns, and we have to weigh it against the time # it takes to replay that many blocks during startup and thus miss _new_ # votes. const ForkChoiceHorizon = 256 while cur != dag.finalizedHead.blck: blocks.add cur cur = cur.parent info "Initializing fork choice", unfinalized_blocks = blocks.len var epochRef = finalizedEpochRef for i in 0..= ConsensusFork.Altair: forkyState.data.compute_unrealized_finality() else: var cache: StateCache forkyState.data.compute_unrealized_finality(cache) else: default(FinalityCheckpoints) withBlck(blck): forkChoice.process_block( dag, epochRef, blckRef, unrealized, blck.message, blckRef.slot.start_beacon_time) doAssert status.isOk(), "Error in preloading the fork choice: " & $status.error info "Fork choice initialized", justified = shortLog(getStateField( dag.headState, current_justified_checkpoint)), finalized = shortLog(getStateField(dag.headState, finalized_checkpoint)) T( dag: dag, quarantine: quarantine, forkChoice: forkChoice, onAttestationAdded: onAttestation ) proc addForkChoiceVotes( pool: var AttestationPool, slot: Slot, attesting_indices: openArray[ValidatorIndex], block_root: Eth2Digest, wallTime: BeaconTime) = # Add attestation votes to fork choice if (let v = pool.forkChoice.on_attestation( pool.dag, slot, block_root, attesting_indices, wallTime); v.isErr): # This indicates that the fork choice and the chain dag are out of sync - # this is most likely the result of a bug, but we'll try to keep going - # hopefully the fork choice will heal itself over time. error "Couldn't add attestation to fork choice, bug?", err = v.error() func candidateIdx(pool: AttestationPool, slot: Slot): Opt[int] = if slot >= pool.startingSlot and slot < (pool.startingSlot + pool.candidates.lenu64): Opt.some(int(slot mod pool.candidates.lenu64)) else: Opt.none(int) proc updateCurrent(pool: var AttestationPool, wallSlot: Slot) = if wallSlot + 1 < pool.candidates.lenu64: return # Genesis let newStartingSlot = wallSlot + 1 - pool.candidates.lenu64 if newStartingSlot < pool.startingSlot: error "Current slot older than attestation pool view, clock reset?", startingSlot = pool.startingSlot, newStartingSlot, wallSlot return # As time passes we'll clear out any old attestations as they are no longer # viable to be included in blocks if newStartingSlot - pool.startingSlot >= pool.candidates.lenu64(): # In case many slots passed since the last update, avoid iterating over # the same indices over and over pool.candidates = default(type(pool.candidates)) else: for i in pool.startingSlot..newStartingSlot: pool.candidates[i.uint64 mod pool.candidates.lenu64] = AttestationTable() pool.startingSlot = newStartingSlot func oneIndex(bits: CommitteeValidatorsBits): Opt[int] = # Find the index of the set bit, iff one bit is set var res = Opt.none(int) for idx in 0.. maxAttestationSlot: # Around genesis.. break let slot = Slot(maxAttestationSlot - i) candidateIdx = pool.candidateIdx(slot) if candidateIdx.isNone(): # Passed the collection horizon - shouldn't happen because it's based on # ATTESTATION_LOOKBACK break for _, entry in pool.candidates[candidateIdx.get()].mpairs(): entry.updateAggregates() for j in 0.. 0 and res.lenu64() < MAX_ATTESTATIONS: block: # Find the candidate with the highest score - slot is used as a # tie-breaker so that more recent attestations are added first let candidate = # Fast path for when all remaining candidates fit if candidates.lenu64 < MAX_ATTESTATIONS: candidates.len - 1 else: maxIndex(candidates) (_, _, entry, j) = candidates[candidate] candidates.del(candidate) # careful, `del` reorders candidates if entry[].data.target.epoch == prevEpoch: if prevEpochSpace < 1: continue # No need to rescore since we didn't add the attestation prevEpochSpace -= 1 res.add(entry[].toAttestation(entry[].aggregates[j])) # Update cache so that the new votes are taken into account when updating # the score below attCache.add(entry[].data, entry[].aggregates[j].aggregation_bits) block: # Because we added some votes, it's quite possible that some candidates # are no longer interesting - update the scores of the existing candidates for it in candidates.mitems(): it.score = attCache.score( it.entry[].data, it.entry[].aggregates[it.validation].aggregation_bits) candidates.keepItIf: # Only keep candidates that might add coverage it.score > 0 let packingDur = Moment.now() - startPackingTick debug "Packed attestations for block", newBlockSlot, packingDur, totalCandidates, attestations = res.len() attestation_pool_block_attestation_packing_time.set( packingDur.toFloatSeconds()) res proc getAttestationsForBlock*(pool: var AttestationPool, state: ForkedHashedBeaconState, cache: var StateCache): seq[Attestation] = withState(state): pool.getAttestationsForBlock(forkyState, cache) func bestValidation(aggregates: openArray[Validation]): (int, int) = # Look for best validation based on number of votes in the aggregate doAssert aggregates.len() > 0, "updateAggregates should have created at least one aggregate" var bestIndex = 0 best = aggregates[bestIndex].aggregation_bits.countOnes() for i in 1.. best: best = count bestIndex = i (bestIndex, best) func getAggregatedAttestation*(pool: var AttestationPool, slot: Slot, attestation_data_root: Eth2Digest): Opt[Attestation] = let candidateIdx = pool.candidateIdx(slot) if candidateIdx.isNone: return Opt.none(Attestation) pool.candidates[candidateIdx.get].withValue(attestation_data_root, entry): entry[].updateAggregates() let (bestIndex, _) = bestValidation(entry[].aggregates) # Found the right hash, no need to look further return Opt.some(entry[].toAttestation(entry[].aggregates[bestIndex])) Opt.none(Attestation) func getAggregatedAttestation*(pool: var AttestationPool, slot: Slot, index: CommitteeIndex): Opt[Attestation] = ## Select the attestation that has the most votes going for it in the given ## slot/index ## https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/phase0/validator.md#construct-aggregate let candidateIdx = pool.candidateIdx(slot) if candidateIdx.isNone: return Opt.none(Attestation) var res: Opt[Attestation] for _, entry in pool.candidates[candidateIdx.get].mpairs(): doAssert entry.data.slot == slot if index != entry.data.index: continue entry.updateAggregates() let (bestIndex, best) = bestValidation(entry.aggregates) if res.isNone() or best > res.get().aggregation_bits.countOnes(): res = Opt.some(entry.toAttestation(entry.aggregates[bestIndex])) res type BeaconHead* = object blck*: BlockRef safeExecutionPayloadHash*, finalizedExecutionPayloadHash*: Eth2Digest proc getBeaconHead*( pool: AttestationPool, headBlock: BlockRef): BeaconHead = let finalizedExecutionPayloadHash = pool.dag.loadExecutionBlockHash(pool.dag.finalizedHead.blck) # https://github.com/ethereum/consensus-specs/blob/v1.4.0-alpha.0/fork_choice/safe-block.md#get_safe_execution_payload_hash safeBlockRoot = pool.forkChoice.get_safe_beacon_block_root() safeBlock = pool.dag.getBlockRef(safeBlockRoot) safeExecutionPayloadHash = if safeBlock.isErr: # Safe block is currently the justified block determined by fork choice. # If finality already advanced beyond the current justified checkpoint, # e.g., because we have selected a head that did not yet realize the cp, # the justified block may end up not having a `BlockRef` anymore. # Because we know that a different fork already finalized a later point, # let's just report the finalized execution payload hash instead. finalizedExecutionPayloadHash else: pool.dag.loadExecutionBlockHash(safeBlock.get) BeaconHead( blck: headBlock, safeExecutionPayloadHash: safeExecutionPayloadHash, finalizedExecutionPayloadHash: finalizedExecutionPayloadHash) proc selectOptimisticHead*( pool: var AttestationPool, wallTime: BeaconTime): Opt[BeaconHead] = ## Trigger fork choice and returns the new head block. # TODO rename this to get_optimistic_head let newHeadRoot = pool.forkChoice.get_head(pool.dag, wallTime) if newHeadRoot.isErr: error "Couldn't select head", err = newHeadRoot.error return err() let headBlock = pool.dag.getBlockRef(newHeadRoot.get()).valueOr: # This should normally not happen, but if the chain dag and fork choice # get out of sync, we'll need to try to download the selected head - in # the meantime, return nil to indicate that no new head was chosen warn "Fork choice selected unknown head, trying to sync", root = newHeadRoot.get() pool.quarantine[].addMissing(newHeadRoot.get()) return err() ok pool.getBeaconHead(headBlock) proc prune*(pool: var AttestationPool) = if (let v = pool.forkChoice.prune(); v.isErr): # If pruning fails, it's likely the result of a bug - this shouldn't happen # but we'll keep running hoping that the fork chocie will recover eventually error "Couldn't prune fork choice, bug?", err = v.error() proc validatorSeenAtEpoch*(pool: AttestationPool, epoch: Epoch, vindex: ValidatorIndex): bool = if uint64(vindex) < lenu64(pool.nextAttestationEpoch): let mark = pool.nextAttestationEpoch[vindex] (mark.subnet > epoch) or (mark.aggregate > epoch) else: false