refactor batch validation not to require genesis_validators_root each time (#4640)

This commit is contained in:
tersec 2023-02-20 09:26:22 +01:00 committed by GitHub
parent 0cc0c7e6b7
commit 629b005c27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 55 additions and 53 deletions

View File

@ -98,6 +98,10 @@ type
# we accumulate here instead # we accumulate here instead
counts: tuple[signatures, batches, aggregates: int64] counts: tuple[signatures, batches, aggregates: int64]
# Most scheduled checks require this immutable value, so don't require it
# to be provided separately each time
genesis_validators_root: Eth2Digest
const const
# We cap waiting for an idle slot in case there's a lot of network traffic # We cap waiting for an idle slot in case there's a lot of network traffic
# taking up all CPU - we don't want to _completely_ stop processing # taking up all CPU - we don't want to _completely_ stop processing
@ -122,10 +126,12 @@ const
proc new*( proc new*(
T: type BatchCrypto, rng: ref HmacDrbgContext, T: type BatchCrypto, rng: ref HmacDrbgContext,
eager: Eager, taskpool: TaskPoolPtr): ref BatchCrypto = eager: Eager, genesis_validators_root: Eth2Digest, taskpool: TaskPoolPtr):
ref BatchCrypto =
(ref BatchCrypto)( (ref BatchCrypto)(
verifier: BatchVerifier(rng: rng, taskpool: taskpool), verifier: BatchVerifier(rng: rng, taskpool: taskpool),
eager: eager, eager: eager,
genesis_validators_root: genesis_validators_root,
pruneTime: Moment.now()) pruneTime: Moment.now())
func len(batch: Batch): int = func len(batch: Batch): int =
@ -161,12 +167,12 @@ proc pruneBatchQueue(batchCrypto: ref BatchCrypto) =
batchCrypto.batches.popFirst()[].skip() batchCrypto.batches.popFirst()[].skip()
proc combine(a: var Signature, b: Signature) = func combine(a: var Signature, b: Signature) =
var tmp = AggregateSignature.init(CookedSig(a)) var tmp = AggregateSignature.init(CookedSig(a))
tmp.aggregate(b) tmp.aggregate(b)
a = Signature(tmp.finish()) a = Signature(tmp.finish())
proc combine(a: var PublicKey, b: PublicKey) = func combine(a: var PublicKey, b: PublicKey) =
var tmp = AggregatePublicKey.init(CookedPubKey(a)) var tmp = AggregatePublicKey.init(CookedPubKey(a))
tmp.aggregate(b) tmp.aggregate(b)
a = PublicKey(tmp.finish()) a = PublicKey(tmp.finish())
@ -303,10 +309,9 @@ template withBatch(
# See also verify_attestation_signature # See also verify_attestation_signature
proc scheduleAttestationCheck*( proc scheduleAttestationCheck*(
batchCrypto: ref BatchCrypto, batchCrypto: ref BatchCrypto, fork: Fork,
fork: Fork, genesis_validators_root: Eth2Digest, attestationData: AttestationData, pubkey: CookedPubKey,
attestationData: AttestationData, signature: ValidatorSig
pubkey: CookedPubKey, signature: ValidatorSig
): Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] = ): Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] =
## Schedule crypto verification of an attestation ## Schedule crypto verification of an attestation
## ##
@ -322,15 +327,14 @@ proc scheduleAttestationCheck*(
return err("attestation: cannot load signature") return err("attestation: cannot load signature")
fut = batchCrypto.withBatch("batch_validation.scheduleAttestationCheck"): fut = batchCrypto.withBatch("batch_validation.scheduleAttestationCheck"):
attestation_signature_set( attestation_signature_set(
fork, genesis_validators_root, attestationData, pubkey, sig) fork, batchCrypto[].genesis_validators_root, attestationData, pubkey,
sig)
ok((fut, sig)) ok((fut, sig))
proc scheduleAggregateChecks*( proc scheduleAggregateChecks*(
batchCrypto: ref BatchCrypto, batchCrypto: ref BatchCrypto, fork: Fork,
fork: Fork, genesis_validators_root: Eth2Digest, signedAggregateAndProof: SignedAggregateAndProof, dag: ChainDAGRef,
signedAggregateAndProof: SignedAggregateAndProof,
dag: ChainDAGRef,
attesting_indices: openArray[ValidatorIndex] attesting_indices: openArray[ValidatorIndex]
): Result[tuple[ ): Result[tuple[
aggregatorFut, slotFut, aggregateFut: Future[BatchResult], aggregatorFut, slotFut, aggregateFut: Future[BatchResult],
@ -368,24 +372,23 @@ proc scheduleAggregateChecks*(
let let
aggregatorFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregator"): aggregatorFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregator"):
aggregate_and_proof_signature_set( aggregate_and_proof_signature_set(
fork, genesis_validators_root, aggregate_and_proof, aggregatorKey, fork, batchCrypto[].genesis_validators_root, aggregate_and_proof,
aggregatorSig) aggregatorKey, aggregatorSig)
slotFut = batchCrypto.withBatch("scheduleAggregateChecks.selection_proof"): slotFut = batchCrypto.withBatch("scheduleAggregateChecks.selection_proof"):
slot_signature_set( slot_signature_set(
fork, genesis_validators_root, aggregate.data.slot, aggregatorKey, fork, batchCrypto[].genesis_validators_root, aggregate.data.slot,
slotSig) aggregatorKey, slotSig)
aggregateFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregate"): aggregateFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregate"):
attestation_signature_set( attestation_signature_set(
fork, genesis_validators_root, aggregate.data, aggregateKey, fork, batchCrypto[].genesis_validators_root, aggregate.data,
aggregateSig) aggregateKey, aggregateSig)
ok((aggregatorFut, slotFut, aggregateFut, aggregateSig)) ok((aggregatorFut, slotFut, aggregateFut, aggregateSig))
proc scheduleSyncCommitteeMessageCheck*( proc scheduleSyncCommitteeMessageCheck*(
batchCrypto: ref BatchCrypto, batchCrypto: ref BatchCrypto, fork: Fork, slot: Slot,
fork: Fork, genesis_validators_root: Eth2Digest, beacon_block_root: Eth2Digest, pubkey: CookedPubKey,
slot: Slot, beacon_block_root: Eth2Digest, signature: ValidatorSig
pubkey: CookedPubKey, signature: ValidatorSig
): Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] = ): Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] =
## Schedule crypto verification of an attestation ## Schedule crypto verification of an attestation
## ##
@ -401,16 +404,15 @@ proc scheduleSyncCommitteeMessageCheck*(
return err("SyncCommitteMessage: cannot load signature") return err("SyncCommitteMessage: cannot load signature")
fut = batchCrypto.withBatch("scheduleSyncCommitteeMessageCheck"): fut = batchCrypto.withBatch("scheduleSyncCommitteeMessageCheck"):
sync_committee_message_signature_set( sync_committee_message_signature_set(
fork, genesis_validators_root, slot, beacon_block_root, pubkey, sig) fork, batchCrypto[].genesis_validators_root, slot, beacon_block_root,
pubkey, sig)
ok((fut, sig)) ok((fut, sig))
proc scheduleContributionChecks*( proc scheduleContributionChecks*(
batchCrypto: ref BatchCrypto, batchCrypto: ref BatchCrypto,
fork: Fork, genesis_validators_root: Eth2Digest, fork: Fork, signedContributionAndProof: SignedContributionAndProof,
signedContributionAndProof: SignedContributionAndProof, subcommitteeIdx: SyncSubcommitteeIndex, dag: ChainDAGRef): Result[tuple[
subcommitteeIdx: SyncSubcommitteeIndex,
dag: ChainDAGRef): Result[tuple[
aggregatorFut, proofFut, contributionFut: Future[BatchResult], aggregatorFut, proofFut, contributionFut: Future[BatchResult],
sig: CookedSig], cstring] = sig: CookedSig], cstring] =
## Schedule crypto verification of all signatures in a ## Schedule crypto verification of all signatures in a
@ -444,22 +446,22 @@ proc scheduleContributionChecks*(
let let
aggregatorFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.aggregator"): aggregatorFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.aggregator"):
contribution_and_proof_signature_set( contribution_and_proof_signature_set(
fork, genesis_validators_root, contribution_and_proof, aggregatorKey, fork, batchCrypto[].genesis_validators_root, contribution_and_proof,
aggregatorSig) aggregatorKey, aggregatorSig)
proofFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.selection_proof"): proofFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.selection_proof"):
sync_committee_selection_proof_set( sync_committee_selection_proof_set(
fork, genesis_validators_root, contribution.slot, fork, batchCrypto[].genesis_validators_root, contribution.slot,
subcommitteeIdx, aggregatorKey, proofSig) subcommitteeIdx, aggregatorKey, proofSig)
contributionFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"): contributionFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"):
sync_committee_message_signature_set( sync_committee_message_signature_set(
fork, genesis_validators_root, contribution.slot, fork, batchCrypto[].genesis_validators_root, contribution.slot,
contribution.beacon_block_root, contributionKey, contributionSig) contribution.beacon_block_root, contributionKey, contributionSig)
ok((aggregatorFut, proofFut, contributionFut, contributionSig)) ok((aggregatorFut, proofFut, contributionFut, contributionSig))
proc scheduleBlsToExecutionChangeCheck*( proc scheduleBlsToExecutionChangeCheck*(
batchCrypto: ref BatchCrypto, batchCrypto: ref BatchCrypto,
genesisFork: Fork, genesis_validators_root: Eth2Digest, genesisFork: Fork,
signedBLSToExecutionChange: SignedBLSToExecutionChange): Result[tuple[ signedBLSToExecutionChange: SignedBLSToExecutionChange): Result[tuple[
blsToExecutionFut: Future[BatchResult], blsToExecutionFut: Future[BatchResult],
sig: CookedSig], cstring] = sig: CookedSig], cstring] =
@ -487,7 +489,7 @@ proc scheduleBlsToExecutionChangeCheck*(
return err("scheduleBlsToExecutionChangeCheck: invalid validator change signature") return err("scheduleBlsToExecutionChangeCheck: invalid validator change signature")
validatorChangeFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"): validatorChangeFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"):
bls_to_execution_change_signature_set( bls_to_execution_change_signature_set(
genesis_fork, genesis_validators_root, genesis_fork, batchCrypto[].genesis_validators_root,
signedBLSToExecutionChange.message, signedBLSToExecutionChange.message,
validatorChangePubkey, validatorChangeSig) validatorChangePubkey, validatorChangeSig)

View File

@ -181,7 +181,7 @@ proc new*(T: type Eth2Processor,
# Only run eager attestation signature verification if we're not # Only run eager attestation signature verification if we're not
# processing blocks in order to give priority to block processing # processing blocks in order to give priority to block processing
eager = proc(): bool = not blockProcessor[].hasBlocks(), eager = proc(): bool = not blockProcessor[].hasBlocks(),
taskpool) genesis_validators_root = dag.genesis_validators_root, taskpool)
) )
# Each validator logs, validates then passes valid data to its destination # Each validator logs, validates then passes valid data to its destination

View File

@ -550,8 +550,6 @@ proc validateAttestation*(
let let
fork = pool.dag.forkAtEpoch(attestation.data.slot.epoch) fork = pool.dag.forkAtEpoch(attestation.data.slot.epoch)
genesis_validators_root =
getStateField(pool.dag.headState, genesis_validators_root)
attesting_index = get_attesting_indices_one( attesting_index = get_attesting_indices_one(
shufflingRef, slot, committee_index, attestation.aggregation_bits) shufflingRef, slot, committee_index, attestation.aggregation_bits)
@ -581,8 +579,8 @@ proc validateAttestation*(
# Attestation signatures are batch-verified # Attestation signatures are batch-verified
let deferredCrypto = batchCrypto let deferredCrypto = batchCrypto
.scheduleAttestationCheck( .scheduleAttestationCheck(
fork, genesis_validators_root, attestation.data, fork, attestation.data, pubkey,
pubkey, attestation.signature) attestation.signature)
if deferredCrypto.isErr(): if deferredCrypto.isErr():
return checkedReject(deferredCrypto.error) return checkedReject(deferredCrypto.error)
@ -736,8 +734,6 @@ proc validateAggregate*(
let let
fork = pool.dag.forkAtEpoch(aggregate.data.slot.epoch) fork = pool.dag.forkAtEpoch(aggregate.data.slot.epoch)
genesis_validators_root =
getStateField(pool.dag.headState, genesis_validators_root)
attesting_indices = get_attesting_indices( attesting_indices = get_attesting_indices(
shufflingRef, slot, committee_index, aggregate.aggregation_bits) shufflingRef, slot, committee_index, aggregate.aggregation_bits)
@ -745,8 +741,8 @@ proc validateAggregate*(
sig = if checkSignature: sig = if checkSignature:
let deferredCrypto = batchCrypto let deferredCrypto = batchCrypto
.scheduleAggregateChecks( .scheduleAggregateChecks(
fork, genesis_validators_root, fork, signedAggregateAndProof, pool.dag,
signedAggregateAndProof, pool.dag, attesting_indices attesting_indices
) )
if deferredCrypto.isErr(): if deferredCrypto.isErr():
return checkedReject(deferredCrypto.error) return checkedReject(deferredCrypto.error)
@ -842,8 +838,7 @@ proc validateBlsToExecutionChange*(
# BLS to execution change signatures are batch-verified # BLS to execution change signatures are batch-verified
let deferredCrypto = batchCrypto.scheduleBlsToExecutionChangeCheck( let deferredCrypto = batchCrypto.scheduleBlsToExecutionChangeCheck(
pool.dag.cfg.genesisFork, pool.dag.genesis_validators_root, pool.dag.cfg.genesisFork, signed_address_change)
signed_address_change)
if deferredCrypto.isErr(): if deferredCrypto.isErr():
return checkedReject(deferredCrypto.error) return checkedReject(deferredCrypto.error)
@ -980,7 +975,6 @@ proc validateSyncCommitteeMessage*(
let let
epoch = msg.slot.epoch epoch = msg.slot.epoch
fork = dag.forkAtEpoch(epoch) fork = dag.forkAtEpoch(epoch)
genesis_validators_root = dag.genesis_validators_root
senderPubKey = dag.validatorKey(msg.validator_index).valueOr: senderPubKey = dag.validatorKey(msg.validator_index).valueOr:
return errReject("SyncCommitteeMessage: invalid validator index") return errReject("SyncCommitteeMessage: invalid validator index")
@ -989,8 +983,7 @@ proc validateSyncCommitteeMessage*(
# Attestation signatures are batch-verified # Attestation signatures are batch-verified
let deferredCrypto = batchCrypto let deferredCrypto = batchCrypto
.scheduleSyncCommitteeMessageCheck( .scheduleSyncCommitteeMessageCheck(
fork, genesis_validators_root, fork, msg.slot, msg.beacon_block_root,
msg.slot, msg.beacon_block_root,
senderPubKey, msg.signature) senderPubKey, msg.signature)
if deferredCrypto.isErr(): if deferredCrypto.isErr():
return errReject(deferredCrypto.error) return errReject(deferredCrypto.error)
@ -1060,7 +1053,6 @@ proc validateContribution*(
let let
epoch = msg.message.contribution.slot.epoch epoch = msg.message.contribution.slot.epoch
fork = dag.forkAtEpoch(epoch) fork = dag.forkAtEpoch(epoch)
genesis_validators_root = dag.genesis_validators_root
if msg.message.contribution.aggregation_bits.countOnes() == 0: if msg.message.contribution.aggregation_bits.countOnes() == 0:
# [REJECT] The contribution has participants # [REJECT] The contribution has participants
@ -1081,7 +1073,7 @@ proc validateContribution*(
let sig = if checkSignature: let sig = if checkSignature:
let deferredCrypto = batchCrypto.scheduleContributionChecks( let deferredCrypto = batchCrypto.scheduleContributionChecks(
fork, genesis_validators_root, msg, subcommitteeIdx, dag) fork, msg, subcommitteeIdx, dag)
if deferredCrypto.isErr(): if deferredCrypto.isErr():
return errReject(deferredCrypto.error) return errReject(deferredCrypto.error)

View File

@ -330,7 +330,8 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
quarantine = newClone(Quarantine.init()) quarantine = newClone(Quarantine.init())
attPool = AttestationPool.init(dag, quarantine) attPool = AttestationPool.init(dag, quarantine)
batchCrypto = BatchCrypto.new( batchCrypto = BatchCrypto.new(
keys.newRng(), eager = func(): bool = true, taskpool) keys.newRng(), eager = func(): bool = true,
genesis_validators_root = dag.genesis_validators_root, taskpool)
syncCommitteePool = newClone SyncCommitteeMsgPool.init(keys.newRng()) syncCommitteePool = newClone SyncCommitteeMsgPool.init(keys.newRng())
timers: array[Timers, RunningStat] timers: array[Timers, RunningStat]
attesters: RunningStat attesters: RunningStat

View File

@ -1,5 +1,5 @@
# beacon_chain # beacon_chain
# Copyright (c) 2018-2022 Status Research & Development GmbH # Copyright (c) 2018-2023 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -47,7 +47,9 @@ suite "Gossip validation " & preset():
state = newClone(dag.headState) state = newClone(dag.headState)
cache = StateCache() cache = StateCache()
info = ForkedEpochInfo() info = ForkedEpochInfo()
batchCrypto = BatchCrypto.new(keys.newRng(), eager = proc(): bool = false, taskpool) batchCrypto = BatchCrypto.new(
keys.newRng(), eager = proc(): bool = false,
genesis_validators_root = dag.genesis_validators_root, taskpool)
# Slot 0 is a finalized slot - won't be making attestations for it.. # Slot 0 is a finalized slot - won't be making attestations for it..
check: check:
process_slots( process_slots(
@ -185,7 +187,6 @@ suite "Gossip validation - Extra": # Not based on preset config
cfg cfg
taskpool = Taskpool.new() taskpool = Taskpool.new()
quarantine = newClone(Quarantine.init()) quarantine = newClone(Quarantine.init())
batchCrypto = BatchCrypto.new(keys.newRng(), eager = proc(): bool = false, taskpool)
var var
verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new()) verifier = BatchVerifier(rng: keys.newRng(), taskpool: Taskpool.new())
dag = block: dag = block:
@ -216,6 +217,12 @@ suite "Gossip validation - Extra": # Not based on preset config
check: added.isOk() check: added.isOk()
dag.updateHead(added[], quarantine[]) dag.updateHead(added[], quarantine[])
dag dag
let batchCrypto = BatchCrypto.new(
keys.newRng(), eager = proc(): bool = false,
genesis_validators_root = dag.genesis_validators_root, taskpool)
var
state = assignClone(dag.headState.altairData) state = assignClone(dag.headState.altairData)
slot = state[].data.slot slot = state[].data.slot