diff --git a/beacon_chain/gossip_processing/batch_validation.nim b/beacon_chain/gossip_processing/batch_validation.nim new file mode 100644 index 000000000..72ff394b4 --- /dev/null +++ b/beacon_chain/gossip_processing/batch_validation.nim @@ -0,0 +1,282 @@ +# beacon_chain +# Copyright (c) 2019-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import + # Status + chronicles, chronos, + stew/results, + eth/keys, + # Internals + ../spec/[ + datatypes, crypto, digest, helpers, signatures_batch], + ../consensus_object_pools/[ + blockchain_dag, block_quarantine, + attestation_pool, exit_pool, + block_pools_types, spec_cache + ], + ".."/[beacon_node_types, ssz, beacon_clock] + +export BrHmacDrbgContext + +logScope: + topics = "gossip_checks" + +# Batched gossip validation +# ---------------------------------------------------------------- +{.push raises: [Defect].} + +type + BatchCrypto* = object + # The buffers are bounded by BatchedCryptoSize (16) which was chosen: + # - based on "nimble bench" in nim-blscurve + # so that low power devices like Raspberry Pi 4 can process + # that many batched verifications within 20ms + # - based on the accumulation rate of attestations and aggregates + # in large instances which were 12000 per slot (12s) + # hence 1 per ms (but the pattern is bursty around the 4s mark) + pendingBuffer: seq[SignatureSet] + resultsBuffer: seq[Future[Result[void, cstring]]] + sigVerifCache: BatchedBLSVerifierCache ##\ + ## A cache for batch BLS signature verification contexts + rng: ref BrHmacDrbgContext ##\ + ## A reference to the Nimbus application-wide RNG + +const + # We cap waiting for an idle slot in case there's a lot of network traffic + # taking up all CPU - we don't want to _completely_ stop processing blocks + # in this case (attestations will get dropped) - doing so also allows us + # to benefit from more batching / larger network reads when under load. + BatchAttAccumTime = 10.milliseconds + + # Threshold for immediate trigger of batch verification. + # A balance between throughput and worst case latency. + # At least 6 so that the constant factors + # (RNG for blinding and Final Exponentiation) + # are amortized, + # but not too big as we need to redo checks one-by-one if one failed. 
+ BatchedCryptoSize = 16 + +proc new*(T: type BatchCrypto, rng: ref BrHmacDrbgContext): ref BatchCrypto = + (ref BatchCrypto)(rng: rng) + +func clear(batchCrypto: var BatchCrypto) = + ## Empty the crypto-pending attestations & aggregate queues + batchCrypto.pendingBuffer.setLen(0) + batchCrypto.resultsBuffer.setLen(0) + +proc done(batchCrypto: var BatchCrypto, idx: int) = + ## Send signal to [Attestation/Aggregate]Validator + ## that the attestation was crypto-verified (and so gossip validated) + ## with success + batchCrypto.resultsBuffer[idx].complete(Result[void, cstring].ok()) + +proc fail(batchCrypto: var BatchCrypto, idx: int, error: cstring) = + ## Send signal to [Attestation/Aggregate]Validator + ## that the attestation was NOT crypto-verified (and so NOT gossip validated) + batchCrypto.resultsBuffer[idx].complete(Result[void, cstring].err(error)) + +proc complete(batchCrypto: var BatchCrypto, idx: int, res: Result[void, cstring]) = + ## Send signal to [Attestation/Aggregate]Validator + batchCrypto.resultsBuffer[idx].complete(res) + +proc processBufferedCrypto(self: var BatchCrypto) = + ## Drain all attestations waiting for crypto verifications + + doAssert self.pendingBuffer.len == + self.resultsBuffer.len + + if self.pendingBuffer.len == 0: + return + + trace "batch crypto - starting", + batchSize = self.pendingBuffer.len + + let startTime = Moment.now() + + var secureRandomBytes: array[32, byte] + self.rng[].brHmacDrbgGenerate(secureRandomBytes) + + # TODO: For now only enable serial batch verification + let ok = batchVerifySerial( + self.sigVerifCache, + self.pendingBuffer, + secureRandomBytes) + + let stopTime = Moment.now() + + debug "batch crypto - finished", + batchSize = self.pendingBuffer.len, + cryptoVerified = ok, + dur = stopTime - startTime + + if ok: + for i in 0 ..< self.resultsBuffer.len: + self.done(i) + else: + debug "batch crypto - failure, falling back", + batchSize = self.pendingBuffer.len + for i in 0 ..< self.pendingBuffer.len: + let ok = blsVerify self.pendingBuffer[i] + if ok: + self.done(i) + else: + self.fail(i, "batch crypto verification: invalid signature") + + self.clear() + +proc deferCryptoProcessing(self: ref BatchCrypto, idleTimeout: Duration) {.async.} = + ## Process pending crypto check: + ## - if time threshold is reached + ## - or if networking is idle + + # TODO: how to cancel the scheduled `deferCryptoProcessing(BatchAttAccumTime)` ? + # when the buffer size threshold is reached? + # In practice this only happens when we receive a burst of attestations/aggregates. + # Though it's possible to reach the threshold 9ms in, + # and have only 1ms left for further accumulation. 
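`processBufferedCrypto` above verifies the whole buffer with one batched call and only falls back to per-item `blsVerify` when the batch fails, so a single bad signature cannot take its neighbours down with it. A minimal sketch of that split, with plain booleans standing in for `SignatureSet` and BLS verification (all names below are illustrative, not the real API):

```nim
import std/sequtils

proc verifyOne(sig: bool): bool = sig        # stands in for blsVerify on one set
proc verifyBatch(sigs: seq[bool]): bool =    # stands in for batchVerifySerial
  sigs.allIt(it)

proc processBuffered(sigs: seq[bool]): seq[bool] =
  ## Batch first; only on failure re-check one by one to find the offenders,
  ## mirroring the done()/fail() split over resultsBuffer.
  if verifyBatch(sigs):
    return newSeqWith(sigs.len, true)
  for s in sigs:
    result.add verifyOne(s)

doAssert processBuffered(@[true, true, true]) == @[true, true, true]
doAssert processBuffered(@[true, false, true]) == @[true, false, true]
```

The batched path is what amortizes the per-verification constant factors (blinding RNG, final exponentiation) mentioned in the `BatchedCryptoSize` comment; the fallback path restores per-item error reporting only when something in the batch was actually invalid.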
+ await sleepAsync(idleTimeout) + self[].processBufferedCrypto() + +proc schedule(batchCrypto: ref BatchCrypto, fut: Future[Result[void, cstring]], checkThreshold = true) = + ## Schedule a cryptocheck for processing + ## + ## The buffer is processed: + ## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize) + ## - when there are no network events (idleAsync) + ## - otherwise after 10ms (BatchAttAccumTime) + + # Note: use the resultsBuffer size to detect the first item + # as pendingBuffer is appended to 3 by 3 in case of aggregates + + batchCrypto.resultsBuffer.add fut + + if batchCrypto.resultsBuffer.len == 1: + # First attestation to be scheduled in the batch + # wait for an idle time or up to 10ms before processing + trace "batch crypto - scheduling next", + deadline = BatchAttAccumTime + asyncSpawn batchCrypto.deferCryptoProcessing(BatchAttAccumTime) + elif checkThreshold and + batchCrypto.resultsBuffer.len >= BatchedCryptoSize: + # Reached the max buffer size, process immediately + # TODO: how to cancel the scheduled `deferCryptoProcessing(BatchAttAccumTime)` ? + batchCrypto[].processBufferedCrypto() + +proc scheduleAttestationCheck*( + batchCrypto: ref BatchCrypto, + fork: Fork, genesis_validators_root: Eth2Digest, + epochRef: EpochRef, + attestation: Attestation + ): Option[Future[Result[void, cstring]]] = + ## Schedule crypto verification of an attestation + ## + ## The buffer is processed: + ## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize) + ## - when there are no network events (idleAsync) + ## - otherwise after 10ms (BatchAttAccumTime) + ## + ## This returns None if crypto sanity checks failed + ## and a future with the deferred attestation check otherwise. + doAssert batchCrypto.pendingBuffer.len < BatchedCryptoSize + + let sanity = batchCrypto + .pendingBuffer + .addAttestation( + fork, genesis_validators_root, epochRef, + attestation + ) + if not sanity: + return none(Future[Result[void, cstring]]) + + let fut = newFuture[Result[void, cstring]]( + "batch_validation.scheduleAttestationCheck" + ) + + batchCrypto.schedule(fut) + + return some(fut) + +proc scheduleAggregateChecks*( + batchCrypto: ref BatchCrypto, + fork: Fork, genesis_validators_root: Eth2Digest, + epochRef: EpochRef, + signedAggregateAndProof: SignedAggregateAndProof + ): Option[tuple[slotCheck, aggregatorCheck, aggregateCheck: Future[Result[void, cstring]]]] = + ## Schedule crypto verification of an aggregate + ## + ## This involves 3 checks: + ## - verify_slot_signature + ## - verify_aggregate_and_proof_signature + ## - is_valid_indexed_attestation + ## + ## The buffer is processed: + ## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize) + ## - when there are no network events (idleAsync) + ## - otherwise after 10ms (BatchAttAccumTime) + ## + ## This returns None if crypto sanity checks failed + ## and 2 futures with the deferred aggregate checks otherwise. 
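`schedule` implements the accumulation policy described in the doc comments: the first future added to `resultsBuffer` arms a one-shot 10 ms flush, and reaching `BatchedCryptoSize` flushes immediately. The aggregate path enqueues three signature sets per aggregate, which is why first-item detection counts `resultsBuffer` rather than `pendingBuffer`, and only the last of its three futures is allowed to trigger the size check. A self-contained sketch of that policy, using `std/asyncdispatch` instead of chronos, `bool` futures instead of `Result[void, cstring]`, and, like the real code, no cancellation of the armed timer:

```nim
import std/asyncdispatch

const
  AccumTime = 10                 # ms, mirrors BatchAttAccumTime
  MaxBatch  = 16                 # mirrors BatchedCryptoSize

var pending: seq[Future[bool]]   # stands in for resultsBuffer

proc flush() =
  # stands in for processBufferedCrypto
  for fut in pending:
    if not fut.finished:
      fut.complete(true)
  pending.setLen(0)

proc deferFlush() {.async.} =
  await sleepAsync(AccumTime)    # timeout path
  if pending.len > 0:
    flush()

proc schedule(): Future[bool] =
  result = newFuture[bool]("schedule")
  pending.add result
  if pending.len == 1:
    asyncCheck deferFlush()      # first entry in the batch arms the timer
  elif pending.len >= MaxBatch:
    flush()                      # burst: process immediately

proc demo() {.async.} =
  let a = schedule()
  let b = schedule()
  let first = await a
  let second = await b
  doAssert first and second      # both complete once the timer fires

waitFor demo()
```

At the arrival rate quoted above (~12000 attestations per 12 s slot, roughly one per millisecond), the 10 ms window typically gathers on the order of ten signature sets, and the 16-entry cap bounds worst-case latency during bursts.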
+ doAssert batchCrypto.pendingBuffer.len < BatchedCryptoSize + + template aggregate_and_proof: untyped = signedAggregateAndProof.message + template aggregate: untyped = aggregate_and_proof.aggregate + + type R = tuple[slotCheck, aggregatorCheck, aggregateCheck: Future[Result[void, cstring]]] + + # Enqueue in the buffer + # ------------------------------------------------------ + let aggregator = epochRef.validator_keys[aggregate_and_proof.aggregator_index] + block: + let sanity = batchCrypto + .pendingBuffer + .addSlotSignature( + fork, genesis_validators_root, + aggregate.data.slot, + aggregator, + aggregate_and_proof.selection_proof + ) + if not sanity: + return none(R) + + block: + let sanity = batchCrypto + .pendingBuffer + .addAggregateAndProofSignature( + fork, genesis_validators_root, + aggregate_and_proof, + aggregator, + signed_aggregate_and_proof.signature + ) + if not sanity: + return none(R) + + block: + let sanity = batchCrypto + .pendingBuffer + .addAttestation( + fork, genesis_validators_root, epochRef, + aggregate + ) + if not sanity: + return none(R) + + let futSlot = newFuture[Result[void, cstring]]( + "batch_validation.scheduleAggregateChecks.slotCheck" + ) + let futAggregator = newFuture[Result[void, cstring]]( + "batch_validation.scheduleAggregateChecks.aggregatorCheck" + ) + + let futAggregate = newFuture[Result[void, cstring]]( + "batch_validation.scheduleAggregateChecks.aggregateCheck" + ) + + batchCrypto.schedule(futSlot, checkThreshold = false) + batchCrypto.schedule(futAggregator, checkThreshold = false) + batchCrypto.schedule(futAggregate) + + return some((futSlot, futAggregator, futAggregate)) diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 0825ad8d8..d65922575 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -14,6 +14,7 @@ import ../spec/[crypto, datatypes, digest], ../consensus_object_pools/[block_clearance, blockchain_dag, exit_pool, attestation_pool], ./gossip_validation, ./gossip_to_consensus, + ./batch_validation, ../validators/validator_pool, ../beacon_node_types, ../beacon_clock, ../conf, ../ssz/sszdump @@ -70,6 +71,10 @@ type # ---------------------------------------------------------------- exitPool: ref ExitPool + # Almost validated, pending cryptographic signature check + # ---------------------------------------------------------------- + batchCrypto: ref BatchCrypto + # Missing information # ---------------------------------------------------------------- quarantine*: QuarantineRef @@ -85,6 +90,7 @@ proc new*(T: type Eth2Processor, exitPool: ref ExitPool, validatorPool: ref ValidatorPool, quarantine: QuarantineRef, + rng: ref BrHmacDrbgContext, getWallTime: GetWallTimeFn): ref Eth2Processor = (ref Eth2Processor)( config: config, @@ -94,7 +100,8 @@ proc new*(T: type Eth2Processor, attestationPool: attestationPool, exitPool: exitPool, validatorPool: validatorPool, - quarantine: quarantine + quarantine: quarantine, + batchCrypto: BatchCrypto.new(rng = rng) ) # Gossip Management @@ -179,11 +186,13 @@ proc checkForPotentialDoppelganger( warn "We believe you are currently running another instance of the same validator. We've disconnected you from the network as this presents a significant slashing risk. 
Possible next steps are (a) making sure you've disconnected your validator from your old machine before restarting the client; and (b) running the client again with the gossip-slashing-protection option disabled, only if you are absolutely sure this is the only instance of your validator running, and reporting the issue at https://github.com/status-im/nimbus-eth2/issues." quit QuitFailure +{.pop.} # async can raise anything + proc attestationValidator*( - self: var Eth2Processor, + self: ref Eth2Processor, attestation: Attestation, committeeIndex: uint64, - checksExpensive: bool = true): ValidationResult = + checksExpensive: bool = true): Future[ValidationResult] {.async.} = logScope: attestation = shortLog(attestation) committeeIndex @@ -201,7 +210,10 @@ proc attestationValidator*( # Potential under/overflows are fine; would just create odd metrics and logs let delay = wallTime - attestation.data.slot.toBeaconTime debug "Attestation received", delay - let v = self.attestationPool[].validateAttestation( + + # Now proceed to validation + let v = await self.attestationPool.validateAttestation( + self.batchCrypto, attestation, wallTime, committeeIndex, checksExpensive) if v.isErr(): debug "Dropping attestation", err = v.error() @@ -210,16 +222,16 @@ proc attestationValidator*( beacon_attestations_received.inc() beacon_attestation_delay.observe(delay.toFloatSeconds()) - self.checkForPotentialDoppelganger(attestation.data, v.value, wallSlot) + self[].checkForPotentialDoppelganger(attestation.data, v.value, wallSlot) trace "Attestation validated" self.verifQueues[].addAttestation(attestation, v.get()) - ValidationResult.Accept + return ValidationResult.Accept proc aggregateValidator*( - self: var Eth2Processor, - signedAggregateAndProof: SignedAggregateAndProof): ValidationResult = + self: ref Eth2Processor, + signedAggregateAndProof: SignedAggregateAndProof): Future[ValidationResult] {.async.} = logScope: aggregate = shortLog(signedAggregateAndProof.message.aggregate) signature = shortLog(signedAggregateAndProof.signature) @@ -239,7 +251,8 @@ proc aggregateValidator*( wallTime - signedAggregateAndProof.message.aggregate.data.slot.toBeaconTime debug "Aggregate received", delay - let v = self.attestationPool[].validateAggregate( + let v = await self.attestationPool.validateAggregate( + self.batchCrypto, signedAggregateAndProof, wallTime) if v.isErr: debug "Dropping aggregate", @@ -252,7 +265,7 @@ proc aggregateValidator*( beacon_aggregates_received.inc() beacon_aggregate_delay.observe(delay.toFloatSeconds()) - self.checkForPotentialDoppelganger( + self[].checkForPotentialDoppelganger( signedAggregateAndProof.message.aggregate.data, v.value, wallSlot) trace "Aggregate validated", @@ -262,7 +275,9 @@ proc aggregateValidator*( self.verifQueues[].addAggregate(signedAggregateAndProof, v.get()) - ValidationResult.Accept + return ValidationResult.Accept + +{.push raises: [Defect].} proc attesterSlashingValidator*( self: var Eth2Processor, attesterSlashing: AttesterSlashing): diff --git a/beacon_chain/gossip_processing/gossip_to_consensus.nim b/beacon_chain/gossip_processing/gossip_to_consensus.nim index d367ca248..dd93b5095 100644 --- a/beacon_chain/gossip_processing/gossip_to_consensus.nim +++ b/beacon_chain/gossip_processing/gossip_to_consensus.nim @@ -5,8 +5,6 @@ # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. 
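`attestationValidator` and `aggregateValidator` above change shape in three ways: they take a `ref Eth2Processor` instead of `var`, they return a `Future[ValidationResult]`, and every exit becomes an explicit `return`. A reduced model of that conversion, with hypothetical types and `std/asyncdispatch` in place of chronos:

```nim
import std/asyncdispatch

type
  ValidationResult = enum
    Accept, Ignore, Reject
  Processor = object
    accepted: int

proc attestationValidator(self: ref Processor,
                          msg: string): Future[ValidationResult] {.async.} =
  if msg.len == 0:
    return ValidationResult.Ignore   # cheap checks still answer immediately
  await sleepAsync(1)                # stands in for the awaited batched crypto check
  inc self.accepted
  return ValidationResult.Accept     # async bodies use explicit `return`

let p = (ref Processor)()
doAssert waitFor(p.attestationValidator("a gossip attestation")) == ValidationResult.Accept
doAssert waitFor(p.attestationValidator("")) == ValidationResult.Ignore
doAssert p.accepted == 1
```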
-{.push raises: [Defect].} - import std/math, stew/results, @@ -37,7 +35,7 @@ type v: Attestation attesting_indices: seq[ValidatorIndex] - AggregateEntry* = AttestationEntry + AggregateEntry = AttestationEntry VerifQueueManager* = object ## This manages the queues of blocks and attestations. @@ -70,6 +68,9 @@ type # Producers # ---------------------------------------------------------------- blocksQueue*: AsyncQueue[BlockEntry] # Exported for "test_sync_manager" + # TODO: + # is there a point to separate + # attestations & aggregates here? attestationsQueue: AsyncQueue[AttestationEntry] aggregatesQueue: AsyncQueue[AggregateEntry] @@ -78,6 +79,8 @@ type consensusManager: ref ConsensusManager ## Blockchain DAG, AttestationPool and Quarantine +{.push raises: [Defect].} + # Initialization # ------------------------------------------------------------------------------ @@ -331,6 +334,10 @@ proc runQueueProcessingLoop*(self: ref VerifQueueManager) {.async.} = aggregateFut = self[].aggregatesQueue.popFirst() attestationFut = self[].attestationsQueue.popFirst() + # TODO: + # revisit `idleTimeout` + # and especially `attestationBatch` in light of batch validation + # in particular we might want `attestationBatch` to drain both attestation & aggregates while true: # Cooperative concurrency: one idle calculation step per loop - because # we run both networking and CPU-heavy things like block processing diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim index 401776075..24b730208 100644 --- a/beacon_chain/gossip_processing/gossip_validation.nim +++ b/beacon_chain/gossip_processing/gossip_validation.nim @@ -8,9 +8,12 @@ {.push raises: [Defect].} import + # Standard library std/[sequtils, intsets, deques], - chronicles, + # Status + chronicles, chronos, stew/results, + # Internals ../spec/[ beaconstate, state_transition_block, datatypes, crypto, digest, helpers, network, signatures], @@ -20,11 +23,15 @@ import ], ".."/[beacon_node_types, ssz, beacon_clock], ../validators/attestation_aggregation, - ../extras + ../extras, + ./batch_validation logScope: topics = "gossip_checks" +# Internal checks +# ---------------------------------------------------------------- + func check_attestation_block( pool: AttestationPool, attestationSlot: Slot, blck: BlockRef): Result[void, (ValidationResult, cstring)] = @@ -148,12 +155,18 @@ func check_attestation_subnet( ok() +# Gossip Validation +# ---------------------------------------------------------------- +{.pop.} # async can raises anything + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id proc validateAttestation*( - pool: var AttestationPool, - attestation: Attestation, wallTime: BeaconTime, + pool: ref AttestationPool, + batchCrypto: ref BatchCrypto, + attestation: Attestation, + wallTime: BeaconTime, topicCommitteeIndex: uint64, checksExpensive: bool): - Result[seq[ValidatorIndex], (ValidationResult, cstring)] = + Future[Result[seq[ValidatorIndex], (ValidationResult, cstring)]] {.async.} = # Some of the checks below have been reordered compared to the spec, to # perform the cheap checks first - in particular, we want to avoid loading # an `EpochRef` and checking signatures. 
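The `{.push raises: [Defect].}` / `{.pop.}` sandwich introduced here (and in `eth2_processor.nim` above) keeps strict exception tracking for the synchronous code while exempting the async procs, whose generated code can raise more than `Defect`. A compilable miniature of the pattern, with placeholder procs:

```nim
{.push raises: [Defect].}          # strict effect tracking for synchronous code

proc cheapCheck(x: int): bool =
  ## Allowed under the strict effect list: raises nothing.
  x > 0

{.pop.} # async can raise anything

import std/asyncdispatch

proc deferredCheck(x: int): Future[bool] {.async.} =
  await sleepAsync(1)              # stands in for the awaited batch verification
  return cheapCheck(x)

{.push raises: [Defect].}
# ...strictly-checked synchronous code would continue here...
{.pop.}

doAssert waitFor(deferredCheck(3))
```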
This reordering might lead to @@ -172,12 +185,18 @@ proc validateAttestation*( # attestation.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot # >= attestation.data.slot (a client MAY queue future attestations for # processing at the appropriate slot). - ? check_propagation_slot_range(attestation.data, wallTime) # [IGNORE] + block: + let v = check_propagation_slot_range(attestation.data, wallTime) # [IGNORE] + if v.isErr(): + return err(v.error) # The attestation is unaggregated -- that is, it has exactly one # participating validator (len([bit for bit in attestation.aggregation_bits # if bit == 0b1]) == 1). - ? check_aggregation_count(attestation, singular = true) # [REJECT] + block: + let v = check_aggregation_count(attestation, singular = true) # [REJECT] + if v.isErr(): + return err(v.error) # The block being voted for (attestation.data.beacon_block_root) has been seen # (via both gossip and non-gossip sources) (a client MAY queue attestations for @@ -185,7 +204,11 @@ proc validateAttestation*( # The block being voted for (attestation.data.beacon_block_root) passes # validation. # [IGNORE] if block is unseen so far and enqueue it in missing blocks - let target = ? check_beacon_and_target_block(pool, attestation.data) # [IGNORE/REJECT] + let target = block: + let v = check_beacon_and_target_block(pool[], attestation.data) # [IGNORE/REJECT] + if v.isErr(): + return err(v.error) + v.get() # The following rule follows implicitly from that we clear out any # unviable blocks from the chain dag: @@ -210,7 +233,10 @@ proc validateAttestation*( # committees_per_slot = get_committee_count_per_slot(state, # attestation.data.target.epoch), which may be pre-computed along with the # committee information for the signature check. - ? check_attestation_subnet(epochRef, attestation, topicCommitteeIndex) + block: + let v = check_attestation_subnet(epochRef, attestation, topicCommitteeIndex) # [REJECT] + if v.isErr(): + return err(v.error) # [REJECT] The number of aggregation bits matches the committee size -- i.e. # len(attestation.aggregation_bits) == len(get_beacon_committee(state, @@ -252,12 +278,29 @@ proc validateAttestation*( # The signature of attestation is valid. 
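Each `? check_...` from the synchronous version becomes a `block: let v = ...; if v.isErr(): return err(v.error)` so that the early exit stays an explicit `return` inside the new `{.async.}` body, and the expensive, batched work is pushed to the end. A reduced version of that shape, with a hand-rolled result type standing in for `stew/results`:

```nim
import std/asyncdispatch

type
  Res = object                   # reduced stand-in for Result[void, cstring]
    ok: bool
    error: string

proc checkSlotRange(slot: int): Res =
  if slot >= 0:
    Res(ok: true)
  else:
    Res(ok: false, error: "slot out of range")

proc validate(slot: int): Future[Res] {.async.} =
  block:                         # mirrors the `block: let v = ...` rewrite above
    let v = checkSlotRange(slot)
    if not v.ok:
      return v                   # explicit early return inside the async body
  await sleepAsync(1)            # the deferred, batched signature check comes last
  return Res(ok: true)

doAssert waitFor(validate(5)).ok
doAssert waitFor(validate(-1)).error == "slot out of range"
```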
block: + # First pass - without cryptography let v = is_valid_indexed_attestation( fork, genesis_validators_root, epochRef, attesting_indices, - attestation, {}) + attestation, + {skipBLSValidation}) if v.isErr(): return err((ValidationResult.Reject, v.error)) + # Buffer crypto checks + let deferredCrypto = batchCrypto + .scheduleAttestationCheck( + fork, genesis_validators_root, epochRef, + attestation + ) + if deferredCrypto.isNone(): + return err((ValidationResult.Reject, + cstring("validateAttestation: crypto sanity checks failure"))) + + # Await the crypto check + let cryptoChecked = await deferredCrypto.get() + if cryptoChecked.isErr(): + return err((ValidationResult.Reject, cryptoChecked.error)) + # Only valid attestations go in the list, which keeps validator_index # in range if not (pool.nextAttestationEpoch.lenu64 > validator_index.uint64): @@ -265,13 +308,15 @@ proc validateAttestation*( pool.nextAttestationEpoch[validator_index].subnet = attestation.data.target.epoch + 1 - ok(attesting_indices) + return ok(attesting_indices) # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_aggregate_and_proof proc validateAggregate*( - pool: var AttestationPool, - signedAggregateAndProof: SignedAggregateAndProof, wallTime: BeaconTime): - Result[seq[ValidatorIndex], (ValidationResult, cstring)] = + pool: ref AttestationPool, + batchCrypto: ref BatchCrypto, + signedAggregateAndProof: SignedAggregateAndProof, + wallTime: BeaconTime): + Future[Result[seq[ValidatorIndex], (ValidationResult, cstring)]] {.async.} = # Some of the checks below have been reordered compared to the spec, to # perform the cheap checks first - in particular, we want to avoid loading # an `EpochRef` and checking signatures. This reordering might lead to @@ -291,7 +336,10 @@ proc validateAggregate*( # ATTESTATION_PROPAGATION_SLOT_RANGE slots (with a # MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. aggregate.data.slot + # ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= aggregate.data.slot - ? check_propagation_slot_range(aggregate.data, wallTime) # [IGNORE] + block: + let v = check_propagation_slot_range(aggregate.data, wallTime) # [IGNORE] + if v.isErr(): + return err(v.error) # [IGNORE] The valid aggregate attestation defined by # hash_tree_root(aggregate) has not already been seen (via aggregate gossip, @@ -327,12 +375,19 @@ proc validateAggregate*( # members, i.e. they counts don't match. # But (2) would reflect an invalid aggregation in other ways, so reject it # either way. - ? check_aggregation_count(aggregate, singular = false) + block: + let v = check_aggregation_count(aggregate, singular = false) # [REJECT] + if v.isErr(): + return err(v.error) # [REJECT] The block being voted for (aggregate.data.beacon_block_root) # passes validation. # [IGNORE] if block is unseen so far and enqueue it in missing blocks - let target = ? check_beacon_and_target_block(pool, aggregate.data) + let target = block: + let v = check_beacon_and_target_block(pool[], aggregate.data) # [IGNORE/REJECT] + if v.isErr(): + return err(v.error) + v.get() # [REJECT] aggregate_and_proof.selection_proof selects the validator as an # aggregator for the slot -- i.e. 
is_aggregator(state, aggregate.data.slot, @@ -354,42 +409,47 @@ proc validateAggregate*( return err((ValidationResult.Reject, cstring( "Aggregator's validator index not in committee"))) - # [REJECT] The aggregate_and_proof.selection_proof is a valid signature of the - # aggregate.data.slot by the validator with index - # aggregate_and_proof.aggregator_index. - # get_slot_signature(state, aggregate.data.slot, privkey) - if aggregate_and_proof.aggregator_index >= epochRef.validator_keys.lenu64: - return err((ValidationResult.Reject, cstring("Invalid aggregator_index"))) - - let - fork = pool.chainDag.headState.data.data.fork - genesis_validators_root = - pool.chainDag.headState.data.data.genesis_validators_root - if not verify_slot_signature( - fork, genesis_validators_root, aggregate.data.slot, - epochRef.validator_keys[aggregate_and_proof.aggregator_index], - aggregate_and_proof.selection_proof): - return err((ValidationResult.Reject, cstring( - "Selection_proof signature verification failed"))) - - # [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid. - if not verify_aggregate_and_proof_signature( - fork, genesis_validators_root, aggregate_and_proof, - epochRef.validator_keys[aggregate_and_proof.aggregator_index], - signed_aggregate_and_proof.signature): - return err((ValidationResult.Reject, cstring( - "signed_aggregate_and_proof signature verification failed"))) - - let attesting_indices = get_attesting_indices( - epochRef, aggregate.data, aggregate.aggregation_bits) - - # [REJECT] The signature of aggregate is valid. block: - let v = is_valid_indexed_attestation( - fork, genesis_validators_root, epochRef, attesting_indices, - aggregate, {}) - if v.isErr(): - return err((ValidationResult.Reject, v.error)) + # 1. [REJECT] The aggregate_and_proof.selection_proof is a valid signature of the + # aggregate.data.slot by the validator with index + # aggregate_and_proof.aggregator_index. + # get_slot_signature(state, aggregate.data.slot, privkey) + # 2. [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid. + # 3. [REJECT] The signature of aggregate is valid. + if aggregate_and_proof.aggregator_index >= epochRef.validator_keys.lenu64: + return err((ValidationResult.Reject, cstring("Invalid aggregator_index"))) + + let + fork = pool.chainDag.headState.data.data.fork + genesis_validators_root = + pool.chainDag.headState.data.data.genesis_validators_root + + let deferredCrypto = batchCrypto + .scheduleAggregateChecks( + fork, genesis_validators_root, epochRef, + signed_aggregate_and_proof + ) + if deferredCrypto.isNone(): + return err((ValidationResult.Reject, + cstring("validateAttestation: crypto sanity checks failure"))) + + # [REJECT] aggregate_and_proof.selection_proof + let slotChecked = await deferredCrypto.get().slotCheck + if slotChecked.isErr(): + return err((ValidationResult.Reject, cstring( + "Selection_proof signature verification failed"))) + + # [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid. + let aggregatorChecked = await deferredCrypto.get().aggregatorCheck + if aggregatorChecked.isErr(): + return err((ValidationResult.Reject, cstring( + "signed_aggregate_and_proof aggregator signature verification failed"))) + + # [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid. 
+ let aggregateChecked = await deferredCrypto.get().aggregateCheck + if aggregateChecked.isErr(): + return err((ValidationResult.Reject, cstring( + "signed_aggregate_and_proof aggregate attester signatures verification failed"))) # The following rule follows implicitly from that we clear out any # unviable blocks from the chain dag: @@ -407,7 +467,12 @@ proc validateAggregate*( pool.nextAttestationEpoch[aggregate_and_proof.aggregator_index].aggregate = aggregate.data.target.epoch + 1 - ok(attesting_indices) + let attesting_indices = get_attesting_indices( + epochRef, aggregate.data, aggregate.aggregation_bits) + + return ok(attesting_indices) + +{.push raises: [Defect].} # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_block proc isValidBeaconBlock*( diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 4ca7d17db..d6a8bba83 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -1704,6 +1704,40 @@ proc addValidator*[MsgType](node: Eth2Node, node.pubsub.addValidator(topic & "_snappy", execValidator) except Exception as exc: raiseAssert exc.msg # TODO fix libp2p +proc addAsyncValidator*[MsgType](node: Eth2Node, + topic: string, + msgValidator: proc(msg: MsgType): + Future[ValidationResult] {.gcsafe, raises: [Defect].} ) = + + proc execValidator( + topic: string, message: GossipMsg): Future[ValidationResult] {.raises: [Defect].} = + + inc nbc_gossip_messages_received + trace "Validating incoming gossip message", + len = message.data.len, topic + + let decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE) + + if decompressed.len == 0: + debug "Empty gossip data after decompression", + topic, len = message.data.len + result.complete(ValidationResult.Ignore) + return + + let decoded = try: + SSZ.decode(decompressed, MsgType) + except: + error "SSZ decoding failure", + topic, len = message.data.len + result.complete(ValidationResult.Ignore) + return + + return msgValidator(decoded) + + try: + node.pubsub.addValidator(topic & "_snappy", execValidator) + except Exception as exc: raiseAssert exc.msg # TODO fix libp2p + proc unsubscribe*(node: Eth2Node, topic: string) {.raises: [Defect, CatchableError].} = try: node.pubsub.unsubscribeAll(topic & "_snappy") diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 7daaea897..6d1d4b675 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -339,6 +339,7 @@ proc init*(T: type BeaconNode, verifQueues, chainDag, attestationPool, exitPool, validatorPool, quarantine, + rng, proc(): BeaconTime = beaconClock.now()) var res = BeaconNode( @@ -1167,16 +1168,16 @@ proc installMessageValidators(node: BeaconNode) = for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64: closureScope: let ci = it - node.network.addValidator( + node.network.addAsyncValidator( getAttestationTopic(node.forkDigest, ci), # This proc needs to be within closureScope; don't lift out of loop. 
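The subnet loop in `installMessageValidators` keeps the `closureScope` / `let ci = it` idiom when switching to `addAsyncValidator`: the registered proc is a closure, and without a per-iteration binding every subnet's validator could end up observing the same loop variable. A small demonstration of why the copy matters:

```nim
import std/sugar

var registered: seq[proc (): int]

for it in 0 ..< 3:
  closureScope:                  # same idiom as the per-subnet loop above
    let ci = it                  # fresh per-iteration binding for the closure
    registered.add(proc (): int = ci)

var seen: seq[int]
for cb in registered:
  seen.add cb()
doAssert seen == @[0, 1, 2]
```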
- proc(attestation: Attestation): ValidationResult = - node.processor[].attestationValidator(attestation, ci)) + proc(attestation: Attestation): Future[ValidationResult] = + node.processor.attestationValidator(attestation, ci)) - node.network.addValidator( + node.network.addAsyncValidator( getAggregateAndProofsTopic(node.forkDigest), - proc(signedAggregateAndProof: SignedAggregateAndProof): ValidationResult = - node.processor[].aggregateValidator(signedAggregateAndProof)) + proc(signedAggregateAndProof: SignedAggregateAndProof): Future[ValidationResult] = + node.processor.aggregateValidator(signedAggregateAndProof)) node.network.addValidator( node.topicBeaconBlocks, diff --git a/beacon_chain/spec/crypto.nim b/beacon_chain/spec/crypto.nim index f2cee59cc..b45a05f84 100644 --- a/beacon_chain/spec/crypto.nim +++ b/beacon_chain/spec/crypto.nim @@ -166,6 +166,16 @@ proc blsVerify*( parsedKey.isSome() and parsedKey.get.verify(message, parsedSig.get()) +proc blsVerify*(sigSet: SignatureSet): bool = + ## Unbatched verification + ## of 1 SignatureSet + ## tuple[pubkey: blscurve.PublicKey, message: array[32, byte], blscurve.signature: Signature] + verify( + sigSet.pubkey, + sigSet.message, + sigSet.signature + ) + func blsSign*(privkey: ValidatorPrivKey, message: openArray[byte]): ValidatorSig = ## Computes a signature from a secret key and a message let sig = SecretKey(privkey).sign(message) diff --git a/beacon_chain/spec/signatures_batch.nim b/beacon_chain/spec/signatures_batch.nim index 521db04a3..cdf583b2f 100644 --- a/beacon_chain/spec/signatures_batch.nim +++ b/beacon_chain/spec/signatures_batch.nim @@ -89,11 +89,35 @@ proc aggregateAttesters( aggPK.finish(attestersAgg) return true +proc aggregateAttesters( + aggPK: var blscurve.PublicKey, + attestation: IndexedAttestation, + epochRef: auto + ): bool = + mixin validator_keys + + doAssert attestation.attesting_indices.len > 0 + var attestersAgg{.noInit.}: AggregatePublicKey + attestersAgg.init(epochRef.validator_keys[attestation.attesting_indices[0]] + .pubkey.loadWithCacheOrExitFalse()) + for i in 1 ..< attestation.attesting_indices.len: + attestersAgg.aggregate(epochRef.validator_keys[attestation.attesting_indices[i]] + .pubkey.loadWithCacheOrExitFalse()) + aggPK.finish(attestersAgg) + return true + proc addIndexedAttestation( sigs: var seq[SignatureSet], attestation: IndexedAttestation, state: BeaconState ): bool = + ## Add an indexed attestation for batched BLS verification + ## purposes + ## This only verifies cryptography, checking that + ## the indices are sorted and unique is not checked for example. 
+ ## + ## Returns true if the indexed attestations was added to the batching buffer + ## Returns false if saniy checks failed (non-empty, keys are valid) if attestation.attesting_indices.len == 0: # Aggregation spec requires non-empty collection # - https://tools.ietf.org/html/draft-irtf-cfrg-bls-signature-04 @@ -156,6 +180,125 @@ proc addAttestation( return false return true +# Public API +# ------------------------------------------------------ + +proc addAttestation*( + sigs: var seq[SignatureSet], + fork: Fork, genesis_validators_root: Eth2Digest, + epochRef: auto, + attestation: Attestation + ): bool = + ## Add an attestation for batched BLS verification + ## purposes + ## This only verifies cryptography + ## + ## Returns true if the attestation was added to the batching buffer + ## Returns false if saniy checks failed (non-empty, keys are valid) + ## In that case the seq[SignatureSet] is unmodified + mixin get_attesting_indices, validator_keys, pubkey + + result = false + + var attestersAgg{.noInit.}: AggregatePublicKey + for valIndex in epochRef.get_attesting_indices( + attestation.data, + attestation.aggregation_bits): + if not result: # first iteration + attestersAgg.init(epochRef.validator_keys[valIndex] + .loadWithCacheOrExitFalse()) + result = true + else: + attestersAgg.aggregate(epochRef.validator_keys[valIndex] + .loadWithCacheOrExitFalse()) + + if not result: + # There was no attesters + return false + + var attesters{.noinit.}: blscurve.PublicKey + attesters.finish(attestersAgg) + + return sigs.addSignatureSet( + attesters, + attestation.data, + attestation.signature, + genesis_validators_root, + fork, + attestation.data.target.epoch, + DOMAIN_BEACON_ATTESTER) + +proc addIndexedAttestation*( + sigs: var seq[SignatureSet], + fork: Fork, genesis_validators_root: Eth2Digest, + epochRef: auto, + attestation: IndexedAttestation, + ): bool = + ## Add an indexed attestation for batched BLS verification + ## purposes + ## This only verifies cryptography, checking that + ## the indices are sorted and unique is not checked for example. 
+ ## + ## Returns true if the indexed attestations was added to the batching buffer + ## Returns false if saniy checks failed (non-empty, keys are valid) + ## In that case the seq[SignatureSet] is unmodified + if attestation.attesting_indices.len == 0: + # Aggregation spec requires non-empty collection + # - https://tools.ietf.org/html/draft-irtf-cfrg-bls-signature-04 + # Eth2 spec requires at least one attesting indice in slashing + # - https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#is_valid_indexed_attestation + return false + + var aggPK {.noInit.}: blscurve.PublicKey + if not aggPK.aggregateAttesters(attestation, epochRef): + return false + + return sigs.addSignatureSet( + aggPK, + attestation.data, + attestation.signature, + genesis_validators_root, + fork, + attestation.data.target.epoch, + DOMAIN_BEACON_ATTESTER) + +proc addSlotSignature*( + sigs: var seq[SignatureSet], + fork: Fork, genesis_validators_root: Eth2Digest, + slot: Slot, + pubkey: ValidatorPubKey, + signature: ValidatorSig): bool = + + let epoch = compute_epoch_at_slot(slot) + return sigs.addSignatureSet( + pubkey.loadWithCacheOrExitFalse(), + sszObj = slot, + signature, + genesis_validators_root, + fork, + epoch, + DOMAIN_SELECTION_PROOF + ) + +proc addAggregateAndProofSignature*( + sigs: var seq[SignatureSet], + fork: Fork, genesis_validators_root: Eth2Digest, + aggregate_and_proof: AggregateAndProof, + pubkey: ValidatorPubKey, + signature: ValidatorSig + ): bool = + + let epoch = compute_epoch_at_slot(aggregate_and_proof.aggregate.data.slot) + return sigs.addSignatureSet( + pubkey.loadWithCacheOrExitFalse(), + sszObj = aggregate_and_proof, + signature, + genesis_validators_root, + fork, + epoch, + DOMAIN_AGGREGATE_AND_PROOF + ) + proc collectSignatureSets*( sigs: var seq[SignatureSet], signed_block: SignedBeaconBlock, diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index e9b2d931f..aa27a420b 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -167,7 +167,7 @@ proc sendAttestation*( getAttestationTopic(node.forkDigest, subnet_index), attestation) # Ensure node's own broadcast attestations end up in its attestation pool - discard node.processor[].attestationValidator( + discard node.processor.attestationValidator( attestation, subnet_index, false) beacon_attestations_sent.inc() diff --git a/tests/test_attestation_pool.nim b/tests/test_attestation_pool.nim index 42e50cf3e..089cb6c62 100644 --- a/tests/test_attestation_pool.nim +++ b/tests/test_attestation_pool.nim @@ -18,7 +18,7 @@ import ../beacon_chain/spec/[crypto, datatypes, digest, validator, state_transition, helpers, beaconstate, presets, network], ../beacon_chain/[beacon_node_types, extras, beacon_clock], - ../beacon_chain/gossip_processing/gossip_validation, + ../beacon_chain/gossip_processing/[gossip_validation, batch_validation], ../beacon_chain/fork_choice/[fork_choice_types, fork_choice], ../beacon_chain/consensus_object_pools/[block_quarantine, blockchain_dag, block_clearance, attestation_pool], # Test utilities @@ -398,6 +398,7 @@ suiteReport "Attestation validation " & preset(): pool = newClone(AttestationPool.init(chainDag, quarantine)) state = newClone(chainDag.headState) cache = StateCache() + batchCrypto = BatchCrypto.new(keys.newRng()) # Slot 0 is a finalized slot - won't be making attestations for it.. 
check: process_slots(state.data, state.data.data.slot + 1, cache) @@ -439,27 +440,27 @@ suiteReport "Attestation validation " & preset(): beaconTime = attestation.data.slot.toBeaconTime() check: - validateAttestation(pool[], attestation, beaconTime, subnet, true).isOk + validateAttestation(pool, batchCrypto, attestation, beaconTime, subnet, true).waitFor().isOk # Same validator again - validateAttestation(pool[], attestation, beaconTime, subnet, true).error()[0] == + validateAttestation(pool, batchCrypto, attestation, beaconTime, subnet, true).waitFor().error()[0] == ValidationResult.Ignore pool[].nextAttestationEpoch.setLen(0) # reset for test check: # Wrong subnet - validateAttestation(pool[], attestation, beaconTime, subnet + 1, true).isErr + validateAttestation(pool, batchCrypto, attestation, beaconTime, subnet + 1, true).waitFor().isErr pool[].nextAttestationEpoch.setLen(0) # reset for test check: # Too far in the future validateAttestation( - pool[], attestation, beaconTime - 1.seconds, subnet + 1, true).isErr + pool, batchCrypto, attestation, beaconTime - 1.seconds, subnet + 1, true).waitFor().isErr pool[].nextAttestationEpoch.setLen(0) # reset for test check: # Too far in the past validateAttestation( - pool[], attestation, + pool, batchCrypto, attestation, beaconTime - (SECONDS_PER_SLOT * SLOTS_PER_EPOCH - 1).int.seconds, - subnet + 1, true).isErr + subnet + 1, true).waitFor().isErr
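The new public helpers in `signatures_batch.nim` (`addAttestation`, `addIndexedAttestation`, `addSlotSignature`, `addAggregateAndProofSignature`) share one contract: run key and sanity checks, append a `SignatureSet` to the caller's buffer on success, and return `false` so that `scheduleAttestationCheck` / `scheduleAggregateChecks` can refuse to create a future. A toy version of that contract with string placeholders (hypothetical names throughout):

```nim
type
  SigSet = tuple[pubkey, message, signature: string]   # placeholder for SignatureSet

proc addSet(sigs: var seq[SigSet]; pubkey, message, signature: string): bool =
  ## Validate inputs, append one entry on success, report failure with `false`.
  if pubkey.len == 0:            # stands in for loadWithCacheOrExitFalse failing
    return false
  sigs.add((pubkey: pubkey, message: message, signature: signature))
  true

var pendingSets: seq[SigSet]
doAssert pendingSets.addSet("0xkey", "selection proof, slot 42", "0xsig")
doAssert not pendingSets.addSet("", "selection proof, slot 42", "0xsig")
doAssert pendingSets.len == 1    # the failed call scheduled nothing
```

The test changes above exercise the full path: they construct a throwaway `BatchCrypto` next to the attestation pool and drive the now-async `validateAttestation` with `waitFor`.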