avoid quadratic attestation queue iteration (#5288)

every attestation is processed with a new wall time so we end up
iterating over all attestations for every attestation we queue - this is
4% of cpu time on a subscribe-all-subnets node

* remove redundant zero checks - block root must be an existing block
and therefore cannot be zero
* simplify "hasn't-voted" check to root only (isZeroMemory is dubiously
implemented for objects)
This commit is contained in:
Jacek Sieka 2023-08-14 15:48:30 +02:00 committed by GitHub
parent d171303133
commit f77548310f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -11,7 +11,7 @@ import
# Standard library # Standard library
std/[sequtils, tables], std/[sequtils, tables],
# Status libraries # Status libraries
stew/[objects, results], chronicles, stew/[results], chronicles,
# Internal # Internal
../spec/[beaconstate, helpers, state_transition_block], ../spec/[beaconstate, helpers, state_transition_block],
../spec/datatypes/[phase0, altair, bellatrix], ../spec/datatypes/[phase0, altair, bellatrix],
@ -150,38 +150,17 @@ proc on_tick(
ok() ok()
func process_attestation_queue(self: var ForkChoice) {.gcsafe.} func process_attestation(
proc update_time*(self: var ForkChoice, dag: ChainDAGRef, time: BeaconTime):
FcResult[void] =
const step_size = seconds(SECONDS_PER_SLOT.int)
if time > self.checkpoints.time:
# Call on_tick at least once per slot.
while time >= self.checkpoints.time + step_size:
? self.on_tick(dag, self.checkpoints.time + step_size)
if time > self.checkpoints.time:
# Might create two ticks for the last slot.
? self.on_tick(dag, time)
self.process_attestation_queue() # Only run if time changed!
ok()
func process_attestation*(
self: var ForkChoiceBackend, self: var ForkChoiceBackend,
validator_index: ValidatorIndex, validator_index: ValidatorIndex,
block_root: Eth2Digest, block_root: Eth2Digest,
target_epoch: Epoch target_epoch: Epoch
) = ) =
if block_root.isZero:
return
## Add an attestation to the fork choice context ## Add an attestation to the fork choice context
self.votes.extend(validator_index.int + 1) self.votes.extend(validator_index.int + 1)
template vote: untyped = self.votes[validator_index] template vote: untyped = self.votes[validator_index]
if target_epoch > vote.next_epoch or vote.isZeroMemory: if target_epoch > vote.next_epoch or vote.next_root.isZero:
vote.next_root = block_root vote.next_root = block_root
vote.next_epoch = target_epoch vote.next_epoch = target_epoch
@ -189,9 +168,12 @@ func process_attestation*(
validator_index = validator_index, validator_index = validator_index,
new_vote = shortLog(vote) new_vote = shortLog(vote)
func process_attestation_queue(self: var ForkChoice) = func process_attestation_queue(self: var ForkChoice, slot: Slot) =
# Spec:
# Attestations can only affect the fork choice of subsequent slots.
# Delay consideration in the fork choice until their slot is in the past.
self.queuedAttestations.keepItIf: self.queuedAttestations.keepItIf:
if it.slot < self.checkpoints.time.slotOrZero: if it.slot < slot:
for validator_index in it.attesting_indices: for validator_index in it.attesting_indices:
self.backend.process_attestation( self.backend.process_attestation(
validator_index, it.block_root, it.slot.epoch()) validator_index, it.block_root, it.slot.epoch())
@ -206,6 +188,27 @@ func contains*(self: ForkChoiceBackend, block_root: Eth2Digest): bool =
## In particular, before adding a block, its parent must be known to the fork choice ## In particular, before adding a block, its parent must be known to the fork choice
self.proto_array.indices.contains(block_root) self.proto_array.indices.contains(block_root)
proc update_time*(self: var ForkChoice, dag: ChainDAGRef, time: BeaconTime):
FcResult[void] =
# `time` is the wall time, meaning it changes on every call typically
const step_size = seconds(SECONDS_PER_SLOT.int)
if time > self.checkpoints.time:
let
preSlot = self.checkpoints.time.slotOrZero()
postSlot = time.slotOrZero()
# Call on_tick at least once per slot.
while time >= self.checkpoints.time + step_size:
? self.on_tick(dag, self.checkpoints.time + step_size)
if time > self.checkpoints.time:
# Might create two ticks for the last slot.
? self.on_tick(dag, time)
if preSlot != postSlot:
self.process_attestation_queue(postSlot)
ok()
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/phase0/fork-choice.md#on_attestation # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/phase0/fork-choice.md#on_attestation
proc on_attestation*( proc on_attestation*(
self: var ForkChoice, self: var ForkChoice,
@ -217,9 +220,6 @@ proc on_attestation*(
): FcResult[void] = ): FcResult[void] =
? self.update_time(dag, max(wallTime, attestation_slot.start_beacon_time)) ? self.update_time(dag, max(wallTime, attestation_slot.start_beacon_time))
if beacon_block_root.isZero:
return ok()
if attestation_slot < self.checkpoints.time.slotOrZero: if attestation_slot < self.checkpoints.time.slotOrZero:
for validator_index in attesting_indices: for validator_index in attesting_indices:
# attestation_slot and target epoch must match, per attestation rules # attestation_slot and target epoch must match, per attestation rules