nimbus-eth2/beacon_chain/gossip_processing/batch_validation.nim

283 lines
9.9 KiB
Nim

# beacon_chain
# Copyright (c) 2019-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
# Status
chronicles, chronos,
stew/results,
eth/keys,
# Internals
../spec/[
datatypes, crypto, digest, helpers, signatures_batch],
../consensus_object_pools/[
blockchain_dag, block_quarantine,
attestation_pool, exit_pool,
block_pools_types, spec_cache
],
".."/[beacon_node_types, ssz, beacon_clock]
export BrHmacDrbgContext
logScope:
topics = "gossip_checks"
# Batched gossip validation
# ----------------------------------------------------------------
{.push raises: [Defect].}
type
BatchCrypto* = object
# The buffers are bounded by BatchedCryptoSize (16) which was chosen:
# - based on "nimble bench" in nim-blscurve
# so that low power devices like Raspberry Pi 4 can process
# that many batched verifications within 20ms
# - based on the accumulation rate of attestations and aggregates
# in large instances which were 12000 per slot (12s)
# hence 1 per ms (but the pattern is bursty around the 4s mark)
pendingBuffer: seq[SignatureSet]
resultsBuffer: seq[Future[Result[void, cstring]]]
sigVerifCache: BatchedBLSVerifierCache ##\
## A cache for batch BLS signature verification contexts
rng: ref BrHmacDrbgContext ##\
## A reference to the Nimbus application-wide RNG
const
# We cap waiting for an idle slot in case there's a lot of network traffic
# taking up all CPU - we don't want to _completely_ stop processing blocks
# in this case (attestations will get dropped) - doing so also allows us
# to benefit from more batching / larger network reads when under load.
BatchAttAccumTime = 10.milliseconds
# Threshold for immediate trigger of batch verification.
# A balance between throughput and worst case latency.
# At least 6 so that the constant factors
# (RNG for blinding and Final Exponentiation)
# are amortized,
# but not too big as we need to redo checks one-by-one if one failed.
BatchedCryptoSize = 16
proc new*(T: type BatchCrypto, rng: ref BrHmacDrbgContext): ref BatchCrypto =
(ref BatchCrypto)(rng: rng)
func clear(batchCrypto: var BatchCrypto) =
## Empty the crypto-pending attestations & aggregate queues
batchCrypto.pendingBuffer.setLen(0)
batchCrypto.resultsBuffer.setLen(0)
proc done(batchCrypto: var BatchCrypto, idx: int) =
## Send signal to [Attestation/Aggregate]Validator
## that the attestation was crypto-verified (and so gossip validated)
## with success
batchCrypto.resultsBuffer[idx].complete(Result[void, cstring].ok())
proc fail(batchCrypto: var BatchCrypto, idx: int, error: cstring) =
## Send signal to [Attestation/Aggregate]Validator
## that the attestation was NOT crypto-verified (and so NOT gossip validated)
batchCrypto.resultsBuffer[idx].complete(Result[void, cstring].err(error))
proc complete(batchCrypto: var BatchCrypto, idx: int, res: Result[void, cstring]) =
## Send signal to [Attestation/Aggregate]Validator
batchCrypto.resultsBuffer[idx].complete(res)
proc processBufferedCrypto(self: var BatchCrypto) =
## Drain all attestations waiting for crypto verifications
doAssert self.pendingBuffer.len ==
self.resultsBuffer.len
if self.pendingBuffer.len == 0:
return
trace "batch crypto - starting",
batchSize = self.pendingBuffer.len
let startTime = Moment.now()
var secureRandomBytes: array[32, byte]
self.rng[].brHmacDrbgGenerate(secureRandomBytes)
# TODO: For now only enable serial batch verification
let ok = batchVerifySerial(
self.sigVerifCache,
self.pendingBuffer,
secureRandomBytes)
let stopTime = Moment.now()
debug "batch crypto - finished",
batchSize = self.pendingBuffer.len,
cryptoVerified = ok,
dur = stopTime - startTime
if ok:
for i in 0 ..< self.resultsBuffer.len:
self.done(i)
else:
debug "batch crypto - failure, falling back",
batchSize = self.pendingBuffer.len
for i in 0 ..< self.pendingBuffer.len:
let ok = blsVerify self.pendingBuffer[i]
if ok:
self.done(i)
else:
self.fail(i, "batch crypto verification: invalid signature")
self.clear()
proc deferCryptoProcessing(self: ref BatchCrypto, idleTimeout: Duration) {.async.} =
## Process pending crypto check:
## - if time threshold is reached
## - or if networking is idle
# TODO: how to cancel the scheduled `deferCryptoProcessing(BatchAttAccumTime)` ?
# when the buffer size threshold is reached?
# In practice this only happens when we receive a burst of attestations/aggregates.
# Though it's possible to reach the threshold 9ms in,
# and have only 1ms left for further accumulation.
await sleepAsync(idleTimeout)
self[].processBufferedCrypto()
proc schedule(batchCrypto: ref BatchCrypto, fut: Future[Result[void, cstring]], checkThreshold = true) =
## Schedule a cryptocheck for processing
##
## The buffer is processed:
## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize)
## - when there are no network events (idleAsync)
## - otherwise after 10ms (BatchAttAccumTime)
# Note: use the resultsBuffer size to detect the first item
# as pendingBuffer is appended to 3 by 3 in case of aggregates
batchCrypto.resultsBuffer.add fut
if batchCrypto.resultsBuffer.len == 1:
# First attestation to be scheduled in the batch
# wait for an idle time or up to 10ms before processing
trace "batch crypto - scheduling next",
deadline = BatchAttAccumTime
asyncSpawn batchCrypto.deferCryptoProcessing(BatchAttAccumTime)
elif checkThreshold and
batchCrypto.resultsBuffer.len >= BatchedCryptoSize:
# Reached the max buffer size, process immediately
# TODO: how to cancel the scheduled `deferCryptoProcessing(BatchAttAccumTime)` ?
batchCrypto[].processBufferedCrypto()
proc scheduleAttestationCheck*(
batchCrypto: ref BatchCrypto,
fork: Fork, genesis_validators_root: Eth2Digest,
epochRef: EpochRef,
attestation: Attestation
): Option[Future[Result[void, cstring]]] =
## Schedule crypto verification of an attestation
##
## The buffer is processed:
## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize)
## - when there are no network events (idleAsync)
## - otherwise after 10ms (BatchAttAccumTime)
##
## This returns None if crypto sanity checks failed
## and a future with the deferred attestation check otherwise.
doAssert batchCrypto.pendingBuffer.len < BatchedCryptoSize
let sanity = batchCrypto
.pendingBuffer
.addAttestation(
fork, genesis_validators_root, epochRef,
attestation
)
if not sanity:
return none(Future[Result[void, cstring]])
let fut = newFuture[Result[void, cstring]](
"batch_validation.scheduleAttestationCheck"
)
batchCrypto.schedule(fut)
return some(fut)
proc scheduleAggregateChecks*(
batchCrypto: ref BatchCrypto,
fork: Fork, genesis_validators_root: Eth2Digest,
epochRef: EpochRef,
signedAggregateAndProof: SignedAggregateAndProof
): Option[tuple[slotCheck, aggregatorCheck, aggregateCheck: Future[Result[void, cstring]]]] =
## Schedule crypto verification of an aggregate
##
## This involves 3 checks:
## - verify_slot_signature
## - verify_aggregate_and_proof_signature
## - is_valid_indexed_attestation
##
## The buffer is processed:
## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize)
## - when there are no network events (idleAsync)
## - otherwise after 10ms (BatchAttAccumTime)
##
## This returns None if crypto sanity checks failed
## and 2 futures with the deferred aggregate checks otherwise.
doAssert batchCrypto.pendingBuffer.len < BatchedCryptoSize
template aggregate_and_proof: untyped = signedAggregateAndProof.message
template aggregate: untyped = aggregate_and_proof.aggregate
type R = tuple[slotCheck, aggregatorCheck, aggregateCheck: Future[Result[void, cstring]]]
# Enqueue in the buffer
# ------------------------------------------------------
let aggregator = epochRef.validator_keys[aggregate_and_proof.aggregator_index]
block:
let sanity = batchCrypto
.pendingBuffer
.addSlotSignature(
fork, genesis_validators_root,
aggregate.data.slot,
aggregator,
aggregate_and_proof.selection_proof
)
if not sanity:
return none(R)
block:
let sanity = batchCrypto
.pendingBuffer
.addAggregateAndProofSignature(
fork, genesis_validators_root,
aggregate_and_proof,
aggregator,
signed_aggregate_and_proof.signature
)
if not sanity:
return none(R)
block:
let sanity = batchCrypto
.pendingBuffer
.addAttestation(
fork, genesis_validators_root, epochRef,
aggregate
)
if not sanity:
return none(R)
let futSlot = newFuture[Result[void, cstring]](
"batch_validation.scheduleAggregateChecks.slotCheck"
)
let futAggregator = newFuture[Result[void, cstring]](
"batch_validation.scheduleAggregateChecks.aggregatorCheck"
)
let futAggregate = newFuture[Result[void, cstring]](
"batch_validation.scheduleAggregateChecks.aggregateCheck"
)
batchCrypto.schedule(futSlot, checkThreshold = false)
batchCrypto.schedule(futAggregator, checkThreshold = false)
batchCrypto.schedule(futAggregate)
return some((futSlot, futAggregator, futAggregate))