attestation processing speedups

* avoid creating indexed attestation just to check signatures - above
all, don't create it when not checking signatures ;)
* avoid pointer op when adding attestation to pool
* better iterator for yielding attestations
* add metric / log for attestation packing time
This commit is contained in:
Jacek Sieka 2021-04-14 16:43:29 +02:00 committed by zah
parent 6806ffe1c8
commit f1f424cc2d
5 changed files with 135 additions and 108 deletions

View File

@ -11,18 +11,22 @@ import
# Standard libraries # Standard libraries
std/[options, tables, sequtils], std/[options, tables, sequtils],
# Status libraries # Status libraries
metrics,
chronicles, stew/byteutils, json_serialization/std/sets as jsonSets, chronicles, stew/byteutils, json_serialization/std/sets as jsonSets,
# Internal # Internal
../spec/[beaconstate, datatypes, crypto, digest, validator], ../spec/[beaconstate, datatypes, crypto, digest, validator],
../ssz/merkleization, ../ssz/merkleization,
"."/[spec_cache, blockchain_dag, block_quarantine], "."/[spec_cache, blockchain_dag, block_quarantine],
../beacon_node_types, ../extras, ".."/[beacon_clock, beacon_node_types, extras],
../fork_choice/fork_choice ../fork_choice/fork_choice
export beacon_node_types export beacon_node_types
logScope: topics = "attpool" logScope: topics = "attpool"
declareGauge attestation_pool_block_attestation_packing_time,
"Time it took to create list of attestations for block"
proc init*(T: type AttestationPool, chainDag: ChainDAGRef, quarantine: QuarantineRef): T = proc init*(T: type AttestationPool, chainDag: ChainDAGRef, quarantine: QuarantineRef): T =
## Initialize an AttestationPool from the chainDag `headState` ## Initialize an AttestationPool from the chainDag `headState`
## The `finalized_root` works around the finalized_checkpoint of the genesis block ## The `finalized_root` works around the finalized_checkpoint of the genesis block
@ -102,12 +106,12 @@ proc addForkChoiceVotes(
# hopefully the fork choice will heal itself over time. # hopefully the fork choice will heal itself over time.
error "Couldn't add attestation to fork choice, bug?", err = v.error() error "Couldn't add attestation to fork choice, bug?", err = v.error()
func candidateIdx(pool: AttestationPool, slot: Slot): Option[uint64] = func candidateIdx(pool: AttestationPool, slot: Slot): Option[int] =
if slot >= pool.startingSlot and if slot >= pool.startingSlot and
slot < (pool.startingSlot + pool.candidates.lenu64): slot < (pool.startingSlot + pool.candidates.lenu64):
some(slot mod pool.candidates.lenu64) some(int(slot mod pool.candidates.lenu64))
else: else:
none(uint64) none(int)
proc updateCurrent(pool: var AttestationPool, wallSlot: Slot) = proc updateCurrent(pool: var AttestationPool, wallSlot: Slot) =
if wallSlot + 1 < pool.candidates.lenu64: if wallSlot + 1 < pool.candidates.lenu64:
@ -210,6 +214,52 @@ func updateAggregates(entry: var AttestationEntry) =
inc j inc j
inc i inc i
proc addAttestation(entry: var AttestationEntry,
attestation: Attestation,
signature: CookedSig): bool =
logScope:
attestation = shortLog(attestation)
let
singleIndex = oneIndex(attestation.aggregation_bits)
if singleIndex.isSome():
if singleIndex.get() in entry.singles:
trace "Attestation already seen",
singles = entry.singles.len(),
aggregates = entry.aggregates.len()
return false
debug "Attestation resolved",
singles = entry.singles.len(),
aggregates = entry.aggregates.len()
entry.singles[singleIndex.get()] = signature
else:
# More than one vote in this attestation
for i in 0..<entry.aggregates.len():
if attestation.aggregation_bits.isSubsetOf(entry.aggregates[i].aggregation_bits):
trace "Aggregate already seen",
singles = entry.singles.len(),
aggregates = entry.aggregates.len()
return false
# Since we're adding a new aggregate, we can now remove existing
# aggregates that don't add any new votes
entry.aggregates.keepItIf(
not it.aggregation_bits.isSubsetOf(attestation.aggregation_bits))
entry.aggregates.add(Validation(
aggregation_bits: attestation.aggregation_bits,
aggregate_signature: AggregateSignature.init(signature)))
debug "Aggregate resolved",
singles = entry.singles.len(),
aggregates = entry.aggregates.len()
true
proc addAttestation*(pool: var AttestationPool, proc addAttestation*(pool: var AttestationPool,
attestation: Attestation, attestation: Attestation,
participants: seq[ValidatorIndex], participants: seq[ValidatorIndex],
@ -234,50 +284,23 @@ proc addAttestation*(pool: var AttestationPool,
startingSlot = pool.startingSlot startingSlot = pool.startingSlot
return return
let let attestation_data_root = hash_tree_root(attestation.data)
singleIndex = oneIndex(attestation.aggregation_bits)
root = hash_tree_root(attestation.data)
# Careful with pointer, candidate table must not be touched after here
entry = addr pool.candidates[candidateIdx.get].mGetOrPut(
root,
AttestationEntry(
data: attestation.data,
committee_len: attestation.aggregation_bits.len()))
if singleIndex.isSome():
if singleIndex.get() in entry[].singles:
trace "Attestation already seen",
singles = entry[].singles.len(),
aggregates = entry[].aggregates.len()
# TODO withValue is an abomination but hard to use anything else too without
# creating an unnecessary AttestationEntry on the hot path and avoiding
# multiple lookups
pool.candidates[candidateIdx.get()].withValue(attestation_data_root, entry) do:
if not addAttestation(entry[], attestation, signature):
return
do:
if not addAttestation(
pool.candidates[candidateIdx.get()].mGetOrPut(
attestation_data_root,
AttestationEntry(
data: attestation.data,
committee_len: attestation.aggregation_bits.len())),
attestation, signature):
return return
debug "Attestation resolved",
singles = entry[].singles.len(),
aggregates = entry[].aggregates.len()
entry[].singles[singleIndex.get()] = signature
else:
# More than one vote in this attestation
for i in 0..<entry[].aggregates.len():
if attestation.aggregation_bits.isSubsetOf(entry[].aggregates[i].aggregation_bits):
trace "Aggregate already seen",
singles = entry[].singles.len(),
aggregates = entry[].aggregates.len()
return
# Since we're adding a new aggregate, we can now remove existing
# aggregates that don't add any new votes
entry[].aggregates.keepItIf(
not it.aggregation_bits.isSubsetOf(attestation.aggregation_bits))
entry[].aggregates.add(Validation(
aggregation_bits: attestation.aggregation_bits,
aggregate_signature: AggregateSignature.init(signature)))
debug "Aggregate resolved",
singles = entry[].singles.len(),
aggregates = entry[].aggregates.len()
pool.addForkChoiceVotes( pool.addForkChoiceVotes(
attestation.data.slot, participants, attestation.data.beacon_block_root, attestation.data.slot, participants, attestation.data.beacon_block_root,
@ -301,8 +324,18 @@ proc addForkChoice*(pool: var AttestationPool,
iterator attestations*(pool: AttestationPool, slot: Option[Slot], iterator attestations*(pool: AttestationPool, slot: Option[Slot],
index: Option[CommitteeIndex]): Attestation = index: Option[CommitteeIndex]): Attestation =
template processTable(table: AttestationTable) = let candidateIndices =
for _, entry in table: if slot.isSome():
let candidateIdx = pool.candidateIdx(slot.get())
if candidateIdx.isSome():
candidateIdx.get() .. candidateIdx.get()
else:
1 .. 0
else:
0 ..< pool.candidates.len()
for candidateIndex in candidateIndices:
for _, entry in pool.candidates[candidateIndex]:
if index.isNone() or entry.data.index == index.get().uint64: if index.isNone() or entry.data.index == index.get().uint64:
var singleAttestation = Attestation( var singleAttestation = Attestation(
aggregation_bits: CommitteeValidatorsBits.init(entry.committee_len), aggregation_bits: CommitteeValidatorsBits.init(entry.committee_len),
@ -317,14 +350,6 @@ iterator attestations*(pool: AttestationPool, slot: Option[Slot],
for v in entry.aggregates: for v in entry.aggregates:
yield entry.toAttestation(v) yield entry.toAttestation(v)
if slot.isSome():
let candidateIdx = pool.candidateIdx(slot.get())
if candidateIdx.isSome():
processTable(pool.candidates[candidateIdx.get()])
else:
for i in 0..<pool.candidates.len():
processTable(pool.candidates[i])
type type
AttestationCacheKey* = (Slot, uint64) AttestationCacheKey* = (Slot, uint64)
AttestationCache = Table[AttestationCacheKey, CommitteeValidatorsBits] ##\ AttestationCache = Table[AttestationCacheKey, CommitteeValidatorsBits] ##\
@ -393,6 +418,7 @@ proc getAttestationsForBlock*(pool: var AttestationPool,
# Attestations produced in a particular slot are added to the block # Attestations produced in a particular slot are added to the block
# at the slot where at least MIN_ATTESTATION_INCLUSION_DELAY have passed # at the slot where at least MIN_ATTESTATION_INCLUSION_DELAY have passed
maxAttestationSlot = newBlockSlot - MIN_ATTESTATION_INCLUSION_DELAY maxAttestationSlot = newBlockSlot - MIN_ATTESTATION_INCLUSION_DELAY
startPackingTime = Moment.now()
var var
candidates: seq[tuple[ candidates: seq[tuple[
@ -422,9 +448,8 @@ proc getAttestationsForBlock*(pool: var AttestationPool,
# Attestations are checked based on the state that we're adding the # Attestations are checked based on the state that we're adding the
# attestation to - there might have been a fork between when we first # attestation to - there might have been a fork between when we first
# saw the attestation and the time that we added it # saw the attestation and the time that we added it
# TODO avoid creating a full attestation here and instead do the checks if not check_attestation(
# based on the attestation data and bits state, attestation, {skipBlsValidation}, cache).isOk():
if not check_attestation(state, attestation, {skipBlsValidation}, cache).isOk():
continue continue
let score = attCache.score( let score = attCache.score(
@ -453,7 +478,7 @@ proc getAttestationsForBlock*(pool: var AttestationPool,
state.previous_epoch_attestations.maxLen - state.previous_epoch_attestations.len() state.previous_epoch_attestations.maxLen - state.previous_epoch_attestations.len()
var res: seq[Attestation] var res: seq[Attestation]
let totalCandidates = candidates.len()
while candidates.len > 0 and res.lenu64() < MAX_ATTESTATIONS: while candidates.len > 0 and res.lenu64() < MAX_ATTESTATIONS:
block: block:
# Find the candidate with the highest score - slot is used as a # Find the candidate with the highest score - slot is used as a
@ -491,6 +516,14 @@ proc getAttestationsForBlock*(pool: var AttestationPool,
# Only keep candidates that might add coverage # Only keep candidates that might add coverage
it.score > 0 it.score > 0
let
packingTime = Moment.now() - startPackingTime
debug "Packed attestations for block",
newBlockSlot, packingTime, totalCandidates, attestations = res.len()
attestation_pool_block_attestation_packing_time.set(
packingTime.toFloatSeconds())
res res
func bestValidation(aggregates: openArray[Validation]): (int, int) = func bestValidation(aggregates: openArray[Validation]): (int, int) =

View File

@ -145,24 +145,29 @@ proc is_valid_indexed_attestation*(
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#is_valid_indexed_attestation # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#is_valid_indexed_attestation
proc is_valid_indexed_attestation*( proc is_valid_indexed_attestation*(
fork: Fork, genesis_validators_root: Eth2Digest, fork: Fork, genesis_validators_root: Eth2Digest,
epochRef: EpochRef, attesting_indices: auto, epochRef: EpochRef,
attestation: SomeAttestation, flags: UpdateFlags): Result[void, cstring] = attestation: SomeAttestation, flags: UpdateFlags): Result[void, cstring] =
# This is a variation on `is_valid_indexed_attestation` that works directly # This is a variation on `is_valid_indexed_attestation` that works directly
# with an attestation instead of first constructing an `IndexedAttestation` # with an attestation instead of first constructing an `IndexedAttestation`
# and then validating it - for the purpose of validating the signature, the # and then validating it - for the purpose of validating the signature, the
# order doesn't matter and we can proceed straight to validating the # order doesn't matter and we can proceed straight to validating the
# signature instead # signature instead
if attesting_indices.len == 0: let sigs = attestation.aggregation_bits.countOnes()
return err("indexed_attestation: no attesting indices") if sigs == 0:
return err("is_valid_indexed_attestation: no attesting indices")
# Verify aggregate signature # Verify aggregate signature
if not (skipBLSValidation in flags or attestation.signature is TrustedSig): if not (skipBLSValidation in flags or attestation.signature is TrustedSig):
let pubkeys = mapIt( var
attesting_indices, epochRef.validator_keys[it]) pubkeys = newSeqOfCap[ValidatorPubKey](sigs)
for index in get_attesting_indices(
epochRef, attestation.data, attestation.aggregation_bits):
pubkeys.add(epochRef.validator_keys[index])
if not verify_attestation_signature( if not verify_attestation_signature(
fork, genesis_validators_root, attestation.data, fork, genesis_validators_root, attestation.data,
pubkeys, attestation.signature): pubkeys, attestation.signature):
return err("indexed attestation: signature verification failure") return err("is_valid_indexed_attestation: signature verification failure")
ok() ok()

View File

@ -277,8 +277,7 @@ proc validateAttestation*(
block: block:
# First pass - without cryptography # First pass - without cryptography
let v = is_valid_indexed_attestation( let v = is_valid_indexed_attestation(
fork, genesis_validators_root, epochRef, attesting_indices, fork, genesis_validators_root, epochRef, attestation,
attestation,
{skipBLSValidation}) {skipBLSValidation})
if v.isErr(): if v.isErr():
return err((ValidationResult.Reject, v.error)) return err((ValidationResult.Reject, v.error))

View File

@ -489,53 +489,44 @@ iterator get_attesting_indices*(state: BeaconState,
bits: CommitteeValidatorsBits, bits: CommitteeValidatorsBits,
cache: var StateCache): ValidatorIndex = cache: var StateCache): ValidatorIndex =
## Return the set of attesting indices corresponding to ``data`` and ``bits``. ## Return the set of attesting indices corresponding to ``data`` and ``bits``.
if bits.lenu64 != get_beacon_committee_len(state, data.slot, data.index.CommitteeIndex, cache): if bits.lenu64 != get_beacon_committee_len(
state, data.slot, data.index.CommitteeIndex, cache):
trace "get_attesting_indices: inconsistent aggregation and committee length" trace "get_attesting_indices: inconsistent aggregation and committee length"
else: else:
var i = 0 var i = 0
for index in get_beacon_committee(state, data.slot, data.index.CommitteeIndex, cache): for index in get_beacon_committee(
state, data.slot, data.index.CommitteeIndex, cache):
if bits[i]: if bits[i]:
yield index yield index
inc i inc i
iterator get_sorted_attesting_indices*(state: BeaconState, proc is_valid_indexed_attestation*(
data: AttestationData, state: BeaconState, attestation: SomeAttestation, flags: UpdateFlags,
bits: CommitteeValidatorsBits, cache: var StateCache): Result[void, cstring] =
cache: var StateCache): ValidatorIndex = # This is a variation on `is_valid_indexed_attestation` that works directly
var heap = initHeapQueue[ValidatorIndex]() # with an attestation instead of first constructing an `IndexedAttestation`
for index in get_attesting_indices(state, data, bits, cache): # and then validating it - for the purpose of validating the signature, the
heap.push(index) # order doesn't matter and we can proceed straight to validating the
# signature instead
while heap.len > 0: let sigs = attestation.aggregation_bits.countOnes()
yield heap.pop() if sigs == 0:
return err("is_valid_indexed_attestation: no attesting indices")
func get_sorted_attesting_indices_list*( # Verify aggregate signature
state: BeaconState, data: AttestationData, bits: CommitteeValidatorsBits, if not (skipBLSValidation in flags or attestation.signature is TrustedSig):
cache: var StateCache): List[uint64, Limit MAX_VALIDATORS_PER_COMMITTEE] = var
for index in get_sorted_attesting_indices(state, data, bits, cache): pubkeys = newSeqOfCap[ValidatorPubKey](sigs)
if not result.add index.uint64: for index in get_attesting_indices(
raiseAssert "The `result` list has the same max size as the sorted `bits` input" state, attestation.data, attestation.aggregation_bits, cache):
pubkeys.add(state.validators[index].pubkey)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#get_indexed_attestation if not verify_attestation_signature(
func get_indexed_attestation(state: BeaconState, attestation: Attestation, state.fork, state.genesis_validators_root, attestation.data,
cache: var StateCache): IndexedAttestation = pubkeys, attestation.signature):
## Return the indexed attestation corresponding to ``attestation``. return err("indexed attestation: signature verification failure")
IndexedAttestation(
attesting_indices: get_sorted_attesting_indices_list(
state, attestation.data, attestation.aggregation_bits, cache),
data: attestation.data,
signature: attestation.signature
)
func get_indexed_attestation(state: BeaconState, attestation: TrustedAttestation, ok()
cache: var StateCache): TrustedIndexedAttestation =
## Return the indexed attestation corresponding to ``attestation``.
TrustedIndexedAttestation(
attesting_indices: get_sorted_attesting_indices_list(
state, attestation.data, attestation.aggregation_bits, cache),
data: attestation.data,
signature: attestation.signature
)
# Attestation validation # Attestation validation
# ------------------------------------------------------------------------------------------ # ------------------------------------------------------------------------------------------
@ -610,8 +601,7 @@ proc check_attestation*(
if not (data.source == state.previous_justified_checkpoint): if not (data.source == state.previous_justified_checkpoint):
return err("FFG data not matching previous justified epoch") return err("FFG data not matching previous justified epoch")
? is_valid_indexed_attestation( ? is_valid_indexed_attestation(state, attestation, flags, cache)
state, get_indexed_attestation(state, attestation, cache), flags)
ok() ok()

View File

@ -218,7 +218,7 @@ proc createAndSendAttestation(node: BeaconNode,
let deadline = attestationData.slot.toBeaconTime() + let deadline = attestationData.slot.toBeaconTime() +
seconds(int(SECONDS_PER_SLOT div 3)) seconds(int(SECONDS_PER_SLOT div 3))
let (delayStr, delayMillis) = let (delayStr, delaySecs) =
if wallTime < deadline: if wallTime < deadline:
("-" & $(deadline - wallTime), -toFloatSeconds(deadline - wallTime)) ("-" & $(deadline - wallTime), -toFloatSeconds(deadline - wallTime))
else: else:
@ -228,7 +228,7 @@ proc createAndSendAttestation(node: BeaconNode,
validator = shortLog(validator), delay = delayStr, validator = shortLog(validator), delay = delayStr,
indexInCommittee = indexInCommittee indexInCommittee = indexInCommittee
beacon_attestation_sent_delay.observe(delayMillis) beacon_attestation_sent_delay.observe(delaySecs)
proc getBlockProposalEth1Data*(node: BeaconNode, proc getBlockProposalEth1Data*(node: BeaconNode,
stateData: StateData): BlockProposalEth1Data = stateData: StateData): BlockProposalEth1Data =