# beacon_chain
# Copyright (c) 2019-2021 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [Defect].}

import
  std/sequtils,
  # Status
  chronicles, chronos,
  ../spec/signatures_batch,
  ../consensus_object_pools/[blockchain_dag, spec_cache]

export signatures_batch, blockchain_dag

logScope:
  topics = "gossip_checks"

# Batched gossip validation
# ----------------------------------------------------------------

type
  BatchResult* {.pure.} = enum
    Valid
    Invalid
    Timeout

  Eager = proc(): bool {.gcsafe, raises: [Defect].} ##\
  ## Callback that returns true if eager processing should be done to lower
  ## latency at the expense of spending more cycles validating things, creating
  ## a crude timesharing priority mechanism.

  Batch* = object
    created: Moment
      ## Creation time of the batch, used by the pruning logic
    pendingBuffer: seq[SignatureSet]
      ## Signature sets waiting to be verified
    resultsBuffer: seq[Future[BatchResult]]
      ## One future per entry in `pendingBuffer`, completed with the outcome

  BatchCrypto* = object
    # Each batch is bounded by BatchedCryptoSize (16) which was chosen:
    # - based on "nimble bench" in nim-blscurve
    #   so that low power devices like Raspberry Pi 4 can process
    #   that many batched verifications within 20ms
    # - based on the accumulation rate of attestations and aggregates
    #   in large instances which were 12000 per slot (12s)
    #   hence 1 per ms (but the pattern is bursty around the 4s mark)
    # The number of batches is bounded by time - batch validation is skipped if
    # we can't process them in the time that one slot takes, and we return
    # timeout instead which prevents the gossip layer from forwarding the
    # batch.
    batches: seq[ref Batch]
      ## FIFO queue: index 0 is the oldest batch, the last entry is the one
      ## currently being filled
    eager: Eager ##\
    ## Eager is used to enable eager processing of attestations when it's
    ## prudent to do so (instead of leaving the CPU for other, presumably more
    ## important work like block processing)
    ##
    verifier: BatchVerifier

    pruneTime: Moment ## Last time we had to prune something

const
  # We cap waiting for an idle slot in case there's a lot of network traffic
  # taking up all CPU - we don't want to _completely_ stop processing
  # attestations - doing so also allows us to benefit from more batching /
  # larger network reads when under load.
  BatchAttAccumTime = 10.milliseconds

  # Threshold for immediate trigger of batch verification.
  # A balance between throughput and worst case latency.
  # At least 6 so that the constant factors
  # (RNG for blinding and Final Exponentiation)
  # are amortized,
  # but not too big as we need to redo checks one-by-one if one failed.
  BatchedCryptoSize = 16

proc new*(
    T: type BatchCrypto, rng: ref BrHmacDrbgContext,
    eager: Eager, taskpool: TaskPoolPtr): ref BatchCrypto =
  ## Create a batch verification queue with no pending batches; `pruneTime`
  ## starts out at the moment of creation.
  (ref BatchCrypto)(
    verifier: BatchVerifier(rng: rng, taskpool: taskpool),
    eager: eager,
    pruneTime: Moment.now())

func len(batch: Batch): int =
  ## Number of checks in the batch - the pending and result buffers must
  ## always have the same length.
  doAssert batch.resultsBuffer.len() == batch.pendingBuffer.len()
  batch.resultsBuffer.len()

func full(batch: Batch): bool =
  ## True once the batch holds at least `BatchedCryptoSize` checks
  batch.len() >= BatchedCryptoSize

proc clear(batch: var Batch) =
  ## Drop all pending signature sets and result futures of the batch
  batch.pendingBuffer.setLen(0)
  batch.resultsBuffer.setLen(0)

proc skip(batch: var Batch) =
  ## Complete every future of the batch with `Timeout` without verifying
  ## anything
  for res in batch.resultsBuffer.mitems():
    res.complete(BatchResult.Timeout)
  batch.clear() # release memory early

proc pruneBatchQueue(batchCrypto: ref BatchCrypto) =
  ## Skip (time out) batches at the head of the queue that were created one
  ## slot or more ago
  let
    now = Moment.now()

  # If batches haven't been processed for more than 12 seconds
  while batchCrypto.batches.len() > 0:
    if batchCrypto.batches[0][].created + SECONDS_PER_SLOT.int64.seconds > now:
      # The queue is ordered by creation time, so everything behind the head
      # is newer still
      break
    # NOTE(review): the notice is only emitted while the previous prune (or
    # creation) happened less than one slot ago - confirm that this is the
    # intended direction for the log rate limit.
    if batchCrypto.pruneTime + SECONDS_PER_SLOT.int64.seconds > now:
      notice "Batch queue pruned, skipping attestation validation",
        batches = batchCrypto.batches.len()
      batchCrypto.pruneTime = Moment.now()
    batchCrypto.batches[0][].skip()
    batchCrypto.batches.delete(0)

proc processBatch(batchCrypto: ref BatchCrypto) =
  ## Process one batch, if there is any

  # Pruning the queue here makes sure we catch up with processing if need be
  batchCrypto.pruneBatchQueue() # Skip old batches

  if batchCrypto[].batches.len() == 0:
    # No more batches left, they might have been eagerly processed or pruned
    return

  let
    batch = batchCrypto[].batches[0]
    batchSize = batch[].len()
  # `delete` preserves the order of the remaining batches - `del` would move
  # the newest batch to the front, breaking the oldest-first order that
  # pruning and `getBatch` depend on
  batchCrypto[].batches.delete(0)

  if batchSize == 0:
    # Nothing to do in this batch, can happen when a batch is created without
    # there being any signatures successfully added to it
    return

  trace "batch crypto - starting",
    batchSize

  let startTick = Moment.now()

  let ok = batchCrypto.verifier.batchVerify(batch.pendingBuffer)

  trace "batch crypto - finished",
    batchSize,
    cryptoVerified = ok,
    batchDur = Moment.now() - startTick

  if ok:
    for res in batch.resultsBuffer.mitems():
      res.complete(BatchResult.Valid)
  else:
    # Batched verification failed meaning that some of the signature checks
    # failed, but we don't know which ones - check each signature separately
    # instead
    debug "batch crypto - failure, falling back",
      batchSize
    for i, res in batch.resultsBuffer.mpairs():
      let sigOk = blsVerify batch[].pendingBuffer[i]
      res.complete(if sigOk: BatchResult.Valid else: BatchResult.Invalid)

  batch[].clear() # release memory early

proc deferCryptoProcessing(batchCrypto: ref BatchCrypto) {.async.} =
  ## Process pending crypto check after some time has passed - the time is
  ## chosen such that there's time to fill the batch but not so long that
  ## latency across the network is negatively affected
  await sleepAsync(BatchAttAccumTime)

  # Take the first batch in the queue and process it - if eager processing has
  # stolen it already, that's fine
  batchCrypto.processBatch()

proc getBatch(batchCrypto: ref BatchCrypto): (ref Batch, bool) =
  ## Return the batch that new checks should be added to, together with a
  ## flag telling whether that batch is fresh, i.e. newly created or still
  ## empty.
  # Get a batch suitable for attestation processing - in particular,
  # attestation batches might be skipped
  batchCrypto.pruneBatchQueue()

  if batchCrypto.batches.len() == 0 or
      batchCrypto.batches[^1][].full():
    # There are no batches in progress - start a new batch and schedule a
    # deferred task to eventually handle it
    let batch = (ref Batch)(created: Moment.now())
    batchCrypto[].batches.add(batch)
    (batch, true)
  else:
    let batch = batchCrypto[].batches[^1]
    # len will be 0 when the batch was created but nothing added to it
    # because of early failures
    (batch, batch[].len() == 0)

proc scheduleBatch(batchCrypto: ref BatchCrypto, fresh: bool) =
  ## Arrange for the queued batches to be processed: a deferred task for a
  ## fresh batch, plus eager processing of a full head batch when allowed.
  if fresh:
    # Every time we start a new round of batching, we need to launch a deferred
    # task that will compute the result of the batch eventually in case the
    # batch is never filled or eager processing is blocked
    asyncSpawn batchCrypto.deferCryptoProcessing()

  if batchCrypto.batches.len() > 0 and
      batchCrypto.batches[0][].full() and
      batchCrypto.eager():
    # If there's a full batch, process it eagerly assuming the callback allows
    batchCrypto.processBatch()

template orReturnErr(v: Option, error: cstring): untyped =
  ## Returns with given error string if the option does not have a value
  let tmp = v
  if tmp.isNone:
    return err(error) # this exits the calling scope, as templates are inlined.
  tmp.unsafeGet()

template withBatch(
    batchCrypto: ref BatchCrypto, name: cstring,
    body: untyped): Future[BatchResult] =
  ## Run `body` with the current batch injected as `batch`, then register a
  ## new future named `name` in that batch, schedule processing and evaluate
  ## to the future.
  block:
    let
      (batch {.inject.}, fresh) = batchCrypto.getBatch()

    body

    let fut = newFuture[BatchResult](name)

    batch[].resultsBuffer.add(fut)

    batchCrypto.scheduleBatch(fresh)
    fut

# See also verify_attestation_signature
proc scheduleAttestationCheck*(
      batchCrypto: ref BatchCrypto,
      fork: Fork, genesis_validators_root: Eth2Digest,
      attestationData: AttestationData,
      pubkey: CookedPubKey, signature: ValidatorSig
     ): Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] =
  ## Schedule crypto verification of an attestation
  ##
  ## The buffer is processed:
  ## - when eager processing is enabled and the batch is full
  ## - otherwise after 10ms (BatchAttAccumTime)
  ##
  ## This returns an error if crypto sanity checks failed
  ## and a future with the deferred attestation check otherwise.
  ##
  let
    sig = signature.load().orReturnErr("attestation: cannot load signature")
    fut = batchCrypto.withBatch("batch_validation.scheduleAttestationCheck"):
      batch.pendingBuffer.add_attestation_signature(
        fork, genesis_validators_root, attestationData, pubkey, sig)

  ok((fut, sig))

proc scheduleAggregateChecks*(
      batchCrypto: ref BatchCrypto,
      fork: Fork, genesis_validators_root: Eth2Digest,
      signedAggregateAndProof: SignedAggregateAndProof,
      epochRef: EpochRef,
      attesting_indices: openArray[ValidatorIndex]
     ): Result[tuple[
        aggregatorFut, slotFut, aggregateFut: Future[BatchResult],
        sig: CookedSig], cstring] =
  ## Schedule crypto verification of an aggregate
  ##
  ## This involves 3 checks:
  ## - verify_slot_signature
  ## - verify_aggregate_and_proof_signature
  ## - is_valid_indexed_attestation
  ##
  ## The buffer is processed:
  ## - when eager processing is enabled and the batch is full
  ## - otherwise after 10ms (BatchAttAccumTime)
  ##
  ## This returns an error if the keys or signatures could not be loaded
  ## and 3 futures with the deferred aggregate checks otherwise.

  template aggregate_and_proof: untyped = signedAggregateAndProof.message
  template aggregate: untyped = aggregate_and_proof.aggregate

  # Do the eager steps first so that an early failure returns before anything
  # has been added to a batch
  let
    aggregatorKey =
      epochRef.validatorKey(aggregate_and_proof.aggregator_index).orReturnErr(
        "SignedAggregateAndProof: invalid aggregator index")
    aggregatorSig = signedAggregateAndProof.signature.load().orReturnErr(
      "aggregateAndProof: invalid proof signature")
    slotSig = aggregate_and_proof.selection_proof.load().orReturnErr(
      "aggregateAndProof: invalid selection signature")
    aggregateKey = ? aggregateAll(epochRef.dag, attesting_indices)
    aggregateSig = aggregate.signature.load().orReturnErr(
      "aggregateAndProof: invalid aggregate signature")

  let
    aggregatorFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregator"):
      batch.pendingBuffer.add_aggregate_and_proof_signature(
        fork, genesis_validators_root, aggregate_and_proof, aggregatorKey,
        aggregatorSig)
    slotFut = batchCrypto.withBatch("scheduleAggregateChecks.selection_proof"):
      batch.pendingBuffer.add_slot_signature(
        fork, genesis_validators_root, aggregate.data.slot, aggregatorKey,
        slotSig)
    aggregateFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregate"):
      batch.pendingBuffer.add_attestation_signature(
        fork, genesis_validators_root, aggregate.data, aggregateKey,
        aggregateSig)

  ok((aggregatorFut, slotFut, aggregateFut, aggregateSig))

proc scheduleSyncCommitteeMessageCheck*(
      batchCrypto: ref BatchCrypto,
      fork: Fork, genesis_validators_root: Eth2Digest,
      slot: Slot, beacon_block_root: Eth2Digest,
      pubkey: CookedPubKey, signature: ValidatorSig
     ): Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] =
  ## Schedule crypto verification of a sync committee message
  ##
  ## The buffer is processed:
  ## - when eager processing is enabled and the batch is full
  ## - otherwise after 10ms (BatchAttAccumTime)
  ##
  ## This returns an error if crypto sanity checks failed
  ## and a future with the deferred sync committee message check otherwise.
  ##
  let
    sig = signature.load().orReturnErr(
      "SyncCommitteMessage: cannot load signature")
    fut = batchCrypto.withBatch("scheduleSyncCommitteeMessageCheck"):
      batch.pendingBuffer.add_sync_committee_message_signature(
        fork, genesis_validators_root, slot, beacon_block_root, pubkey, sig)

  ok((fut, sig))

proc scheduleContributionChecks*(
      batchCrypto: ref BatchCrypto,
      fork: Fork, genesis_validators_root: Eth2Digest,
      signedContributionAndProof: SignedContributionAndProof,
      subcommitteeIndex: SyncSubcommitteeIndex,
      dag: ChainDAGRef): Result[tuple[
       aggregatorFut, proofFut, contributionFut: Future[BatchResult],
       sig: CookedSig], cstring] =
  ## Schedule crypto verification of all signatures in a
  ## SignedContributionAndProof message
  ##
  ## The buffer is processed:
  ## - when eager processing is enabled and the batch is full
  ## - otherwise after 10ms (BatchAttAccumTime)
  ##
  ## This returns an error if crypto sanity checks failed
  ## and a future with the deferred check otherwise.
  ##
  template contribution_and_proof: untyped = signedContributionAndProof.message
  template contribution: untyped = contribution_and_proof.contribution

  # Do the eager steps first so that an early failure returns before anything
  # has been added to a batch
  let
    aggregatorKey =
      dag.validatorKey(contribution_and_proof.aggregator_index).orReturnErr(
        "SignedAggregateAndProof: invalid contributor index")
    aggregatorSig = signedContributionAndProof.signature.load().orReturnErr(
      "SignedContributionAndProof: invalid proof signature")
    proofSig = contribution_and_proof.selection_proof.load().orReturnErr(
      "SignedContributionAndProof: invalid selection signature")
    contributionSig = contribution.signature.load().orReturnErr(
      "SignedContributionAndProof: invalid contribution signature")

    contributionKey = ? aggregateAll(
      dag, dag.syncCommitteeParticipants(contribution.slot, subcommitteeIndex),
      contribution.aggregation_bits)
  let
    aggregatorFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.aggregator"):
      batch.pendingBuffer.add_contribution_and_proof_signature(
        fork, genesis_validators_root, contribution_and_proof, aggregatorKey,
        aggregatorSig)
    proofFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.selection_proof"):
      batch.pendingBuffer.add_sync_committee_selection_proof(
        fork, genesis_validators_root, contribution.slot,
        contribution.subcommittee_index, aggregatorKey, proofSig)
    contributionFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"):
      batch.pendingBuffer.add_sync_committee_message_signature(
        fork, genesis_validators_root, contribution.slot,
        contribution.beacon_block_root, contributionKey, contributionSig)

  ok((aggregatorFut, proofFut, contributionFut, contributionSig))