# beacon_chain # Copyright (c) 2018-2021 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. {.push raises: [Defect].} import # Standard libraries std/[options, tables, sequtils], # Status libraries metrics, chronicles, stew/byteutils, json_serialization/std/sets as jsonSets, # Internal ../spec/[beaconstate, eth2_merkleization, forks, helpers, validator], ../spec/datatypes/[phase0, altair], "."/[spec_cache, blockchain_dag, block_quarantine], ".."/[beacon_clock, beacon_node_types], ../fork_choice/fork_choice export beacon_node_types logScope: topics = "attpool" declareGauge attestation_pool_block_attestation_packing_time, "Time it took to create list of attestations for block" proc init*(T: type AttestationPool, dag: ChainDAGRef, quarantine: QuarantineRef, onAttestation: OnAttestationCallback = nil): T = ## Initialize an AttestationPool from the dag `headState` ## The `finalized_root` works around the finalized_checkpoint of the genesis block ## holding a zero_root. let finalizedEpochRef = dag.getFinalizedEpochRef() var forkChoice = ForkChoice.init( finalizedEpochRef, dag.finalizedHead.blck) # Feed fork choice with unfinalized history - during startup, block pool only # keeps track of a single history so we just need to follow it doAssert dag.heads.len == 1, "Init only supports a single history" var blocks: seq[BlockRef] var cur = dag.head # When the chain is finalizing, the votes between the head block and the # finalized checkpoint should be enough for a stable fork choice - when the # chain is not finalizing, we want to seed it with as many votes as possible # since the whole history of each branch might be significant. It is however # a game of diminishing returns, and we have to weigh it against the time # it takes to replay that many blocks during startup and thus miss _new_ # votes. const ForkChoiceHorizon = 256 while cur != dag.finalizedHead.blck: blocks.add cur cur = cur.parent info "Initializing fork choice", unfinalized_blocks = blocks.len var epochRef = finalizedEpochRef for i in 0..= pool.startingSlot and slot < (pool.startingSlot + pool.candidates.lenu64): some(int(slot mod pool.candidates.lenu64)) else: none(int) proc updateCurrent(pool: var AttestationPool, wallSlot: Slot) = if wallSlot + 1 < pool.candidates.lenu64: return # Genesis let newStartingSlot = wallSlot + 1 - pool.candidates.lenu64 if newStartingSlot < pool.startingSlot: error "Current slot older than attestation pool view, clock reset?", startingSlot = pool.startingSlot, newStartingSlot, wallSlot return # As time passes we'll clear out any old attestations as they are no longer # viable to be included in blocks if newStartingSlot - pool.startingSlot >= pool.candidates.lenu64(): # In case many slots passed since the last update, avoid iterating over # the same indices over and over pool.candidates = default(type(pool.candidates)) else: for i in pool.startingSlot..newStartingSlot: pool.candidates[i.uint64 mod pool.candidates.lenu64] = AttestationTable() pool.startingSlot = newStartingSlot proc oneIndex(bits: CommitteeValidatorsBits): Option[int] = # Find the index of the set bit, iff one bit is set var res = none(int) for idx in 0.. maxAttestationSlot: # Around genesis.. break let slot = Slot(maxAttestationSlot - i) candidateIdx = pool.candidateIdx(slot) if candidateIdx.isNone(): # Passed the collection horizon - shouldn't happen because it's based on # ATTESTATION_LOOKBACK break for _, entry in pool.candidates[candidateIdx.get()].mpairs(): entry.updateAggregates() for j in 0.. 0 and res.lenu64() < MAX_ATTESTATIONS: block: # Find the candidate with the highest score - slot is used as a # tie-breaker so that more recent attestations are added first let candidate = # Fast path for when all remaining candidates fit if candidates.lenu64 < MAX_ATTESTATIONS: candidates.len - 1 else: maxIndex(candidates) # TODO slot not used; replace with _ when # https://github.com/nim-lang/Nim/issues/15972 and # https://github.com/nim-lang/Nim/issues/16217 are # fixed in Status's Nim. (_, slot, entry, j) = candidates[candidate] candidates.del(candidate) # careful, `del` reorders candidates if entry[].data.target.epoch == prevEpoch: if prevEpochSpace < 1: continue # No need to rescore since we didn't add the attestation prevEpochSpace -= 1 res.add(entry[].toAttestation(entry[].aggregates[j])) # Update cache so that the new votes are taken into account when updating # the score below attCache.add(entry[].data, entry[].aggregates[j].aggregation_bits) block: # Because we added some votes, it's quite possible that some candidates # are no longer interesting - update the scores of the existing candidates for it in candidates.mitems(): it.score = attCache.score( it.entry[].data, it.entry[].aggregates[it.validation].aggregation_bits) candidates.keepItIf: # Only keep candidates that might add coverage it.score > 0 let packingDur = Moment.now() - startPackingTick debug "Packed attestations for block", newBlockSlot, packingDur, totalCandidates, attestations = res.len() attestation_pool_block_attestation_packing_time.set( packingDur.toFloatSeconds()) res func bestValidation(aggregates: openArray[Validation]): (int, int) = # Look for best validation based on number of votes in the aggregate doAssert aggregates.len() > 0, "updateAggregates should have created at least one aggregate" var bestIndex = 0 best = aggregates[bestIndex].aggregation_bits.countOnes() for i in 1.. best: best = count bestIndex = i (bestIndex, best) func getAggregatedAttestation*(pool: var AttestationPool, slot: Slot, attestation_data_root: Eth2Digest): Option[Attestation] = let candidateIdx = pool.candidateIdx(slot) if candidateIdx.isNone: return none(Attestation) pool.candidates[candidateIdx.get].withValue(attestation_data_root, entry): entry[].updateAggregates() let (bestIndex, _) = bestValidation(entry[].aggregates) # Found the right hash, no need to look further return some(entry[].toAttestation(entry[].aggregates[bestIndex])) none(Attestation) proc getAggregatedAttestation*(pool: var AttestationPool, slot: Slot, index: CommitteeIndex): Option[Attestation] = ## Select the attestation that has the most votes going for it in the given ## slot/index ## https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#construct-aggregate let candidateIdx = pool.candidateIdx(slot) if candidateIdx.isNone: return none(Attestation) var res: Option[Attestation] for _, entry in pool.candidates[candidateIdx.get].mpairs(): doAssert entry.data.slot == slot if index.uint64 != entry.data.index: continue entry.updateAggregates() let (bestIndex, best) = bestValidation(entry.aggregates) if res.isNone() or best > res.get().aggregation_bits.countOnes(): res = some(entry.toAttestation(entry.aggregates[bestIndex])) res proc selectHead*(pool: var AttestationPool, wallSlot: Slot): BlockRef = ## Trigger fork choice and returns the new head block. ## Can return `nil` let newHead = pool.forkChoice.get_head(pool.dag, wallSlot) if newHead.isErr: error "Couldn't select head", err = newHead.error nil else: let ret = pool.dag.getRef(newHead.get()) if ret.isNil: # This should normally not happen, but if the chain dag and fork choice # get out of sync, we'll need to try to download the selected head - in # the meantime, return nil to indicate that no new head was chosen warn "Fork choice selected unknown head, trying to sync", root = newHead.get() pool.quarantine.addMissing(newHead.get()) ret proc prune*(pool: var AttestationPool) = if (let v = pool.forkChoice.prune(); v.isErr): # If pruning fails, it's likely the result of a bug - this shouldn't happen # but we'll keep running hoping that the fork chocie will recover eventually error "Couldn't prune fork choice, bug?", err = v.error()