mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-20 19:40:52 +00:00
0d9fd54857
In order to avoid full replays when validating attestations hailing from untaken forks, it's better to keep shufflings separate from `EpochRef` and perform a lookahead on the shuffling when processing the block that determines them. This also helps performance in the case where REST clients are trying to perform lookahead on attestation duties and decreases memory usage by sharing shufflings between EpochRef instances of the same dependent root.
468 lines
18 KiB
Nim
468 lines
18 KiB
Nim
# beacon_chain
|
|
# Copyright (c) 2019-2022 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
when (NimMajor, NimMinor) < (1, 4):
|
|
{.push raises: [Defect].}
|
|
else:
|
|
{.push raises: [].}
|
|
|
|
import
|
|
std/[deques, sequtils],
|
|
metrics,
|
|
# Status
|
|
chronicles, chronos,
|
|
../spec/signatures_batch,
|
|
../consensus_object_pools/[blockchain_dag, spec_cache]
|
|
|
|
export signatures_batch, blockchain_dag
|
|
|
|
logScope:
|
|
topics = "gossip_checks"
|
|
|
|
declareCounter batch_verification_batches,
|
|
"Total number of batches processed"
|
|
declareCounter batch_verification_signatures,
|
|
"Total number of verified signatures before aggregation"
|
|
declareCounter batch_verification_aggregates,
|
|
"Total number of verified signatures after aggregation"
|
|
|
|
# Batched gossip validation
|
|
# ----------------------------------------------------------------
|
|
# Batching in the context of BLS means collecting the signatures of several
|
|
# messages and verifying them all at once - this can be done more efficiently
|
|
# than verifying each message one by one, but the downside is that we get an
|
|
# all-or-nothing response - in case of an invalid signature, we must re-check
|
|
# each message separately.
|
|
#
|
|
# In addition to batching, we also perform lazy aggregation:
|
|
#
|
|
# * batching speeds up the verification of multiple signatures over different
|
|
# messages, by a decent amount
|
|
# * lazy aggregation speeds up the verification of multiple signatures over the
|
|
# same message, by a lot
|
|
#
|
|
# Due to the nature of gossip validation in eth2, it is common for messages
|
|
# to arrive in bursts - because most traffic on the network is valid (honest
|
|
# nodes don't re-broadcast invalid traffic and dishonest nodes quickly get
|
|
# disconnected), valid messages by far make up the bulk of traffic.
|
|
#
|
|
# Further, traffic is divided into topics - on a single topic it will be
|
|
# highly likely that the same message appears over and over again, but with
|
|
# different signatures, as most validators have the same view of the network -
|
|
# at least 2/3 or we're in deep trouble :)
|
|
|
|
type
|
|
BatchResult* {.pure.} = enum
|
|
Invalid # Invalid by default
|
|
Valid
|
|
Timeout
|
|
|
|
Eager = proc(): bool {.gcsafe, raises: [Defect].} ##\
|
|
## Callback that returns true if eager processing should be done to lower
|
|
## latency at the expense of spending more cycles validating things, creating
|
|
## a crude timesharing priority mechanism.
|
|
|
|
BatchItem* = object
|
|
sigset: SignatureSet
|
|
fut: Future[BatchResult]
|
|
|
|
Batch* = object
|
|
created: Moment
|
|
sigsets: seq[SignatureSet]
|
|
items: seq[BatchItem]
|
|
|
|
BatchCrypto* = object
|
|
# Each batch is bounded by BatchedCryptoSize which was chosen:
|
|
# - based on "nimble bench" in nim-blscurve
|
|
# so that low power devices like Raspberry Pi 4 can process
|
|
# that many batched verifications within ~30ms on average
|
|
# - based on the accumulation rate of attestations and aggregates
|
|
# in large instances which were 12000 per slot (12s)
|
|
# hence 1 per ms (but the pattern is bursty around the 4s mark)
|
|
# The number of batches is bounded by time - batch validation is skipped if
|
|
# we can't process them in the time that one slot takes, and we return
|
|
# timeout instead which prevents the gossip layer from forwarding the
|
|
# batch.
|
|
batches: Deque[ref Batch]
|
|
eager: Eager ##\
|
|
## Eager is used to enable eager processing of attestations when it's
|
|
## prudent to do so (instead of leaving the CPU for other, presumably more
|
|
## important work like block processing)
|
|
##
|
|
verifier: BatchVerifier
|
|
|
|
pruneTime: Moment ## last time we had to prune something
|
|
|
|
# `nim-metrics` library is a bit too slow to update on every batch, so
|
|
# we accumulate here instead
|
|
counts: tuple[signatures, batches, aggregates: int64]
|
|
|
|
const
|
|
# We cap waiting for an idle slot in case there's a lot of network traffic
|
|
# taking up all CPU - we don't want to _completely_ stop processing
|
|
# attestations - doing so also allows us to benefit from more batching /
|
|
# larger network reads when under load.
|
|
BatchAttAccumTime = 10.milliseconds
|
|
|
|
# Threshold for immediate trigger of batch verification.
|
|
# A balance between throughput and worst case latency.
|
|
# At least 6 so that the constant factors
|
|
# (RNG for blinding and Final Exponentiation)
|
|
# are amortized, but not too big as we need to redo checks one-by-one if one
|
|
# failed.
|
|
# The current value is based on experiments, where 72 gives an average batch
|
|
# size of ~30 signatures per batch, or 2.5 signatures per aggregate (meaning
|
|
# an average of 12 verifications per batch which on a raspberry should be
|
|
# doable in less than 30ms). In the same experiment, a value of 36 resulted
|
|
# in 17-18 signatures per batch and 1.7-1.9 signatures per aggregate - this
|
|
# node was running on mainnet with `--subscribe-all-subnets` turned on -
|
|
# typical nodes will see smaller batches.
|
|
BatchedCryptoSize = 72
|
|
|
|
proc new*(
|
|
T: type BatchCrypto, rng: ref HmacDrbgContext,
|
|
eager: Eager, taskpool: TaskPoolPtr): ref BatchCrypto =
|
|
(ref BatchCrypto)(
|
|
verifier: BatchVerifier(rng: rng, taskpool: taskpool),
|
|
eager: eager,
|
|
pruneTime: Moment.now())
|
|
|
|
func len(batch: Batch): int =
|
|
batch.items.len()
|
|
|
|
func full(batch: Batch): bool =
|
|
batch.len() >= BatchedCryptoSize
|
|
|
|
proc complete(batchItem: var BatchItem, v: BatchResult) =
|
|
batchItem.fut.complete(v)
|
|
batchItem.fut = nil
|
|
|
|
proc complete(batchItem: var BatchItem, ok: bool) =
|
|
batchItem.fut.complete(if ok: BatchResult.Valid else: BatchResult.Invalid)
|
|
batchItem.fut = nil
|
|
|
|
proc skip(batch: var Batch) =
|
|
for res in batch.items.mitems():
|
|
res.complete(BatchResult.Timeout)
|
|
|
|
proc pruneBatchQueue(batchCrypto: ref BatchCrypto) =
|
|
let
|
|
now = Moment.now()
|
|
|
|
# If batches haven't been processed for more than 12 seconds
|
|
while batchCrypto.batches.len() > 0:
|
|
if batchCrypto.batches[0][].created + SECONDS_PER_SLOT.int64.seconds > now:
|
|
break
|
|
if batchCrypto.pruneTime + SECONDS_PER_SLOT.int64.seconds > now:
|
|
notice "Batch queue pruned, skipping attestation validation",
|
|
batches = batchCrypto.batches.len()
|
|
batchCrypto.pruneTime = Moment.now()
|
|
|
|
batchCrypto.batches.popFirst()[].skip()
|
|
|
|
proc combine(a: var Signature, b: Signature) =
|
|
var tmp = AggregateSignature.init(CookedSig(a))
|
|
tmp.aggregate(b)
|
|
a = Signature(tmp.finish())
|
|
|
|
proc combine(a: var PublicKey, b: PublicKey) =
|
|
var tmp = AggregatePublicKey.init(CookedPubKey(a))
|
|
tmp.aggregate(b)
|
|
a = PublicKey(tmp.finish())
|
|
|
|
proc processBatch(batchCrypto: ref BatchCrypto) =
|
|
## Process one batch, if there is any
|
|
|
|
# Pruning the queue here makes sure we catch up with processing if need be
|
|
batchCrypto.pruneBatchQueue() # Skip old batches
|
|
|
|
if batchCrypto[].batches.len() == 0:
|
|
# No more batches left, they might have been eagerly processed or pruned
|
|
return
|
|
|
|
let
|
|
batch = batchCrypto[].batches.popFirst()
|
|
batchSize = batch[].sigsets.len()
|
|
|
|
if batchSize == 0:
|
|
# Nothing to do in this batch, can happen when a batch is created without
|
|
# there being any signatures successfully added to it
|
|
discard
|
|
else:
|
|
trace "batch crypto - starting",
|
|
batchSize
|
|
|
|
let
|
|
startTick = Moment.now()
|
|
ok =
|
|
if batchSize == 1: blsVerify(batch[].sigsets[0])
|
|
else: batchCrypto.verifier.batchVerify(batch[].sigsets)
|
|
|
|
trace "batch crypto - finished",
|
|
batchSize,
|
|
cryptoVerified = ok,
|
|
batchDur = Moment.now() - startTick
|
|
|
|
if ok:
|
|
for res in batch.items.mitems():
|
|
res.complete(BatchResult.Valid)
|
|
else:
|
|
# Batched verification failed meaning that some of the signature checks
|
|
# failed, but we don't know which ones - check each signature separately
|
|
# instead
|
|
debug "batch crypto - failure, falling back",
|
|
items = batch[].items.len()
|
|
|
|
for item in batch[].items.mitems():
|
|
item.complete(blsVerify item.sigset)
|
|
|
|
batchCrypto[].counts.batches += 1
|
|
batchCrypto[].counts.signatures += batch[].items.len()
|
|
batchCrypto[].counts.aggregates += batch[].sigsets.len()
|
|
|
|
if batchCrypto[].counts.batches >= 256:
|
|
# Not too often, so as not to overwhelm our metrics
|
|
batch_verification_batches.inc(batchCrypto[].counts.batches)
|
|
batch_verification_signatures.inc(batchCrypto[].counts.signatures)
|
|
batch_verification_aggregates.inc(batchCrypto[].counts.aggregates)
|
|
|
|
reset(batchCrypto[].counts)
|
|
|
|
proc deferCryptoProcessing(batchCrypto: ref BatchCrypto) {.async.} =
|
|
## Process pending crypto check after some time has passed - the time is
|
|
## chosen such that there's time to fill the batch but not so long that
|
|
## latency across the network is negatively affected
|
|
await sleepAsync(BatchAttAccumTime)
|
|
|
|
# Take the first batch in the queue and process it - if eager processing has
|
|
# stolen it already, that's fine
|
|
batchCrypto.processBatch()
|
|
|
|
proc getBatch(batchCrypto: ref BatchCrypto): (ref Batch, bool) =
|
|
# Get a batch suitable for attestation processing - in particular, attestation
|
|
# batches might be skipped
|
|
batchCrypto.pruneBatchQueue()
|
|
|
|
if batchCrypto.batches.len() == 0 or
|
|
batchCrypto.batches.peekLast[].full():
|
|
# There are no batches in progress - start a new batch and schedule a
|
|
# deferred task to eventually handle it
|
|
let batch = (ref Batch)(created: Moment.now())
|
|
batchCrypto[].batches.addLast(batch)
|
|
(batch, true)
|
|
else:
|
|
let batch = batchCrypto[].batches.peekLast()
|
|
# len will be 0 when the batch was created but nothing added to it
|
|
# because of early failures
|
|
(batch, batch[].len() == 0)
|
|
|
|
proc scheduleBatch(batchCrypto: ref BatchCrypto, fresh: bool) =
|
|
if fresh:
|
|
# Every time we start a new round of batching, we need to launch a deferred
|
|
# task that will compute the result of the batch eventually in case the
|
|
# batch is never filled or eager processing is blocked
|
|
asyncSpawn batchCrypto.deferCryptoProcessing()
|
|
|
|
if batchCrypto.batches.len() > 0 and
|
|
batchCrypto.batches.peekFirst()[].full() and
|
|
batchCrypto.eager():
|
|
# If there's a full batch, process it eagerly assuming the callback allows
|
|
batchCrypto.processBatch()
|
|
|
|
template orReturnErr(v: Option, error: cstring): untyped =
|
|
## Returns with given error string if the option does not have a value
|
|
let tmp = v
|
|
if tmp.isNone:
|
|
return err(error) # this exits the calling scope, as templates are inlined.
|
|
tmp.unsafeGet()
|
|
|
|
template withBatch(
|
|
batchCrypto: ref BatchCrypto, name: cstring,
|
|
body: untyped): Future[BatchResult] =
|
|
block:
|
|
let
|
|
(batch, fresh) = batchCrypto.getBatch()
|
|
|
|
let
|
|
fut = newFuture[BatchResult](name)
|
|
sigset = body
|
|
|
|
var found = false
|
|
# Find existing signature sets with the same message - if we can verify an
|
|
# aggregate instead of several signatures, that is _much_ faster
|
|
for item in batch[].sigsets.mitems():
|
|
if item.message == sigset.message:
|
|
item.signature.combine(sigset.signature)
|
|
item.pubkey.combine(sigset.pubkey)
|
|
found = true
|
|
break
|
|
|
|
if not found:
|
|
batch[].sigsets.add sigset
|
|
|
|
# We need to keep the "original" sigset to allow verifying each signature
|
|
# one by one in the case the combined operation fails
|
|
batch[].items.add(BatchItem(sigset: sigset, fut: fut))
|
|
|
|
batchCrypto.scheduleBatch(fresh)
|
|
fut
|
|
|
|
# See also verify_attestation_signature
|
|
proc scheduleAttestationCheck*(
|
|
batchCrypto: ref BatchCrypto,
|
|
fork: Fork, genesis_validators_root: Eth2Digest,
|
|
attestationData: AttestationData,
|
|
pubkey: CookedPubKey, signature: ValidatorSig
|
|
): Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] =
|
|
## Schedule crypto verification of an attestation
|
|
##
|
|
## The buffer is processed:
|
|
## - when eager processing is enabled and the batch is full
|
|
## - otherwise after 10ms (BatchAttAccumTime)
|
|
##
|
|
## This returns an error if crypto sanity checks failed
|
|
## and a future with the deferred attestation check otherwise.
|
|
##
|
|
let
|
|
sig = signature.load().orReturnErr("attestation: cannot load signature")
|
|
fut = batchCrypto.withBatch("batch_validation.scheduleAttestationCheck"):
|
|
attestation_signature_set(
|
|
fork, genesis_validators_root, attestationData, pubkey, sig)
|
|
|
|
ok((fut, sig))
|
|
|
|
proc scheduleAggregateChecks*(
|
|
batchCrypto: ref BatchCrypto,
|
|
fork: Fork, genesis_validators_root: Eth2Digest,
|
|
signedAggregateAndProof: SignedAggregateAndProof,
|
|
dag: ChainDAGRef,
|
|
attesting_indices: openArray[ValidatorIndex]
|
|
): Result[tuple[
|
|
aggregatorFut, slotFut, aggregateFut: Future[BatchResult],
|
|
sig: CookedSig], cstring] =
|
|
## Schedule crypto verification of an aggregate
|
|
##
|
|
## This involves 3 checks:
|
|
## - verify_slot_signature
|
|
## - verify_aggregate_and_proof_signature
|
|
## - is_valid_indexed_attestation
|
|
##
|
|
## The buffer is processed:
|
|
## - when eager processing is enabled and the batch is full
|
|
## - otherwise after 10ms (BatchAttAccumTime)
|
|
##
|
|
## This returns None if the signatures could not be loaded.
|
|
## and 3 futures with the deferred aggregate checks otherwise.
|
|
|
|
template aggregate_and_proof: untyped = signedAggregateAndProof.message
|
|
template aggregate: untyped = aggregate_and_proof.aggregate
|
|
|
|
# Do the eager steps first to avoid polluting batches with needlessly
|
|
let
|
|
aggregatorKey =
|
|
dag.validatorKey(aggregate_and_proof.aggregator_index).orReturnErr(
|
|
"SignedAggregateAndProof: invalid aggregator index")
|
|
aggregatorSig = signedAggregateAndProof.signature.load().orReturnErr(
|
|
"aggregateAndProof: invalid proof signature")
|
|
slotSig = aggregate_and_proof.selection_proof.load().orReturnErr(
|
|
"aggregateAndProof: invalid selection signature")
|
|
aggregateKey = ? aggregateAll(dag, attesting_indices)
|
|
aggregateSig = aggregate.signature.load().orReturnErr(
|
|
"aggregateAndProof: invalid aggregate signature")
|
|
|
|
let
|
|
aggregatorFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregator"):
|
|
aggregate_and_proof_signature_set(
|
|
fork, genesis_validators_root, aggregate_and_proof, aggregatorKey,
|
|
aggregatorSig)
|
|
slotFut = batchCrypto.withBatch("scheduleAggregateChecks.selection_proof"):
|
|
slot_signature_set(
|
|
fork, genesis_validators_root, aggregate.data.slot, aggregatorKey,
|
|
slotSig)
|
|
aggregateFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregate"):
|
|
attestation_signature_set(
|
|
fork, genesis_validators_root, aggregate.data, aggregateKey,
|
|
aggregateSig)
|
|
|
|
ok((aggregatorFut, slotFut, aggregateFut, aggregateSig))
|
|
|
|
proc scheduleSyncCommitteeMessageCheck*(
|
|
batchCrypto: ref BatchCrypto,
|
|
fork: Fork, genesis_validators_root: Eth2Digest,
|
|
slot: Slot, beacon_block_root: Eth2Digest,
|
|
pubkey: CookedPubKey, signature: ValidatorSig
|
|
): Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] =
|
|
## Schedule crypto verification of an attestation
|
|
##
|
|
## The buffer is processed:
|
|
## - when eager processing is enabled and the batch is full
|
|
## - otherwise after 10ms (BatchAttAccumTime)
|
|
##
|
|
## This returns an error if crypto sanity checks failed
|
|
## and a future with the deferred attestation check otherwise.
|
|
##
|
|
let
|
|
sig = signature.load().orReturnErr(
|
|
"SyncCommitteMessage: cannot load signature")
|
|
fut = batchCrypto.withBatch("scheduleSyncCommitteeMessageCheck"):
|
|
sync_committee_message_signature_set(
|
|
fork, genesis_validators_root, slot, beacon_block_root, pubkey, sig)
|
|
|
|
ok((fut, sig))
|
|
|
|
proc scheduleContributionChecks*(
|
|
batchCrypto: ref BatchCrypto,
|
|
fork: Fork, genesis_validators_root: Eth2Digest,
|
|
signedContributionAndProof: SignedContributionAndProof,
|
|
subcommitteeIdx: SyncSubcommitteeIndex,
|
|
dag: ChainDAGRef): Result[tuple[
|
|
aggregatorFut, proofFut, contributionFut: Future[BatchResult],
|
|
sig: CookedSig], cstring] =
|
|
## Schedule crypto verification of all signatures in a
|
|
## SignedContributionAndProof message
|
|
##
|
|
## The buffer is processed:
|
|
## - when eager processing is enabled and the batch is full
|
|
## - otherwise after 10ms (BatchAttAccumTime)
|
|
##
|
|
## This returns an error if crypto sanity checks failed
|
|
## and a future with the deferred check otherwise.
|
|
##
|
|
template contribution_and_proof: untyped = signedContributionAndProof.message
|
|
template contribution: untyped = contribution_and_proof.contribution
|
|
|
|
# Do the eager steps first to avoid polluting batches with needlessly
|
|
let
|
|
aggregatorKey =
|
|
dag.validatorKey(contribution_and_proof.aggregator_index).orReturnErr(
|
|
"SignedAggregateAndProof: invalid contributor index")
|
|
aggregatorSig = signedContributionAndProof.signature.load().orReturnErr(
|
|
"SignedContributionAndProof: invalid proof signature")
|
|
proofSig = contribution_and_proof.selection_proof.load().orReturnErr(
|
|
"SignedContributionAndProof: invalid selection signature")
|
|
contributionSig = contribution.signature.load().orReturnErr(
|
|
"SignedContributionAndProof: invalid contribution signature")
|
|
|
|
contributionKey = ? aggregateAll(
|
|
dag, dag.syncCommitteeParticipants(contribution.slot + 1, subcommitteeIdx),
|
|
contribution.aggregation_bits)
|
|
let
|
|
aggregatorFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.aggregator"):
|
|
contribution_and_proof_signature_set(
|
|
fork, genesis_validators_root, contribution_and_proof, aggregatorKey,
|
|
aggregatorSig)
|
|
proofFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.selection_proof"):
|
|
sync_committee_selection_proof_set(
|
|
fork, genesis_validators_root, contribution.slot,
|
|
subcommitteeIdx, aggregatorKey, proofSig)
|
|
contributionFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"):
|
|
sync_committee_message_signature_set(
|
|
fork, genesis_validators_root, contribution.slot,
|
|
contribution.beacon_block_root, contributionKey, contributionSig)
|
|
|
|
ok((aggregatorFut, proofFut, contributionFut, contributionSig))
|