Lazy aggregated batch verification (#3212)

A novel optimisation for attestation and sync committee message
validation: when batching, we look for signatures of the same message
and aggregate them before batch-validating. This results in up to 60%
fewer signature verifications on a busy server and a correspondingly
significant reduction in CPU usage.

* increase batch size (16 -> 72), which helps find more aggregates
* add metrics for batch verification efficiency
* use simple `blsVerify` when there is only one signature to verify in
the batch, avoiding the RNG
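
To illustrate the aggregation step, here is a minimal self-contained sketch (toy types standing in for the real blscurve ones, not code from this commit):

```nim
# Toy sketch of lazy aggregation: a burst of signatures over the same
# message collapses into a single entry that is verified once.
type SigSet = object
  message: string # stand-in for the 32-byte signing root
  merged: int     # how many original signatures this entry represents

proc lazyAggregate(incoming: seq[string]): seq[SigSet] =
  for msg in incoming:
    var found = false
    for item in result.mitems:
      if item.message == msg:
        inc item.merged # the real code combines signature and pubkey here
        found = true
        break
    if not found:
      result.add SigSet(message: msg, merged: 1)

when isMainModule:
  # nine gossip messages, but only three distinct signing roots
  let sets = lazyAggregate(@["A", "A", "B", "A", "C", "B", "A", "B", "A"])
  doAssert sets.len == 3 # 3 verifications instead of 9
```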
Jacek Sieka 2021-12-29 15:28:40 +01:00 committed by GitHub
parent a860cd6250
commit 6b60a774e0
3 changed files with 189 additions and 117 deletions

batch_validation.nim

@@ -8,7 +8,8 @@
{.push raises: [Defect].}
import
std/sequtils,
std/[deques, sequtils],
metrics,
# Status
chronicles, chronos,
../spec/signatures_batch,
@@ -19,13 +20,42 @@ export signatures_batch, blockchain_dag
logScope:
topics = "gossip_checks"
declareCounter batch_verification_batches,
"Total number of batches processed"
declareCounter batch_verification_signatures,
"Total number of verified signatures before aggregation"
declareCounter batch_verification_aggregates,
"Total number of verified signatures after aggregation"
# Batched gossip validation
# ----------------------------------------------------------------
# Batching in the context of BLS means collecting the signatures of several
# messages and verifying them all at once - this can be done more efficiently
# than verifying each message one by one, but the downside is that we get an
# all-or-nothing response - in case of an invalid signature, we must re-check
# each message separately.
#
# In addition to batching, we also perform lazy aggregation:
#
# * batching speeds up the verification of multiple signatures over different
# messages, by a decent amount
# * lazy aggregation speeds up the verification of multiple signatures over the
# same message, by a lot
#
# Due to the nature of gossip validation in eth2, it is common for messages
# to arrive in bursts - because most traffic on the network is valid (honest
# nodes don't re-broadcast invalid traffic and dishonest nodes quickly get
# disconnected), valid messages by far make up the bulk of traffic.
#
# Further, traffic is divided into topics - on a single topic it will be
# highly likely that the same message appears over and over again, but with
# different signatures, as most validators have the same view of the network -
# at least 2/3 or we're in deep trouble :)
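# A toy model of why same-message aggregation is sound (integers stand
# in for curve points here, so this is illustrative only, not real BLS):
# verification is linear in the (pubkey, signature) pair,
#
#   sign(secret, msg)     = secret * H(msg)
#   verify(pub, sig, msg) = (sig == pub * H(msg))   # toy pub == secret
#
# so whenever verify(p1, s1, msg) and verify(p2, s2, msg) both hold,
# verify(p1 + p2, s1 + s2, msg) holds too - which is what the `combine`
# helpers further down exploit for Signature and PublicKey.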
type
BatchResult* {.pure.} = enum
Invalid # Invalid by default
Valid
Invalid
Timeout
Eager = proc(): bool {.gcsafe, raises: [Defect].} ##\
@@ -33,16 +63,20 @@ type
## latency at the expense of spending more cycles validating things, creating
## a crude timesharing priority mechanism.
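## An example of an `Eager` callback (the predicate shown here is
## hypothetical, not from this commit):
##
##   let eager = proc(): bool = not nodeIsBusyWithBlocks()
##
## i.e. process full batches immediately only when no higher-priority
## work is waiting for the CPU.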
BatchItem* = object
sigset: SignatureSet
fut: Future[BatchResult]
Batch* = object
created: Moment
pendingBuffer: seq[SignatureSet]
resultsBuffer: seq[Future[BatchResult]]
sigsets: seq[SignatureSet]
items: seq[BatchItem]
BatchCrypto* = object
# Each batch is bounded by BatchedCryptoSize (16) which was chosen:
# Each batch is bounded by BatchedCryptoSize which was chosen:
# - based on "nimble bench" in nim-blscurve
# so that low power devices like Raspberry Pi 4 can process
# that many batched verifications within 20ms
# that many batched verifications within ~30ms on average
# - based on the accumulation rate of attestations and aggregates
# in large instances which were 12000 per slot (12s)
# hence 1 per ms (but the pattern is bursty around the 4s mark)
@@ -50,7 +84,7 @@ type
# we can't process them in the time that one slot takes, and we return
# timeout instead which prevents the gossip layer from forwarding the
# batch.
batches: seq[ref Batch]
batches: Deque[ref Batch]
eager: Eager ##\
## Eager is used to enable eager processing of attestations when it's
## prudent to do so (instead of leaving the CPU for other, presumably more
@@ -58,8 +92,11 @@ type
##
verifier: BatchVerifier
pruneTime: Moment ## :ast time we had to prune something
pruneTime: Moment ## last time we had to prune something
# `nim-metrics` library is a bit too slow to update on every batch, so
# we accumulate here instead
counts: tuple[signatures, batches, aggregates: int64]
const
# We cap waiting for an idle slot in case there's a lot of network traffic
@@ -72,9 +109,16 @@ const
# A balance between throughput and worst case latency.
# At least 6 so that the constant factors
# (RNG for blinding and Final Exponentiation)
# are amortized,
# but not too big as we need to redo checks one-by-one if one failed.
BatchedCryptoSize = 16
# are amortized, but not too big as we need to redo checks one-by-one if one
# failed.
# The current value is based on experiments, where 72 gives an average batch
# size of ~30 signatures per batch, or 2.5 signatures per aggregate (meaning
# an average of 12 verifications per batch which on a raspberry should be
# doable in less than 30ms). In the same experiment, a value of 36 resulted
# in 17-18 signatures per batch and 1.7-1.9 signatures per aggregate - this
# node was running on mainnet with `--subscribe-all-subnets` turned on -
# typical nodes will see smaller batches.
BatchedCryptoSize = 72
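# Back-of-envelope from the experiment described above: ~30 signatures
# per batch at ~2.5 signatures per aggregate means the verifier sees
# 30 / 2.5 = ~12 sets per batch, i.e. lazy aggregation cuts the
# signature-verification work roughly 2.5x on such a node.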
proc new*(
T: type BatchCrypto, rng: ref BrHmacDrbgContext,
@@ -85,20 +129,22 @@ proc new*(
pruneTime: Moment.now())
func len(batch: Batch): int =
doAssert batch.resultsBuffer.len() == batch.pendingBuffer.len()
batch.resultsBuffer.len()
batch.items.len()
func full(batch: Batch): bool =
batch.len() >= BatchedCryptoSize
proc clear(batch: var Batch) =
batch.pendingBuffer.setLen(0)
batch.resultsBuffer.setLen(0)
proc complete(batchItem: var BatchItem, v: BatchResult) =
batchItem.fut.complete(v)
batchItem.fut = nil
proc complete(batchItem: var BatchItem, ok: bool) =
batchItem.fut.complete(if ok: BatchResult.Valid else: BatchResult.Invalid)
batchItem.fut = nil
proc skip(batch: var Batch) =
for res in batch.resultsBuffer.mitems():
for res in batch.items.mitems():
res.complete(BatchResult.Timeout)
batch.clear() # release memory early
proc pruneBatchQueue(batchCrypto: ref BatchCrypto) =
let
@@ -112,8 +158,18 @@ proc pruneBatchQueue(batchCrypto: ref BatchCrypto) =
notice "Batch queue pruned, skipping attestation validation",
batches = batchCrypto.batches.len()
batchCrypto.pruneTime = Moment.now()
batchCrypto.batches[0][].skip()
batchCrypto.batches.delete(0)
batchCrypto.batches.popFirst()[].skip()
proc combine(a: var Signature, b: Signature) =
var tmp = AggregateSignature.init(CookedSig(a))
tmp.aggregate(b)
a = Signature(tmp.finish())
proc combine(a: var PublicKey, b: PublicKey) =
var tmp = AggregatePublicKey.init(CookedPubKey(a))
tmp.aggregate(b)
a = PublicKey(tmp.finish())
proc processBatch(batchCrypto: ref BatchCrypto) =
## Process one batch, if there is any
@@ -126,41 +182,52 @@ proc processBatch(batchCrypto: ref BatchCrypto) =
return
let
batch = batchCrypto[].batches[0]
batchSize = batch[].len()
batchCrypto[].batches.del(0)
batch = batchCrypto[].batches.popFirst()
batchSize = batch[].sigsets.len()
if batchSize == 0:
# Nothing to do in this batch, can happen when a batch is created without
# there being any signatures successfully added to it
return
trace "batch crypto - starting",
batchSize
let startTick = Moment.now()
let ok = batchCrypto.verifier.batchVerify(batch.pendingBuffer)
trace "batch crypto - finished",
batchSize,
cryptoVerified = ok,
batchDur = Moment.now() - startTick
if ok:
for res in batch.resultsBuffer.mitems():
res.complete(BatchResult.Valid)
discard
else:
# Batched verification failed meaning that some of the signature checks
# failed, but we don't know which ones - check each signature separately
# instead
debug "batch crypto - failure, falling back",
trace "batch crypto - starting",
batchSize
for i, res in batch.resultsBuffer.mpairs():
let ok = blsVerify batch[].pendingBuffer[i]
res.complete(if ok: BatchResult.Valid else: BatchResult.Invalid)
batch[].clear() # release memory early
let
startTick = Moment.now()
ok =
if batchSize == 1: blsVerify(batch[].sigsets[0])
else: batchCrypto.verifier.batchVerify(batch[].sigsets)
trace "batch crypto - finished",
batchSize,
cryptoVerified = ok,
batchDur = Moment.now() - startTick
if ok:
for res in batch.items.mitems():
res.complete(BatchResult.Valid)
else:
# Batched verification failed meaning that some of the signature checks
# failed, but we don't know which ones - check each signature separately
# instead
debug "batch crypto - failure, falling back",
items = batch[].items.len()
for item in batch[].items.mitems():
item.complete(blsVerify item.sigset)
batchCrypto[].counts.batches += 1
batchCrypto[].counts.signatures += batch[].items.len()
batchCrypto[].counts.aggregates += batch[].sigsets.len()
if batchCrypto[].counts.batches >= 256:
# Not too often, so as not to overwhelm our metrics
batch_verification_batches.inc(batchCrypto[].counts.batches)
batch_verification_signatures.inc(batchCrypto[].counts.signatures)
batch_verification_aggregates.inc(batchCrypto[].counts.aggregates)
reset(batchCrypto[].counts)
proc deferCryptoProcessing(batchCrypto: ref BatchCrypto) {.async.} =
## Process pending crypto check after some time has passed - the time is
@@ -178,14 +245,14 @@ proc getBatch(batchCrypto: ref BatchCrypto): (ref Batch, bool) =
batchCrypto.pruneBatchQueue()
if batchCrypto.batches.len() == 0 or
batchCrypto.batches[^1][].full():
batchCrypto.batches.peekLast[].full():
# There are no batches in progress - start a new batch and schedule a
# deferred task to eventually handle it
let batch = (ref Batch)(created: Moment.now())
batchCrypto[].batches.add(batch)
batchCrypto[].batches.addLast(batch)
(batch, true)
else:
let batch = batchCrypto[].batches[^1]
let batch = batchCrypto[].batches.peekLast()
# len will be 0 when the batch was created but nothing added to it
# because of early failures
(batch, batch[].len() == 0)
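# The queue discipline above in std/deques terms (a detached sketch,
# not module code):
#
#   var q = initDeque[int]()
#   q.addLast 1                # getBatch starts new batches at the back
#   q.addLast 2
#   doAssert q.peekLast == 2   # ...and fills the most recent one
#   doAssert q.popFirst == 1   # processing/pruning takes the oldest
#
# FIFO order means messages are verified roughly in arrival order.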
@@ -198,7 +265,7 @@ proc scheduleBatch(batchCrypto: ref BatchCrypto, fresh: bool) =
asyncSpawn batchCrypto.deferCryptoProcessing()
if batchCrypto.batches.len() > 0 and
batchCrypto.batches[0][].full() and
batchCrypto.batches.peekFirst()[].full() and
batchCrypto.eager():
# If there's a full batch, process it eagerly assuming the callback allows
batchCrypto.processBatch()
@@ -215,13 +282,28 @@ template withBatch(
body: untyped): Future[BatchResult] =
block:
let
(batch {.inject.}, fresh) = batchCrypto.getBatch()
(batch, fresh) = batchCrypto.getBatch()
body
let
fut = newFuture[BatchResult](name)
sigset = body
let fut = newFuture[BatchResult](name)
var found = false
# Find existing signature sets with the same message - if we can verify an
# aggregate instead of several signatures, that is _much_ faster
for item in batch[].sigsets.mitems():
if item.message == sigset.message:
item.signature.combine(sigset.signature)
item.pubkey.combine(sigset.pubkey)
found = true
break
batch[].resultsBuffer.add(fut)
if not found:
batch[].sigsets.add sigset
# We need to keep the "original" sigset to allow verifying each signature
# one by one in case the combined operation fails
batch[].items.add(BatchItem(sigset: sigset, fut: fut))
batchCrypto.scheduleBatch(fresh)
fut
@@ -245,7 +327,7 @@ proc scheduleAttestationCheck*(
let
sig = signature.load().orReturnErr("attestation: cannot load signature")
fut = batchCrypto.withBatch("batch_validation.scheduleAttestationCheck"):
batch.pendingBuffer.add_attestation_signature(
attestation_signature_set(
fork, genesis_validators_root, attestationData, pubkey, sig)
ok((fut, sig))
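# Caller-side sketch (hypothetical caller, not from this commit): the
# returned future resolves once the batch containing this attestation
# has been processed, aggregated or not:
#
#   let deferred = batchCrypto.scheduleAttestationCheck(...)
#   if deferred.isOk():
#     let (fut, sig) = deferred.get()
#     case await fut
#     of BatchResult.Valid: discard   # accept / forward on gossip
#     of BatchResult.Invalid: discard # reject
#     of BatchResult.Timeout: discard # queue overload - don't forward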
@@ -291,15 +373,15 @@ proc scheduleAggregateChecks*(
let
aggregatorFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregator"):
batch.pendingBuffer.add_aggregate_and_proof_signature(
aggregate_and_proof_signature_set(
fork, genesis_validators_root, aggregate_and_proof, aggregatorKey,
aggregatorSig)
slotFut = batchCrypto.withBatch("scheduleAggregateChecks.selection_proof"):
batch.pendingBuffer.add_slot_signature(
slot_signature_set(
fork, genesis_validators_root, aggregate.data.slot, aggregatorKey,
slotSig)
aggregateFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregate"):
batch.pendingBuffer.add_attestation_signature(
attestation_signature_set(
fork, genesis_validators_root, aggregate.data, aggregateKey,
aggregateSig)
@@ -324,7 +406,7 @@ proc scheduleSyncCommitteeMessageCheck*(
sig = signature.load().orReturnErr(
"SyncCommitteMessage: cannot load signature")
fut = batchCrypto.withBatch("scheduleSyncCommitteeMessageCheck"):
batch.pendingBuffer.add_sync_committee_message_signature(
sync_committee_message_signature_set(
fork, genesis_validators_root, slot, beacon_block_root, pubkey, sig)
ok((fut, sig))
@@ -367,15 +449,15 @@ proc scheduleContributionChecks*(
contribution.aggregation_bits)
let
aggregatorFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.aggregator"):
batch.pendingBuffer.add_contribution_and_proof_signature(
contribution_and_proof_signature_set(
fork, genesis_validators_root, contribution_and_proof, aggregatorKey,
aggregatorSig)
proofFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.selection_proof"):
batch.pendingBuffer.add_sync_committee_selection_proof(
sync_committee_selection_proof_set(
fork, genesis_validators_root, contribution.slot,
contribution.subcommittee_index, aggregatorKey, proofSig)
contributionFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"):
batch.pendingBuffer.add_sync_committee_message_signature(
sync_committee_message_signature_set(
fork, genesis_validators_root, contribution.slot,
contribution.beacon_block_root, contributionKey, contributionSig)

signatures_batch.nim

@@ -55,17 +55,16 @@ template loadOrExit(signature: ValidatorSig, error: cstring):
return err(error) # this exits the calling scope, as templates are inlined.
sig.unsafeGet()
func addSignatureSet(
sigs: var seq[SignatureSet], pubkey: CookedPubKey, signing_root: Eth2Digest,
signature: CookedSig) =
func init(T: type SignatureSet,
pubkey: CookedPubKey, signing_root: Eth2Digest,
signature: CookedSig): T =
## Construct a signature set triplet (pubkey, message, signature)
## for batch verification.
sigs.add((
(
blscurve.PublicKey(pubkey),
signing_root.data,
blscurve.Signature(signature)
))
)
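# The value-returning style composes with `withBatch` in
# batch_validation, whose body now evaluates to a SignatureSet:
#
#   let fut = batchCrypto.withBatch("batch_validation.scheduleAttestationCheck"):
#     attestation_signature_set(
#       fork, genesis_validators_root, attestationData, pubkey, sig)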
proc aggregateAttesters(
validatorIndices: openArray[uint64|ValidatorIndex],
@@ -130,100 +129,91 @@ proc aggregateAttesters(
# ------------------------------------------------------
# See also: verify_slot_signature
proc add_slot_signature*(
sigs: var seq[SignatureSet],
proc slot_signature_set*(
fork: Fork, genesis_validators_root: Eth2Digest, slot: Slot,
pubkey: CookedPubKey, signature: CookedSig) =
pubkey: CookedPubKey, signature: CookedSig): SignatureSet =
let signing_root = compute_slot_signing_root(
fork, genesis_validators_root, slot)
sigs.addSignatureSet(pubkey, signing_root, signature)
SignatureSet.init(pubkey, signing_root, signature)
# See also: verify_epoch_signature
proc add_epoch_signature*(
sigs: var seq[SignatureSet],
fork: Fork, genesis_validators_root: Eth2Digest, epoch: Epoch,
pubkey: CookedPubKey, signature: CookedSig) =
proc epoch_signature_set*(
fork: Fork, genesis_validators_root: Eth2Digest, epoch: Epoch,
pubkey: CookedPubKey, signature: CookedSig): SignatureSet =
let signing_root = compute_epoch_signing_root(
fork, genesis_validators_root, epoch)
sigs.addSignatureSet(pubkey, signing_root, signature)
SignatureSet.init(pubkey, signing_root, signature)
# See also: verify_block_signature
proc add_block_signature*(
sigs: var seq[SignatureSet],
proc block_signature_set*(
fork: Fork, genesis_validators_root: Eth2Digest, slot: Slot,
blck: Eth2Digest | SomeSomeBeaconBlock | BeaconBlockHeader,
pubkey: CookedPubKey, signature: CookedSig) =
pubkey: CookedPubKey, signature: CookedSig): SignatureSet =
let signing_root = compute_block_signing_root(
fork, genesis_validators_root, slot, blck)
sigs.addSignatureSet(pubkey, signing_root, signature)
SignatureSet.init(pubkey, signing_root, signature)
# See also: verify_aggregate_and_proof_signature
proc add_aggregate_and_proof_signature*(
sigs: var seq[SignatureSet],
proc aggregate_and_proof_signature_set*(
fork: Fork, genesis_validators_root: Eth2Digest,
aggregate_and_proof: AggregateAndProof,
pubkey: CookedPubKey, signature: CookedSig) =
pubkey: CookedPubKey, signature: CookedSig): SignatureSet =
let signing_root = compute_aggregate_and_proof_signing_root(
fork, genesis_validators_root, aggregate_and_proof)
sigs.addSignatureSet(pubkey, signing_root, signature)
SignatureSet.init(pubkey, signing_root, signature)
# See also: verify_attestation_signature
proc add_attestation_signature*(
sigs: var seq[SignatureSet],
proc attestation_signature_set*(
fork: Fork, genesis_validators_root: Eth2Digest,
attestation_data: AttestationData,
pubkey: CookedPubKey, signature: CookedSig) =
pubkey: CookedPubKey, signature: CookedSig): SignatureSet =
let signing_root = compute_attestation_signing_root(
fork, genesis_validators_root, attestation_data)
sigs.addSignatureSet(pubkey, signing_root, signature)
SignatureSet.init(pubkey, signing_root, signature)
# See also: verify_voluntary_exit_signature
proc add_voluntary_exit_signature*(
sigs: var seq[SignatureSet],
proc voluntary_exit_signature_set*(
fork: Fork, genesis_validators_root: Eth2Digest,
voluntary_exit: VoluntaryExit,
pubkey: CookedPubKey, signature: CookedSig) =
pubkey: CookedPubKey, signature: CookedSig): SignatureSet =
let signing_root = compute_voluntary_exit_signing_root(
fork, genesis_validators_root, voluntary_exit)
sigs.addSignatureSet(pubkey, signing_root, signature)
SignatureSet.init(pubkey, signing_root, signature)
# See also: verify_sync_committee_message_signature
proc add_sync_committee_message_signature*(
sigs: var seq[SignatureSet],
proc sync_committee_message_signature_set*(
fork: Fork, genesis_validators_root: Eth2Digest,
slot: Slot, block_root: Eth2Digest,
pubkey: CookedPubKey, signature: CookedSig) =
pubkey: CookedPubKey, signature: CookedSig): SignatureSet =
let signing_root = compute_sync_committee_message_signing_root(
fork, genesis_validators_root, slot, block_root)
sigs.addSignatureSet(pubkey, signing_root, signature)
SignatureSet.init(pubkey, signing_root, signature)
# See also: verify_sync_committee_selection_proof
proc add_sync_committee_selection_proof*(
sigs: var seq[SignatureSet],
proc sync_committee_selection_proof_set*(
fork: Fork, genesis_validators_root: Eth2Digest,
slot: Slot, subcommittee_index: uint64,
pubkey: CookedPubKey, signature: CookedSig) =
pubkey: CookedPubKey, signature: CookedSig): SignatureSet =
let signing_root = compute_sync_committee_selection_proof_signing_root(
fork, genesis_validators_root, slot, subcommittee_index)
sigs.addSignatureSet(pubkey, signing_root, signature)
SignatureSet.init(pubkey, signing_root, signature)
proc add_contribution_and_proof_signature*(
sigs: var seq[SignatureSet],
proc contribution_and_proof_signature_set*(
fork: Fork, genesis_validators_root: Eth2Digest,
msg: ContributionAndProof,
pubkey: CookedPubKey, signature: CookedSig) =
pubkey: CookedPubKey, signature: CookedSig): SignatureSet =
let signing_root = compute_contribution_and_proof_signing_root(
fork, genesis_validators_root, msg)
sigs.addSignatureSet(pubkey, signing_root, signature)
SignatureSet.init(pubkey, signing_root, signature)
proc collectSignatureSets*(
sigs: var seq[SignatureSet],
@@ -264,7 +254,7 @@ proc collectSignatureSets*(
# 1. Block proposer
# ----------------------------------------------------
sigs.add_block_signature(
sigs.add block_signature_set(
fork, genesis_validators_root,
signed_block.message.slot, signed_block.root,
proposer_key.get(), signed_block.signature.loadOrExit(
@@ -272,7 +262,7 @@ proc collectSignatureSets*(
# 2. Randao Reveal
# ----------------------------------------------------
sigs.add_epoch_signature(
sigs.add epoch_signature_set(
fork, genesis_validators_root, epoch, proposer_key.get(),
signed_block.message.body.randao_reveal.loadOrExit(
"collectSignatureSets: cannot load randao"))
@@ -299,7 +289,7 @@ proc collectSignatureSets*(
if not key.isSome():
return err("collectSignatureSets: invalid slashing proposer index 1")
sigs.add_block_signature(
sigs.add block_signature_set(
fork, genesis_validators_root, header.message.slot, header.message,
key.get(), header.signature.loadOrExit(
"collectSignatureSets: cannot load proposer slashing 1 signature"))
@@ -312,7 +302,7 @@ proc collectSignatureSets*(
if not key.isSome():
return err("collectSignatureSets: invalid slashing proposer index 2")
sigs.add_block_signature(
sigs.add block_signature_set(
fork, genesis_validators_root, header.message.slot, header.message,
key.get(), header.signature.loadOrExit(
"collectSignatureSets: cannot load proposer slashing 2 signature"))
@@ -337,7 +327,7 @@ proc collectSignatureSets*(
key = ? aggregateAttesters(
slashing.attestation_1.attesting_indices.asSeq(), validatorKeys)
sig = slashing.attestation_1.signature.loadOrExit("")
sigs.add_attestation_signature(
sigs.add attestation_signature_set(
fork, genesis_validators_root, slashing.attestation_1.data, key, sig)
# Conflicting attestation 2
@@ -346,7 +336,7 @@ proc collectSignatureSets*(
key = ? aggregateAttesters(
slashing.attestation_2.attesting_indices.asSeq(), validatorKeys)
sig = slashing.attestation_2.signature.loadOrExit("")
sigs.add_attestation_signature(
sigs.add attestation_signature_set(
fork, genesis_validators_root, slashing.attestation_2.data, key, sig)
# 5. Attestations
@@ -368,7 +358,7 @@ proc collectSignatureSets*(
validatorKeys)
sig = attestation.signature.loadOrExit("")
sigs.add_attestation_signature(
sigs.add attestation_signature_set(
fork, genesis_validators_root, attestation.data, key, sig)
# 6. VoluntaryExits
@@ -386,7 +376,7 @@ proc collectSignatureSets*(
if not key.isSome():
return err("collectSignatureSets: invalid voluntary exit")
sigs.add_voluntary_exit_signature(
sigs.add voluntary_exit_signature_set(
fork, genesis_validators_root, volex.message, key.get(),
volex.signature.loadOrExit(
"collectSignatureSets: cannot load voluntary exit signature"))
@@ -412,7 +402,7 @@ proc collectSignatureSets*(
signed_block.message.body.sync_aggregate.sync_committee_bits,
validatorKeys)
sigs.add_sync_committee_message_signature(
sigs.add sync_committee_message_signature_set(
fork, genesis_validators_root, previous_slot, beacon_block_root,
pubkey,
signed_block.message.body.sync_aggregate.sync_committee_signature.loadOrExit(

vendor/nim-libp2p (vendored submodule)

@@ -1 +1 @@
Subproject commit 123bf290c3ad951429dd3b9bbab71bed6b55a53f
Subproject commit fb0d10b6fdd93515508de095c7f63f2547a5116b