492 lines
20 KiB
Nim
492 lines
20 KiB
Nim
# beacon_chain
|
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
{.push raises: [Defect].}
|
|
|
|
import
|
|
# Standard libraries
|
|
std/[options, tables, sequtils],
|
|
# Status libraries
|
|
chronicles, stew/[byteutils], json_serialization/std/sets as jsonSets,
|
|
# Internal
|
|
../spec/[beaconstate, datatypes, crypto, digest],
|
|
../ssz/merkleization,
|
|
"."/[spec_cache, blockchain_dag, block_quarantine],
|
|
../beacon_node_types,
|
|
../fork_choice/fork_choice
|
|
|
|
export beacon_node_types
|
|
|
|
logScope: topics = "attpool"
|
|
|
|
proc init*(T: type AttestationPool, chainDag: ChainDAGRef, quarantine: QuarantineRef): T =
|
|
## Initialize an AttestationPool from the chainDag `headState`
|
|
## The `finalized_root` works around the finalized_checkpoint of the genesis block
|
|
## holding a zero_root.
|
|
let finalizedEpochRef = chainDag.getFinalizedEpochRef()
|
|
|
|
var forkChoice = ForkChoice.init(
|
|
finalizedEpochRef,
|
|
chainDag.finalizedHead.blck)
|
|
|
|
# Feed fork choice with unfinalized history - during startup, block pool only
|
|
# keeps track of a single history so we just need to follow it
|
|
doAssert chainDag.heads.len == 1, "Init only supports a single history"
|
|
|
|
var blocks: seq[BlockRef]
|
|
var cur = chainDag.head
|
|
|
|
# When the chain is finalizing, the votes between the head block and the
|
|
# finalized checkpoint should be enough for a stable fork choice - when the
|
|
# chain is not finalizing, we want to seed it with as many votes as possible
|
|
# since the whole history of each branch might be significant. It is however
|
|
# a game of diminishing returns, and we have to weigh it against the time
|
|
# it takes to replay that many blocks during startup and thus miss _new_
|
|
# votes.
|
|
const ForkChoiceHorizon = 256
|
|
while cur != chainDag.finalizedHead.blck:
|
|
blocks.add cur
|
|
cur = cur.parent
|
|
|
|
info "Initializing fork choice", unfinalized_blocks = blocks.len
|
|
|
|
var epochRef = finalizedEpochRef
|
|
for i in 0..<blocks.len:
|
|
let
|
|
blck = blocks[blocks.len - i - 1]
|
|
status =
|
|
if i < (blocks.len - ForkChoiceHorizon) and (i mod 1024 != 0):
|
|
# Fork choice needs to know about the full block tree up to the
|
|
# finalization point, but doesn't really need to have overly accurate
|
|
# justification and finalization points until we get close to head -
|
|
# nonetheless, we'll make sure to pass a fresh finalization point now
|
|
# and then to make sure the fork choice data structure doesn't grow
|
|
# too big - getting an EpochRef can be expensive.
|
|
forkChoice.backend.process_block(
|
|
blck.root, blck.parent.root,
|
|
epochRef.current_justified_checkpoint.epoch,
|
|
epochRef.finalized_checkpoint.epoch)
|
|
else:
|
|
epochRef = chainDag.getEpochRef(blck, blck.slot.epoch)
|
|
forkChoice.process_block(
|
|
chainDag, epochRef, blck, chainDag.get(blck).data.message, blck.slot)
|
|
|
|
doAssert status.isOk(), "Error in preloading the fork choice: " & $status.error
|
|
|
|
info "Fork choice initialized",
|
|
justified_epoch = chainDag.headState.data.data.current_justified_checkpoint.epoch,
|
|
finalized_epoch = chainDag.headState.data.data.finalized_checkpoint.epoch,
|
|
finalized_root = shortlog(chainDag.finalizedHead.blck.root)
|
|
|
|
T(
|
|
chainDag: chainDag,
|
|
quarantine: quarantine,
|
|
forkChoice: forkChoice
|
|
)
|
|
|
|
proc addForkChoiceVotes(
|
|
pool: var AttestationPool, slot: Slot, participants: seq[ValidatorIndex],
|
|
block_root: Eth2Digest, wallSlot: Slot) =
|
|
# Add attestation votes to fork choice
|
|
if (let v = pool.forkChoice.on_attestation(
|
|
pool.chainDag, slot, block_root, participants, wallSlot);
|
|
v.isErr):
|
|
# This indicates that the fork choice and the chain dag are out of sync -
|
|
# this is most likely the result of a bug, but we'll try to keep going -
|
|
# hopefully the fork choice will heal itself over time.
|
|
error "Couldn't add attestation to fork choice, bug?", err = v.error()
|
|
|
|
func candidateIdx(pool: AttestationPool, slot: Slot): Option[uint64] =
|
|
if slot >= pool.startingSlot and
|
|
slot < (pool.startingSlot + pool.candidates.lenu64):
|
|
some(slot mod pool.candidates.lenu64)
|
|
else:
|
|
none(uint64)
|
|
|
|
proc updateCurrent(pool: var AttestationPool, wallSlot: Slot) =
|
|
if wallSlot + 1 < pool.candidates.lenu64:
|
|
return
|
|
|
|
if pool.startingSlot + pool.candidates.lenu64 - 1 > wallSlot:
|
|
error "Current slot older than attestation pool view, clock reset?",
|
|
poolSlot = pool.startingSlot, wallSlot
|
|
return
|
|
|
|
# As time passes we'll clear out any old attestations as they are no longer
|
|
# viable to be included in blocks
|
|
|
|
let newWallSlot = wallSlot + 1 - pool.candidates.lenu64
|
|
for i in pool.startingSlot..newWallSlot:
|
|
pool.candidates[i.uint64 mod pool.candidates.lenu64] = AttestationsSeen()
|
|
|
|
pool.startingSlot = newWallSlot
|
|
|
|
# now also clear old aggregated attestations
|
|
var keysToRemove: seq[Slot] = @[]
|
|
for k, v in pool.attestationAggregates.pairs:
|
|
if k < pool.startingSlot:
|
|
keysToRemove.add k
|
|
for k in keysToRemove:
|
|
pool.attestationAggregates.del k
|
|
|
|
func addToAggregates(pool: var AttestationPool, attestation: Attestation) =
|
|
# do a lookup for the current slot and get it's associated htrs/attestations
|
|
var aggregated_attestation = pool.attestationAggregates.mgetOrPut(
|
|
attestation.data.slot, Table[Eth2Digest, Attestation]()).
|
|
# do a lookup for the same attestation data htr and get the attestation
|
|
mgetOrPut(attestation.data.hash_tree_root, attestation)
|
|
# if the aggregation bits differ (we didn't just insert it into the table)
|
|
# and only if there is no overlap of the signatures ==> aggregate!
|
|
if not aggregated_attestation.aggregation_bits.overlaps(attestation.aggregation_bits):
|
|
var agg {.noInit.}: AggregateSignature
|
|
agg.init(aggregated_attestation.signature)
|
|
aggregated_attestation.aggregation_bits.combine(attestation.aggregation_bits)
|
|
agg.aggregate(attestation.signature)
|
|
aggregated_attestation.signature = agg.finish()
|
|
|
|
proc addAttestation*(pool: var AttestationPool,
|
|
attestation: Attestation,
|
|
participants: seq[ValidatorIndex],
|
|
wallSlot: Slot) =
|
|
## Add an attestation to the pool, assuming it's been validated already.
|
|
## Attestations may be either agggregated or not - we're pursuing an eager
|
|
## strategy where we'll drop validations we already knew about and combine
|
|
## the new attestation with an existing one if possible.
|
|
##
|
|
## This strategy is not optimal in the sense that it would be possible to find
|
|
## a better packing of attestations by delaying the aggregation, but because
|
|
## it's possible to include more than one aggregate in a block we can be
|
|
## somewhat lazy instead of looking for a perfect packing.
|
|
logScope:
|
|
attestation = shortLog(attestation)
|
|
|
|
updateCurrent(pool, wallSlot)
|
|
|
|
let candidateIdx = pool.candidateIdx(attestation.data.slot)
|
|
if candidateIdx.isNone:
|
|
debug "Skipping old attestation for block production",
|
|
startingSlot = pool.startingSlot
|
|
return
|
|
|
|
pool.addToAggregates(attestation)
|
|
|
|
let
|
|
attestationsSeen = addr pool.candidates[candidateIdx.get]
|
|
validation = Validation(
|
|
aggregation_bits: attestation.aggregation_bits,
|
|
aggregate_signature: attestation.signature)
|
|
|
|
var found = false
|
|
for a in attestationsSeen.attestations.mitems():
|
|
if a.data == attestation.data:
|
|
for v in a.validations:
|
|
if validation.aggregation_bits.isSubsetOf(v.aggregation_bits):
|
|
# The validations in the new attestation are a subset of one of the
|
|
# attestations that we already have on file - no need to add this
|
|
# attestation to the database
|
|
trace "Ignoring subset attestation", newParticipants = participants
|
|
found = true
|
|
break
|
|
|
|
if not found:
|
|
# Attestations in the pool that are a subset of the new attestation
|
|
# can now be removed per same logic as above
|
|
|
|
trace "Removing subset attestations", newParticipants = participants
|
|
|
|
a.validations.keepItIf(
|
|
not it.aggregation_bits.isSubsetOf(validation.aggregation_bits))
|
|
|
|
a.validations.add(validation)
|
|
pool.addForkChoiceVotes(
|
|
attestation.data.slot, participants, attestation.data.beacon_block_root,
|
|
wallSlot)
|
|
|
|
debug "Attestation resolved",
|
|
attestation = shortLog(attestation),
|
|
validations = a.validations.len()
|
|
|
|
found = true
|
|
|
|
break
|
|
|
|
if not found:
|
|
attestationsSeen.attestations.add(AttestationEntry(
|
|
data: attestation.data,
|
|
validations: @[validation],
|
|
aggregation_bits: attestation.aggregation_bits
|
|
))
|
|
pool.addForkChoiceVotes(
|
|
attestation.data.slot, participants, attestation.data.beacon_block_root,
|
|
wallSlot)
|
|
|
|
debug "Attestation resolved",
|
|
attestation = shortLog(attestation),
|
|
validations = 1
|
|
|
|
proc addForkChoice*(pool: var AttestationPool,
|
|
epochRef: EpochRef,
|
|
blckRef: BlockRef,
|
|
blck: TrustedBeaconBlock,
|
|
wallSlot: Slot) =
|
|
## Add a verified block to the fork choice context
|
|
let state = pool.forkChoice.process_block(
|
|
pool.chainDag, epochRef, blckRef, blck, wallSlot)
|
|
|
|
if state.isErr:
|
|
# This indicates that the fork choice and the chain dag are out of sync -
|
|
# this is most likely the result of a bug, but we'll try to keep going -
|
|
# hopefully the fork choice will heal itself over time.
|
|
error "Couldn't add block to fork choice, bug?",
|
|
blck = shortLog(blck), err = state.error
|
|
|
|
proc getAttestationsForSlot(pool: AttestationPool, newBlockSlot: Slot):
|
|
Option[AttestationsSeen] =
|
|
if newBlockSlot < (GENESIS_SLOT + MIN_ATTESTATION_INCLUSION_DELAY):
|
|
debug "Too early for attestations", newBlockSlot = shortLog(newBlockSlot)
|
|
return none(AttestationsSeen)
|
|
|
|
let
|
|
attestationSlot = newBlockSlot - MIN_ATTESTATION_INCLUSION_DELAY
|
|
candidateIdx = pool.candidateIdx(attestationSlot)
|
|
|
|
if candidateIdx.isNone:
|
|
trace "No attestations matching the slot range",
|
|
attestationSlot = shortLog(attestationSlot),
|
|
startingSlot = shortLog(pool.startingSlot)
|
|
return none(AttestationsSeen)
|
|
|
|
some(pool.candidates[candidateIdx.get()])
|
|
|
|
iterator attestations*(pool: AttestationPool, slot: Option[Slot],
|
|
index: Option[CommitteeIndex]): Attestation =
|
|
for seenAttestations in pool.candidates.items():
|
|
for entry in seenAttestations.attestations.items():
|
|
let slotInclude =
|
|
if slot.isSome():
|
|
entry.data.slot == slot.get()
|
|
else:
|
|
true
|
|
let committeeInclude =
|
|
if index.isSome():
|
|
CommitteeIndex(entry.data.index) == index.get()
|
|
else:
|
|
true
|
|
if slotInclude or committeeInclude:
|
|
for validation in entry.validations.items():
|
|
yield Attestation(
|
|
aggregation_bits: validation.aggregation_bits,
|
|
data: entry.data,
|
|
signature: validation.aggregate_signature
|
|
)
|
|
|
|
func getAttestationDataKey(ad: AttestationData): AttestationDataKey =
|
|
# This determines the rest of the AttestationData
|
|
(ad.slot, ad.index, ad.source.epoch, ad.target.epoch)
|
|
|
|
func incorporateCacheAttestation(
|
|
pool: var AttestationPool, attestation: PendingAttestation) =
|
|
let key = attestation.data.getAttestationDataKey
|
|
try:
|
|
var validatorBits = pool.attestedValidators[key]
|
|
validatorBits.combine(attestation.aggregation_bits)
|
|
pool.attestedValidators[key] = validatorBits
|
|
except KeyError:
|
|
pool.attestedValidators[key] = attestation.aggregation_bits
|
|
|
|
func populateAttestationCache(pool: var AttestationPool, state: BeaconState) =
|
|
pool.attestedValidators.clear()
|
|
|
|
for pendingAttestation in state.previous_epoch_attestations:
|
|
pool.incorporateCacheAttestation(pendingAttestation)
|
|
|
|
for pendingAttestation in state.current_epoch_attestations:
|
|
pool.incorporateCacheAttestation(pendingAttestation)
|
|
|
|
func updateAttestationsCache(pool: var AttestationPool,
|
|
state: BeaconState) =
|
|
# There have likely been additional attestations integrated into BeaconState
|
|
# since last block production, an epoch change, or even a tree restructuring
|
|
# so that there's nothing in common in the BeaconState altogether, since the
|
|
# last time requested.
|
|
if (
|
|
(pool.lastPreviousEpochAttestationsLen == 0 or
|
|
(pool.lastPreviousEpochAttestationsLen <= state.previous_epoch_attestations.len and
|
|
pool.lastPreviousEpochAttestation ==
|
|
state.previous_epoch_attestations[pool.lastPreviousEpochAttestationsLen - 1])) and
|
|
(pool.lastCurrentEpochAttestationsLen == 0 or
|
|
(pool.lastCurrentEpochAttestationsLen <= state.current_epoch_attestations.len and
|
|
pool.lastCurrentEpochAttestation ==
|
|
state.current_epoch_attestations[pool.lastCurrentEpochAttestationsLen - 1]))
|
|
):
|
|
# There are multiple validators attached to this node proposing in the
|
|
# same epoch. As such, incorporate that new attestation. Both previous
|
|
# and current attestations lists might have been appended to.
|
|
for i in pool.lastPreviousEpochAttestationsLen ..<
|
|
state.previous_epoch_attestations.len:
|
|
pool.incorporateCacheAttestation(state.previous_epoch_attestations[i])
|
|
for i in pool.lastCurrentEpochAttestationsLen ..<
|
|
state.current_epoch_attestations.len:
|
|
pool.incorporateCacheAttestation(state.current_epoch_attestations[i])
|
|
else:
|
|
# Tree restructuring or other cache flushing event. This must trigger
|
|
# sometimes to clear old attestations.
|
|
pool.populateAttestationCache(state)
|
|
|
|
pool.lastPreviousEpochAttestationsLen = state.previous_epoch_attestations.len
|
|
pool.lastCurrentEpochAttestationsLen = state.current_epoch_attestations.len
|
|
if pool.lastPreviousEpochAttestationsLen > 0:
|
|
pool.lastPreviousEpochAttestation =
|
|
state.previous_epoch_attestations[pool.lastPreviousEpochAttestationsLen - 1]
|
|
if pool.lastCurrentEpochAttestationsLen > 0:
|
|
pool.lastCurrentEpochAttestation =
|
|
state.current_epoch_attestations[pool.lastCurrentEpochAttestationsLen - 1]
|
|
|
|
proc getAttestationsForBlock*(pool: var AttestationPool,
|
|
state: BeaconState,
|
|
cache: var StateCache): seq[Attestation] =
|
|
## Retrieve attestations that may be added to a new block at the slot of the
|
|
## given state
|
|
let newBlockSlot = state.slot.uint64
|
|
var attestations: seq[AttestationEntry]
|
|
|
|
pool.updateAttestationsCache(state)
|
|
|
|
# Consider attestations from the current slot and ranging back up to
|
|
# ATTESTATION_LOOKBACK slots, excluding the special genesis slot. As
|
|
# unsigned subtraction (mostly avoided in this codebase, partly as a
|
|
# consequence) will otherwise wrap through zero, clamp value which's
|
|
# subtracted so that slots through ATTESTATION_LOOKBACK don't do so.
|
|
for i in max(
|
|
1'u64, newBlockSlot - min(newBlockSlot, ATTESTATION_LOOKBACK)) ..
|
|
newBlockSlot:
|
|
let maybeSlotData = getAttestationsForSlot(pool, i.Slot)
|
|
if maybeSlotData.isSome:
|
|
insert(attestations, maybeSlotData.get.attestations)
|
|
|
|
if attestations.len == 0:
|
|
return
|
|
|
|
for a in attestations:
|
|
var
|
|
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#construct-attestation
|
|
attestation = Attestation(
|
|
aggregation_bits: a.validations[0].aggregation_bits,
|
|
data: a.data,
|
|
signature: a.validations[0].aggregate_signature
|
|
)
|
|
|
|
agg {.noInit.}: AggregateSignature
|
|
agg.init(a.validations[0].aggregate_signature)
|
|
|
|
# Signature verification here is more of a sanity check - it could
|
|
# be optimized away, though for now it's easier to reuse the logic from
|
|
# the state transition function to ensure that the new block will not
|
|
# fail application.
|
|
if (let v = check_attestation(state, attestation, {}, cache); v.isErr):
|
|
warn "Attestation no longer validates...",
|
|
attestation = shortLog(attestation),
|
|
err = v.error
|
|
|
|
continue
|
|
|
|
for i in 1..a.validations.high:
|
|
if not attestation.aggregation_bits.overlaps(
|
|
a.validations[i].aggregation_bits):
|
|
attestation.aggregation_bits.combine(a.validations[i].aggregation_bits)
|
|
agg.aggregate(a.validations[i].aggregate_signature)
|
|
|
|
# Since each validator attests exactly once per epoch and its attestation
|
|
# has been validated to have been included in the attestation pool, there
|
|
# only exists one possible slot/committee combination to check.
|
|
try:
|
|
if a.aggregation_bits.isSubsetOf(
|
|
pool.attestedValidators[a.data.getAttestationDataKey]):
|
|
continue
|
|
except KeyError:
|
|
# No record of inclusion, so go ahead and include attestation.
|
|
discard
|
|
|
|
attestation.signature = agg.finish()
|
|
result.add(attestation)
|
|
|
|
if result.lenu64 >= MAX_ATTESTATIONS:
|
|
debug "getAttestationsForBlock: returning early after hitting MAX_ATTESTATIONS",
|
|
attestationSlot = newBlockSlot - 1
|
|
return
|
|
|
|
func getAggregatedAttestation*(pool: AttestationPool,
|
|
slot: Slot,
|
|
ad_htr: Eth2Digest): Option[Attestation] =
|
|
try:
|
|
if pool.attestationAggregates.contains(slot) and
|
|
pool.attestationAggregates[slot].contains(ad_htr):
|
|
return some pool.attestationAggregates[slot][ad_htr]
|
|
except KeyError:
|
|
doAssert(false) # shouldn't be possible because we check with `contains`
|
|
none(Attestation)
|
|
|
|
proc getAggregatedAttestation*(pool: AttestationPool,
|
|
slot: Slot,
|
|
index: CommitteeIndex): Option[Attestation] =
|
|
let attestations = pool.getAttestationsForSlot(
|
|
slot + MIN_ATTESTATION_INCLUSION_DELAY)
|
|
if attestations.isNone:
|
|
return none(Attestation)
|
|
|
|
for a in attestations.get.attestations:
|
|
doAssert a.data.slot == slot
|
|
if index.uint64 != a.data.index:
|
|
continue
|
|
|
|
var
|
|
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#construct-attestation
|
|
attestation = Attestation(
|
|
aggregation_bits: a.validations[0].aggregation_bits,
|
|
data: a.data,
|
|
signature: a.validations[0].aggregate_signature
|
|
)
|
|
|
|
agg {.noInit.}: AggregateSignature
|
|
|
|
agg.init(a.validations[0].aggregate_signature)
|
|
for v in a.validations[1..^1]:
|
|
if not attestation.aggregation_bits.overlaps(v.aggregation_bits):
|
|
attestation.aggregation_bits.combine(v.aggregation_bits)
|
|
agg.aggregate(v.aggregate_signature)
|
|
|
|
attestation.signature = agg.finish()
|
|
|
|
return some(attestation)
|
|
|
|
none(Attestation)
|
|
|
|
proc selectHead*(pool: var AttestationPool, wallSlot: Slot): BlockRef =
|
|
## Trigger fork choice and returns the new head block.
|
|
## Can return `nil`
|
|
let newHead = pool.forkChoice.get_head(pool.chainDag, wallSlot)
|
|
|
|
if newHead.isErr:
|
|
error "Couldn't select head", err = newHead.error
|
|
nil
|
|
else:
|
|
let ret = pool.chainDag.getRef(newHead.get())
|
|
if ret.isNil:
|
|
# This should normally not happen, but if the chain dag and fork choice
|
|
# get out of sync, we'll need to try to download the selected head - in
|
|
# the meantime, return nil to indicate that no new head was chosen
|
|
warn "Fork choice selected unknown head, trying to sync", root = newHead.get()
|
|
pool.quarantine.addMissing(newHead.get())
|
|
|
|
ret
|
|
|
|
proc prune*(pool: var AttestationPool) =
|
|
if (let v = pool.forkChoice.prune(); v.isErr):
|
|
# If pruning fails, it's likely the result of a bug - this shouldn't happen
|
|
# but we'll keep running hoping that the fork chocie will recover eventually
|
|
error "Couldn't prune fork choice, bug?", err = v.error()
|