From 7dba1b37dd3723059bf76896892367c62d5b1f2a Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 26 Apr 2021 22:39:44 +0200 Subject: [PATCH] remove attestation/aggregate queue (#2519) With the introduction of batching and lazy attestation aggregation, it no longer makes sense to enqueue attestations between the signature check and adding them to the attestation pool - this only takes up valuable CPU without any real benefit. * add successfully validated attestations to attestion pool directly * avoid copying participant list around for single-vote attestations, pass single validator index instead * release decompressed gossip memory earlier, specially during async message validation * use cooked signatures in a few more places to avoid reloads and errors * remove some Defect-raising versions of signature-loading * release decompressed data memory before validating message --- AllTests-mainnet.md | 10 +- .../attestation_pool.nim | 19 +- .../consensus_object_pools/spec_cache.nim | 21 ++ beacon_chain/fork_choice/fork_choice.nim | 16 +- .../fork_choice/fork_choice_types.nim | 32 +- .../gossip_processing/batch_validation.nim | 334 ++++++++++-------- .../gossip_processing/eth2_processor.nim | 23 +- .../gossip_processing/gossip_to_consensus.nim | 198 +---------- .../gossip_processing/gossip_validation.nim | 84 +++-- beacon_chain/interop.nim | 2 +- beacon_chain/networking/eth2_network.nim | 50 +-- beacon_chain/nimbus_beacon_node.nim | 2 +- beacon_chain/nimbus_signing_process.nim | 2 +- beacon_chain/spec/crypto.nim | 80 ++--- beacon_chain/spec/keystore.nim | 2 +- beacon_chain/spec/signatures.nim | 14 +- beacon_chain/spec/signatures_batch.nim | 160 +++------ beacon_chain/validators/validator_pool.nim | 45 +-- research/block_sim.nim | 9 +- research/state_sim.nim | 6 +- tests/all_tests.nim | 1 + tests/mocking/mock_attestations.nim | 2 +- tests/mocking/mock_blocks.nim | 6 +- tests/mocking/mock_deposits.nim | 2 +- tests/test_attestation_pool.nim | 91 +---- tests/test_gossip_validation.nim | 149 ++++++++ tests/test_interop.nim | 2 +- tests/testblockutil.nim | 22 +- 28 files changed, 662 insertions(+), 722 deletions(-) create mode 100644 tests/test_gossip_validation.nim diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md index 494cccb35..1f7d4dc0c 100644 --- a/AllTests-mainnet.md +++ b/AllTests-mainnet.md @@ -15,11 +15,6 @@ AllTests-mainnet + Working with aggregates [Preset: mainnet] OK ``` OK: 11/11 Fail: 0/11 Skip: 0/11 -## Attestation validation [Preset: mainnet] -```diff -+ Validation sanity OK -``` -OK: 1/1 Fail: 0/1 Skip: 0/1 ## Beacon chain DB [Preset: mainnet] ```diff + empty database [Preset: mainnet] OK @@ -98,6 +93,11 @@ OK: 3/3 Fail: 0/3 Skip: 0/3 + fork_choice - testing with votes OK ``` OK: 4/4 Fail: 0/4 Skip: 0/4 +## Gossip validation [Preset: mainnet] +```diff ++ Validation sanity OK +``` +OK: 1/1 Fail: 0/1 Skip: 0/1 ## Honest validator ```diff + General pubsub topics OK diff --git a/beacon_chain/consensus_object_pools/attestation_pool.nim b/beacon_chain/consensus_object_pools/attestation_pool.nim index 642481227..abd6ff21b 100644 --- a/beacon_chain/consensus_object_pools/attestation_pool.nim +++ b/beacon_chain/consensus_object_pools/attestation_pool.nim @@ -95,11 +95,12 @@ proc init*(T: type AttestationPool, chainDag: ChainDAGRef, quarantine: Quarantin ) proc addForkChoiceVotes( - pool: var AttestationPool, slot: Slot, participants: seq[ValidatorIndex], - block_root: Eth2Digest, wallSlot: Slot) = + pool: var AttestationPool, slot: Slot, + attesting_indices: openArray[ValidatorIndex], block_root: Eth2Digest, + wallSlot: Slot) = # Add attestation votes to fork choice if (let v = pool.forkChoice.on_attestation( - pool.chainDag, slot, block_root, participants, wallSlot); + pool.chainDag, slot, block_root, attesting_indices, wallSlot); v.isErr): # This indicates that the fork choice and the chain dag are out of sync - # this is most likely the result of a bug, but we'll try to keep going - @@ -154,7 +155,7 @@ func toAttestation(entry: AttestationEntry, validation: Validation): Attestation Attestation( aggregation_bits: validation.aggregation_bits, data: entry.data, - signature: validation.aggregate_signature.finish().exportRaw() + signature: validation.aggregate_signature.finish().toValidatorSig() ) func updateAggregates(entry: var AttestationEntry) = @@ -262,7 +263,7 @@ proc addAttestation(entry: var AttestationEntry, proc addAttestation*(pool: var AttestationPool, attestation: Attestation, - participants: seq[ValidatorIndex], + attesting_indices: openArray[ValidatorIndex], signature: CookedSig, wallSlot: Slot) = ## Add an attestation to the pool, assuming it's been validated already. @@ -273,7 +274,7 @@ proc addAttestation*(pool: var AttestationPool, logScope: attestation = shortLog(attestation) - doAssert attestation.signature == signature.exportRaw(), + doAssert attestation.signature == signature.toValidatorSig(), "Deserialized signature must match the one in the attestation" updateCurrent(pool, wallSlot) @@ -303,8 +304,8 @@ proc addAttestation*(pool: var AttestationPool, return pool.addForkChoiceVotes( - attestation.data.slot, participants, attestation.data.beacon_block_root, - wallSlot) + attestation.data.slot, attesting_indices, + attestation.data.beacon_block_root, wallSlot) proc addForkChoice*(pool: var AttestationPool, epochRef: EpochRef, @@ -343,7 +344,7 @@ iterator attestations*(pool: AttestationPool, slot: Option[Slot], for index, signature in entry.singles: singleAttestation.aggregation_bits.setBit(index) - singleAttestation.signature = signature.exportRaw() + singleAttestation.signature = signature.toValidatorSig() yield singleAttestation singleAttestation.aggregation_bits.clearBit(index) diff --git a/beacon_chain/consensus_object_pools/spec_cache.nim b/beacon_chain/consensus_object_pools/spec_cache.nim index e0b430c2b..52c90a0d3 100644 --- a/beacon_chain/consensus_object_pools/spec_cache.nim +++ b/beacon_chain/consensus_object_pools/spec_cache.nim @@ -78,6 +78,27 @@ iterator get_attesting_indices*(epochRef: EpochRef, yield index inc i +func get_attesting_indices_one*(epochRef: EpochRef, + data: AttestationData, + bits: CommitteeValidatorsBits): + Option[ValidatorIndex] = + # A variation on get_attesting_indices that returns the validator index only + # if only one validator index is set + if bits.lenu64 != get_beacon_committee_len(epochRef, data.slot, data.index.CommitteeIndex): + trace "get_attesting_indices: inconsistent aggregation and committee length" + none(ValidatorIndex) + else: + var res = none(ValidatorIndex) + var i = 0 + for index in get_beacon_committee(epochRef, data.slot, data.index.CommitteeIndex): + if bits[i]: + if res.isNone(): + res = some(index) + else: + return none(ValidatorIndex) + inc i + res + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#get_attesting_indices func get_attesting_indices*(epochRef: EpochRef, data: AttestationData, diff --git a/beacon_chain/fork_choice/fork_choice.nim b/beacon_chain/fork_choice/fork_choice.nim index 1993830c5..ea1cc80c2 100644 --- a/beacon_chain/fork_choice/fork_choice.nim +++ b/beacon_chain/fork_choice/fork_choice.nim @@ -40,9 +40,6 @@ func compute_deltas( old_balances: openArray[Gwei], new_balances: openArray[Gwei] ): FcResult[void] -# TODO: raises [Defect] - once https://github.com/nim-lang/Nim/issues/12862 is fixed -# https://github.com/status-im/nimbus-eth2/pull/865#pullrequestreview-389117232 - # Fork choice routines # ---------------------------------------------------------------------- @@ -71,7 +68,8 @@ proc init*(T: type ForkChoice, epoch = epochRef.epoch, blck = shortLog(blck) let - justified = BalanceCheckpoint(blck: blck, epoch: epochRef.epoch, balances: epochRef.effective_balances) + justified = BalanceCheckpoint( + blck: blck, epoch: epochRef.epoch, balances: epochRef.effective_balances) finalized = Checkpoint(root: blck.root, epoch: epochRef.epoch) best_justified = Checkpoint( root: justified.blck.root, epoch: justified.epoch) @@ -176,7 +174,7 @@ proc on_attestation*( dag: ChainDAGRef, attestation_slot: Slot, beacon_block_root: Eth2Digest, - attesting_indices: seq[ValidatorIndex], + attesting_indices: openArray[ValidatorIndex], wallSlot: Slot ): FcResult[void] = ? self.update_time(dag, wallSlot) @@ -188,12 +186,14 @@ proc on_attestation*( for validator_index in attesting_indices: # attestation_slot and target epoch must match, per attestation rules self.backend.process_attestation( - validator_index.ValidatorIndex, beacon_block_root, - attestation_slot.epoch) + validator_index, beacon_block_root, attestation_slot.epoch) else: + # Spec: + # Attestations can only affect the fork choice of subsequent slots. + # Delay consideration in the fork choice until their slot is in the past. self.queuedAttestations.add(QueuedAttestation( slot: attestation_slot, - attesting_indices: attesting_indices, + attesting_indices: @attesting_indices, block_root: beacon_block_root)) ok() diff --git a/beacon_chain/fork_choice/fork_choice_types.nim b/beacon_chain/fork_choice/fork_choice_types.nim index d632c65ab..d164d1054 100644 --- a/beacon_chain/fork_choice/fork_choice_types.nim +++ b/beacon_chain/fork_choice/fork_choice_types.nim @@ -34,40 +34,18 @@ type ## Fork Choice Error Kinds fcFinalizedNodeUnknown fcJustifiedNodeUnknown - fcInvalidFinalizedRootChange fcInvalidNodeIndex - fcInvalidParentIndex - fcInvalidBestChildIndex fcInvalidJustifiedIndex fcInvalidBestDescendant fcInvalidParentDelta fcInvalidNodeDelta fcDeltaUnderflow - fcIndexUnderflow fcInvalidDeltaLen - fcRevertedFinalizedEpoch fcInvalidBestNode fcInconsistentTick - # ------------------------- - # TODO: Extra error modes beyond Proto/Lighthouse to be reviewed fcUnknownParent fcPruningFromOutdatedFinalizedRoot - AttErrorKind* = enum - attFromFuture - attFromPast - attBadTargetEpoch - attUnkownTarget - attUnknownBlock - attWrongTarget - attFutureSlot - - FcUnderflowKind* = enum - ## Fork Choice Overflow Kinds - fcUnderflowIndices = "Indices Overflow" - fcUnderflowBestChild = "Best Child Overflow" - fcUnderflowBestDescendant = "Best Descendant Overflow" - Index* = int Delta* = int64 ## Delta balances @@ -77,26 +55,18 @@ type of fcFinalizedNodeUnknown, fcJustifiedNodeUnknown: blockRoot*: Eth2Digest - of fcInvalidFinalizedRootChange, - fcInconsistentTick: + of fcInconsistentTick: discard of fcInvalidNodeIndex, - fcInvalidParentIndex, - fcInvalidBestChildIndex, fcInvalidJustifiedIndex, fcInvalidBestDescendant, fcInvalidParentDelta, fcInvalidNodeDelta, fcDeltaUnderflow: index*: Index - of fcIndexUnderflow: - underflowKind*: FcUnderflowKind of fcInvalidDeltaLen: deltasLen*: int indicesLen*: int - of fcRevertedFinalizedEpoch: - currentFinalizedEpoch*: Epoch - new_finalized_epoch*: Epoch of fcInvalidBestNode: startRoot*: Eth2Digest justifiedEpoch*: Epoch diff --git a/beacon_chain/gossip_processing/batch_validation.nim b/beacon_chain/gossip_processing/batch_validation.nim index e52058c30..6fea1c4e8 100644 --- a/beacon_chain/gossip_processing/batch_validation.nim +++ b/beacon_chain/gossip_processing/batch_validation.nim @@ -5,6 +5,8 @@ # * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. +{.push raises: [Defect].} + import # Status chronicles, chronos, @@ -27,29 +29,51 @@ logScope: # Batched gossip validation # ---------------------------------------------------------------- -{.push raises: [Defect].} type + BatchResult* {.pure.} = enum + Valid + Invalid + Timeout + + Eager = proc(): bool {.gcsafe, raises: [Defect].} ##\ + ## Callback that returns true if eager processing should be done to lower + ## latency at the expense of spending more cycles validating things, creating + ## a crude timesharing priority mechanism. + + Batch* = object + created: Moment + pendingBuffer: seq[SignatureSet] + resultsBuffer: seq[Future[BatchResult]] + BatchCrypto* = object - # The buffers are bounded by BatchedCryptoSize (16) which was chosen: + # Each batch is bounded by BatchedCryptoSize (16) which was chosen: # - based on "nimble bench" in nim-blscurve # so that low power devices like Raspberry Pi 4 can process # that many batched verifications within 20ms # - based on the accumulation rate of attestations and aggregates # in large instances which were 12000 per slot (12s) # hence 1 per ms (but the pattern is bursty around the 4s mark) - pendingBuffer: seq[SignatureSet] - resultsBuffer: seq[Future[Result[void, cstring]]] + # The number of batches is bounded by time - batch validation is skipped if + # we can't process them in the time that one slot takes, and we return + # timeout instead which prevents the gossip layer from forwarding the + # batch. + batches: seq[ref Batch] + eager: Eager ##\ + ## Eager is used to enable eager processing of attestations when it's + ## prudent to do so (instead of leaving the CPU for other, presumably more + ## important work like block processing) sigVerifCache: BatchedBLSVerifierCache ##\ ## A cache for batch BLS signature verification contexts rng: ref BrHmacDrbgContext ##\ ## A reference to the Nimbus application-wide RNG + pruneTime: Moment ## :ast time we had to prune something const # We cap waiting for an idle slot in case there's a lot of network traffic - # taking up all CPU - we don't want to _completely_ stop processing blocks - # in this case (attestations will get dropped) - doing so also allows us - # to benefit from more batching / larger network reads when under load. + # taking up all CPU - we don't want to _completely_ stop processing + # attestations - doing so also allows us to benefit from more batching / + # larger network reads when under load. BatchAttAccumTime = 10.milliseconds # Threshold for immediate trigger of batch verification. @@ -60,145 +84,174 @@ const # but not too big as we need to redo checks one-by-one if one failed. BatchedCryptoSize = 16 -proc new*(T: type BatchCrypto, rng: ref BrHmacDrbgContext): ref BatchCrypto = - (ref BatchCrypto)(rng: rng) +proc new*( + T: type BatchCrypto, rng: ref BrHmacDrbgContext, eager: Eager): ref BatchCrypto = + (ref BatchCrypto)(rng: rng, eager: eager, pruneTime: Moment.now()) -func clear(batchCrypto: var BatchCrypto) = - ## Empty the crypto-pending attestations & aggregate queues - batchCrypto.pendingBuffer.setLen(0) - batchCrypto.resultsBuffer.setLen(0) +func len(batch: Batch): int = + doAssert batch.resultsBuffer.len() == batch.pendingBuffer.len() + batch.resultsBuffer.len() -proc done(batchCrypto: var BatchCrypto, idx: int) = - ## Send signal to [Attestation/Aggregate]Validator - ## that the attestation was crypto-verified (and so gossip validated) - ## with success - batchCrypto.resultsBuffer[idx].complete(Result[void, cstring].ok()) +func full(batch: Batch): bool = + batch.len() >= BatchedCryptoSize -proc fail(batchCrypto: var BatchCrypto, idx: int, error: cstring) = - ## Send signal to [Attestation/Aggregate]Validator - ## that the attestation was NOT crypto-verified (and so NOT gossip validated) - batchCrypto.resultsBuffer[idx].complete(Result[void, cstring].err(error)) +proc clear(batch: var Batch) = + batch.pendingBuffer.setLen(0) + batch.resultsBuffer.setLen(0) -proc complete(batchCrypto: var BatchCrypto, idx: int, res: Result[void, cstring]) = - ## Send signal to [Attestation/Aggregate]Validator - batchCrypto.resultsBuffer[idx].complete(res) +proc skip(batch: var Batch) = + for res in batch.resultsBuffer.mitems(): + res.complete(BatchResult.Timeout) + batch.clear() # release memory early -proc processBufferedCrypto(self: var BatchCrypto) = - ## Drain all attestations waiting for crypto verifications +proc pruneBatchQueue(batchCrypto: ref BatchCrypto) = + let + now = Moment.now() - doAssert self.pendingBuffer.len == - self.resultsBuffer.len + # If batches haven't been processed for more than 12 seconds + while batchCrypto.batches.len() > 0: + if batchCrypto.batches[0][].created + SECONDS_PER_SLOT.int64.seconds > now: + break + if batchCrypto.pruneTime + SECONDS_PER_SLOT.int64.seconds > now: + notice "Batch queue pruned, skipping attestation validation", + batches = batchCrypto.batches.len() + batchCrypto.pruneTime = Moment.now() + batchCrypto.batches[0][].skip() + batchCrypto.batches.delete(0) - if self.pendingBuffer.len == 0: +proc processBatch(batchCrypto: ref BatchCrypto) = + ## Process one batch, if there is any + + # Pruning the queue here makes sure we catch up with processing if need be + batchCrypto.pruneBatchQueue() # Skip old batches + + if batchCrypto[].batches.len() == 0: + # No more batches left, they might have been eagerly processed or pruned + return + + let + batch = batchCrypto[].batches[0] + batchSize = batch[].len() + batchCrypto[].batches.del(0) + + if batchSize == 0: + # Nothing to do in this batch, can happen when a batch is created without + # there being any signatures successfully added to it return trace "batch crypto - starting", - batchSize = self.pendingBuffer.len + batchSize let startTime = Moment.now() var secureRandomBytes: array[32, byte] - self.rng[].brHmacDrbgGenerate(secureRandomBytes) + batchCrypto[].rng[].brHmacDrbgGenerate(secureRandomBytes) # TODO: For now only enable serial batch verification let ok = batchVerifySerial( - self.sigVerifCache, - self.pendingBuffer, + batchCrypto.sigVerifCache, + batch.pendingBuffer, secureRandomBytes) let stopTime = Moment.now() - debug "batch crypto - finished", - batchSize = self.pendingBuffer.len, + trace "batch crypto - finished", + batchSize, cryptoVerified = ok, dur = stopTime - startTime if ok: - for i in 0 ..< self.resultsBuffer.len: - self.done(i) + for res in batch.resultsBuffer.mitems(): + res.complete(BatchResult.Valid) else: + # Batched verification failed meaning that some of the signature checks + # failed, but we don't know which ones - check each signature separately + # instead debug "batch crypto - failure, falling back", - batchSize = self.pendingBuffer.len - for i in 0 ..< self.pendingBuffer.len: - let ok = blsVerify self.pendingBuffer[i] - if ok: - self.done(i) - else: - self.fail(i, "batch crypto verification: invalid signature") + batchSize + for i, res in batch.resultsBuffer.mpairs(): + let ok = blsVerify batch[].pendingBuffer[i] + res.complete(if ok: BatchResult.Valid else: BatchResult.Invalid) - self.clear() + batch[].clear() # release memory early -proc deferCryptoProcessing(self: ref BatchCrypto, idleTimeout: Duration) {.async.} = - ## Process pending crypto check: - ## - if time threshold is reached - ## - or if networking is idle +proc deferCryptoProcessing(batchCrypto: ref BatchCrypto) {.async.} = + ## Process pending crypto check after some time has passed - the time is + ## chosen such that there's time to fill the batch but not so long that + ## latency across the network is negatively affected + await sleepAsync(BatchAttAccumTime) - # TODO: how to cancel the scheduled `deferCryptoProcessing(BatchAttAccumTime)` ? - # when the buffer size threshold is reached? - # In practice this only happens when we receive a burst of attestations/aggregates. - # Though it's possible to reach the threshold 9ms in, - # and have only 1ms left for further accumulation. - await sleepAsync(idleTimeout) - self[].processBufferedCrypto() + # Take the first batch in the queue and process it - if eager processing has + # stolen it already, that's fine + batchCrypto.processBatch() -proc schedule(batchCrypto: ref BatchCrypto, fut: Future[Result[void, cstring]], checkThreshold = true) = - ## Schedule a cryptocheck for processing - ## - ## The buffer is processed: - ## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize) - ## - when there are no network events (idleAsync) - ## - otherwise after 10ms (BatchAttAccumTime) +proc getBatch(batchCrypto: ref BatchCrypto): (ref Batch, bool) = + # Get a batch suitable for attestation processing - in particular, attestation + # batches might be skipped + batchCrypto.pruneBatchQueue() - # Note: use the resultsBuffer size to detect the first item - # as pendingBuffer is appended to 3 by 3 in case of aggregates + if batchCrypto.batches.len() == 0 or + batchCrypto.batches[^1][].full(): + # There are no batches in progress - start a new batch and schedule a + # deferred task to eventually handle it + let batch = (ref Batch)(created: Moment.now()) + batchCrypto[].batches.add(batch) + (batch, true) + else: + let batch = batchCrypto[].batches[^1] + # len will be 0 when the batch was created but nothing added to it + # because of early failures + (batch, batch[].len() == 0) +proc scheduleBatch(batchCrypto: ref BatchCrypto, fresh: bool) = + if fresh: + # Every time we start a new round of batching, we need to launch a deferred + # task that will compute the result of the batch eventually in case the + # batch is never filled or eager processing is blocked + asyncSpawn batchCrypto.deferCryptoProcessing() - batchCrypto.resultsBuffer.add fut - - if batchCrypto.resultsBuffer.len == 1: - # First attestation to be scheduled in the batch - # wait for an idle time or up to 10ms before processing - trace "batch crypto - scheduling next", - deadline = BatchAttAccumTime - asyncSpawn batchCrypto.deferCryptoProcessing(BatchAttAccumTime) - elif checkThreshold and - batchCrypto.resultsBuffer.len >= BatchedCryptoSize: - # Reached the max buffer size, process immediately - # TODO: how to cancel the scheduled `deferCryptoProcessing(BatchAttAccumTime)` ? - batchCrypto[].processBufferedCrypto() + if batchCrypto.batches.len() > 0 and + batchCrypto.batches[0][].full() and + batchCrypto.eager(): + # If there's a full batch, process it eagerly assuming the callback allows + batchCrypto.processBatch() proc scheduleAttestationCheck*( batchCrypto: ref BatchCrypto, fork: Fork, genesis_validators_root: Eth2Digest, epochRef: EpochRef, attestation: Attestation - ): Option[(Future[Result[void, cstring]], CookedSig)] = + ): Option[(Future[BatchResult], CookedSig)] = ## Schedule crypto verification of an attestation ## ## The buffer is processed: - ## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize) - ## - when there are no network events (idleAsync) + ## - when eager processing is enabled and the batch is full ## - otherwise after 10ms (BatchAttAccumTime) ## ## This returns None if crypto sanity checks failed ## and a future with the deferred attestation check otherwise. - doAssert batchCrypto.pendingBuffer.len < BatchedCryptoSize + ## + let (batch, fresh) = batchCrypto.getBatch() - let (sanity, sig) = batchCrypto - .pendingBuffer - .addAttestation( - fork, genesis_validators_root, epochRef, - attestation - ) - if not sanity: - return none((Future[Result[void, cstring]], CookedSig)) + doAssert batch.pendingBuffer.len < BatchedCryptoSize - let fut = newFuture[Result[void, cstring]]( + let sig = batch + .pendingBuffer + .addAttestation( + fork, genesis_validators_root, epochRef, + attestation + ) + if not sig.isSome(): + return none((Future[BatchResult], CookedSig)) + + let fut = newFuture[BatchResult]( "batch_validation.scheduleAttestationCheck" ) - batchCrypto.schedule(fut) + batch[].resultsBuffer.add(fut) - return some((fut, sig)) + batchCrypto.scheduleBatch(fresh) + + return some((fut, sig.get())) proc scheduleAggregateChecks*( batchCrypto: ref BatchCrypto, @@ -207,7 +260,7 @@ proc scheduleAggregateChecks*( signedAggregateAndProof: SignedAggregateAndProof ): Option[( tuple[slotCheck, aggregatorCheck, aggregateCheck: - Future[Result[void, cstring]]], + Future[BatchResult]], CookedSig)] = ## Schedule crypto verification of an aggregate ## @@ -217,71 +270,74 @@ proc scheduleAggregateChecks*( ## - is_valid_indexed_attestation ## ## The buffer is processed: - ## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize) - ## - when there are no network events (idleAsync) + ## - when eager processing is enabled and the batch is full ## - otherwise after 10ms (BatchAttAccumTime) ## - ## This returns None if crypto sanity checks failed - ## and 2 futures with the deferred aggregate checks otherwise. - doAssert batchCrypto.pendingBuffer.len < BatchedCryptoSize + ## This returns None if the signatures could not be loaded. + ## and 3 futures with the deferred aggregate checks otherwise. + let (batch, fresh) = batchCrypto.getBatch() + + doAssert batch[].pendingBuffer.len < BatchedCryptoSize template aggregate_and_proof: untyped = signedAggregateAndProof.message template aggregate: untyped = aggregate_and_proof.aggregate type R = ( tuple[slotCheck, aggregatorCheck, aggregateCheck: - Future[Result[void, cstring]]], + Future[BatchResult]], CookedSig) # Enqueue in the buffer # ------------------------------------------------------ let aggregator = epochRef.validator_keys[aggregate_and_proof.aggregator_index] block: - let sanity = batchCrypto - .pendingBuffer - .addSlotSignature( - fork, genesis_validators_root, - aggregate.data.slot, - aggregator, - aggregate_and_proof.selection_proof - ) - if not sanity: + if not batch + .pendingBuffer + .addSlotSignature( + fork, genesis_validators_root, + aggregate.data.slot, + aggregator, + aggregate_and_proof.selection_proof + ): return none(R) - - block: - let sanity = batchCrypto - .pendingBuffer - .addAggregateAndProofSignature( - fork, genesis_validators_root, - aggregate_and_proof, - aggregator, - signed_aggregate_and_proof.signature - ) - if not sanity: - return none(R) - - let (sanity, sig) = batchCrypto - .pendingBuffer - .addAttestation( - fork, genesis_validators_root, epochRef, - aggregate - ) - if not sanity: - return none(R) - - let futSlot = newFuture[Result[void, cstring]]( + let futSlot = newFuture[BatchResult]( "batch_validation.scheduleAggregateChecks.slotCheck" ) - let futAggregator = newFuture[Result[void, cstring]]( + batch.resultsBuffer.add(futSlot) + + block: + if not batch + .pendingBuffer + .addAggregateAndProofSignature( + fork, genesis_validators_root, + aggregate_and_proof, + aggregator, + signed_aggregate_and_proof.signature + ): + batchCrypto.scheduleBatch(fresh) + return none(R) + + let futAggregator = newFuture[BatchResult]( "batch_validation.scheduleAggregateChecks.aggregatorCheck" ) - let futAggregate = newFuture[Result[void, cstring]]( + batch.resultsBuffer.add(futAggregator) + + let sig = batch + .pendingBuffer + .addAttestation( + fork, genesis_validators_root, epochRef, + aggregate + ) + if not sig.isSome(): + batchCrypto.scheduleBatch(fresh) + return none(R) + + let futAggregate = newFuture[BatchResult]( "batch_validation.scheduleAggregateChecks.aggregateCheck" ) + batch.resultsBuffer.add(futAggregate) - batchCrypto.schedule(futSlot, checkThreshold = false) - batchCrypto.schedule(futAggregator, checkThreshold = false) - batchCrypto.schedule(futAggregate) + batchCrypto.scheduleBatch(fresh) - return some(((futSlot, futAggregator, futAggregate), sig)) + return some(((futSlot, futAggregator, futAggregate), sig.get())) diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index e1a696ecb..55f55bb0c 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -98,7 +98,11 @@ proc new*(T: type Eth2Processor, exitPool: exitPool, validatorPool: validatorPool, quarantine: quarantine, - batchCrypto: BatchCrypto.new(rng = rng) + batchCrypto: BatchCrypto.new( + rng = rng, + # Only run eager attestation signature verification if we're not + # processing blocks in order to give priority to block processing + eager = proc(): bool = not verifQueues[].hasBlocks()) ) # Gossip Management @@ -219,12 +223,14 @@ proc attestationValidator*( beacon_attestations_received.inc() beacon_attestation_delay.observe(delay.toFloatSeconds()) + let (attestation_index, sig) = v.get() + self[].checkForPotentialDoppelganger( - attestation.data, v.value.attestingIndices, wallSlot) + attestation.data, [attestation_index], wallSlot) trace "Attestation validated" - let (attestingIndices, sig) = v.get() - self.verifQueues[].addAttestation(attestation, attestingIndices, sig) + self.attestationPool[].addAttestation( + attestation, [attestation_index], sig, wallSlot) return ValidationResult.Accept @@ -264,8 +270,10 @@ proc aggregateValidator*( beacon_aggregates_received.inc() beacon_aggregate_delay.observe(delay.toFloatSeconds()) + let (attesting_indices, sig) = v.get() + self[].checkForPotentialDoppelganger( - signedAggregateAndProof.message.aggregate.data, v.value.attestingIndices, + signedAggregateAndProof.message.aggregate.data, attesting_indices, wallSlot) trace "Aggregate validated", @@ -273,9 +281,8 @@ proc aggregateValidator*( selection_proof = signedAggregateAndProof.message.selection_proof, wallSlot - let (attestingIndices, sig) = v.get() - self.verifQueues[].addAggregate( - signedAggregateAndProof, attestingIndices, sig) + self.attestationPool[].addAttestation( + signedAggregateAndProof.message.aggregate, attesting_indices, sig, wallSlot) return ValidationResult.Accept diff --git a/beacon_chain/gossip_processing/gossip_to_consensus.nim b/beacon_chain/gossip_processing/gossip_to_consensus.nim index 612289bbe..aeb84cbe3 100644 --- a/beacon_chain/gossip_processing/gossip_to_consensus.nim +++ b/beacon_chain/gossip_processing/gossip_to_consensus.nim @@ -22,12 +22,6 @@ import declareHistogram beacon_store_block_duration_seconds, "storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf] -declareCounter beacon_attestations_dropped_queue_full, - "Number of attestations dropped because queue is full" - -declareCounter beacon_aggregates_dropped_queue_full, - "Number of aggregates dropped because queue is full" - type SyncBlock* = object blk*: SignedBeaconBlock @@ -75,15 +69,6 @@ type # Producers # ---------------------------------------------------------------- blocksQueue*: AsyncQueue[BlockEntry] # Exported for "test_sync_manager" - # TODO: - # is there a point to separate - # attestations & aggregates here? - attestationsQueue: AsyncQueue[AttestationEntry] - attestationsDropped: int - attestationsDropTime: tuple[afterGenesis: bool, slot: Slot] - aggregatesQueue: AsyncQueue[AggregateEntry] - aggregatesDropped: int - aggregatesDropTime: tuple[afterGenesis: bool, slot: Slot] # Consumer # ---------------------------------------------------------------- @@ -107,21 +92,8 @@ proc new*(T: type VerifQueueManager, getWallTime: getWallTime, - blocksQueue: newAsyncQueue[BlockEntry](1), - # limit to the max number of aggregates we expect to see in one slot - aggregatesQueue: newAsyncQueue[AggregateEntry]( - (TARGET_AGGREGATORS_PER_COMMITTEE * MAX_COMMITTEES_PER_SLOT).int), - # This queue is a bit harder to bound reasonably - we want to get a good - # spread of votes across committees - ideally at least TARGET_COMMITTEE_SIZE - # per committee - assuming randomness in vote arrival, this limit should - # cover that but of course, when votes arrive depends on a number of - # factors that are not entire random - attestationsQueue: newAsyncQueue[AttestationEntry]( - (TARGET_COMMITTEE_SIZE * MAX_COMMITTEES_PER_SLOT).int), - + blocksQueue: newAsyncQueue[BlockEntry](), consensusManager: consensusManager, - attestationsDropTime: getWallTime().toSlot(), - aggregatesDropTime: getWallTime().toSlot(), ) # Sync callbacks @@ -145,86 +117,25 @@ proc complete*(blk: SyncBlock, res: Result[void, BlockError]) = if blk.resfut != nil: blk.resfut.complete(res) +proc hasBlocks*(self: VerifQueueManager): bool = + self.blocksQueue.len() > 0 + # Enqueue # ------------------------------------------------------------------------------ proc addBlock*(self: var VerifQueueManager, syncBlock: SyncBlock) = ## Enqueue a Gossip-validated block for consensus verification # Backpressure: - # If no item can be enqueued because buffer is full, - # we suspend here. + # There is no backpressure here - producers must wait for the future in the + # SyncBlock to constrain their own processing # Producers: # - Gossip (when synced) # - SyncManager (during sync) # - RequestManager (missing ancestor blocks) - # addLast doesn't fail - asyncSpawn(self.blocksQueue.addLast(BlockEntry(v: syncBlock))) - -proc addAttestation*( - self: var VerifQueueManager, att: Attestation, - att_indices: seq[ValidatorIndex], sig: CookedSig) = - ## Enqueue a Gossip-validated attestation for consensus verification - # Backpressure: - # If buffer is full, the oldest attestation is dropped and the newest is enqueued - # Producer: - # - Gossip (when synced) - while self.attestationsQueue.full(): - self.attestationsDropped += 1 - beacon_attestations_dropped_queue_full.inc() - - try: - discard self.attestationsQueue.popFirstNoWait() - except AsyncQueueEmptyError as exc: - raiseAssert "If queue is full, we have at least one item! " & exc.msg - - if self.attestationsDropped > 0: - let now = self.getWallTime().toSlot() # Print notice once per slot - if now != self.attestationsDropTime: - notice "Queue full, attestations dropped", - count = self.attestationsDropped - self.attestationsDropTime = now - self.attestationsDropped = 0 - - try: - self.attestationsQueue.addLastNoWait( - AttestationEntry(v: att, attesting_indices: att_indices, sig: sig)) - except AsyncQueueFullError as exc: - raiseAssert "We just checked that queue is not full! " & exc.msg - -proc addAggregate*( - self: var VerifQueueManager, agg: SignedAggregateAndProof, - att_indices: seq[ValidatorIndex], sig: CookedSig) = - ## Enqueue a Gossip-validated aggregate attestation for consensus verification - # Backpressure: - # If buffer is full, the oldest aggregate is dropped and the newest is enqueued - # Producer: - # - Gossip (when synced) - - while self.aggregatesQueue.full(): - self.aggregatesDropped += 1 - beacon_aggregates_dropped_queue_full.inc() - - try: - discard self.aggregatesQueue.popFirstNoWait() - except AsyncQueueEmptyError as exc: - raiseAssert "We just checked that queue is not full! " & exc.msg - - if self.aggregatesDropped > 0: - let now = self.getWallTime().toSlot() # Print notice once per slot - if now != self.aggregatesDropTime: - notice "Queue full, aggregates dropped", - count = self.aggregatesDropped - self.aggregatesDropTime = now - self.aggregatesDropped = 0 - - try: - self.aggregatesQueue.addLastNoWait(AggregateEntry( - v: agg.message.aggregate, - attesting_indices: att_indices, - sig: sig)) - except AsyncQueueFullError as exc: - raiseAssert "We just checked that queue is not full! " & exc.msg + # addLast doesn't fail with unbounded queues, but we'll add asyncSpawn as a + # sanity check + asyncSpawn self.blocksQueue.addLast(BlockEntry(v: syncBlock)) # Storage # ------------------------------------------------------------------------------ @@ -272,40 +183,6 @@ proc storeBlock( # Event Loop # ------------------------------------------------------------------------------ -proc processAttestation( - self: var VerifQueueManager, entry: AttestationEntry) = - logScope: - signature = shortLog(entry.v.signature) - - let - wallTime = self.getWallTime() - (afterGenesis, wallSlot) = wallTime.toSlot() - - if not afterGenesis: - error "Processing attestation before genesis, clock turned back?" - quit 1 - - trace "Processing attestation" - self.consensusManager.attestationPool[].addAttestation( - entry.v, entry.attesting_indices, entry.sig, wallSlot) - -proc processAggregate( - self: var VerifQueueManager, entry: AggregateEntry) = - logScope: - signature = shortLog(entry.v.signature) - - let - wallTime = self.getWallTime() - (afterGenesis, wallSlot) = wallTime.toSlot() - - if not afterGenesis: - error "Processing aggregate before genesis, clock turned back?" - quit 1 - - trace "Processing aggregate" - self.consensusManager.attestationPool[].addAttestation( - entry.v, entry.attesting_indices, entry.sig, wallSlot) - proc processBlock(self: var VerifQueueManager, entry: BlockEntry) = logScope: blockRoot = shortLog(entry.v.blk.root) @@ -360,67 +237,18 @@ proc processBlock(self: var VerifQueueManager, entry: BlockEntry) = entry.v.resFut.complete(Result[void, BlockError].err(res.error())) proc runQueueProcessingLoop*(self: ref VerifQueueManager) {.async.} = - # Blocks in eth2 arrive on a schedule for every slot: - # - # * Block arrives at time 0 - # * Attestations arrives at time 4 - # * Aggregate arrives at time 8 - - var - blockFut = self[].blocksQueue.popFirst() - aggregateFut = self[].aggregatesQueue.popFirst() - attestationFut = self[].attestationsQueue.popFirst() - - # TODO: - # revisit `idleTimeout` - # and especially `attestationBatch` in light of batch validation - # in particular we might want `attestationBatch` to drain both attestation & aggregates while true: - # Cooperative concurrency: one idle calculation step per loop - because + # Cooperative concurrency: one block per loop iteration - because # we run both networking and CPU-heavy things like block processing # on the same thread, we need to make sure that there is steady progress # on the networking side or we get long lockups that lead to timeouts. const # We cap waiting for an idle slot in case there's a lot of network traffic # taking up all CPU - we don't want to _completely_ stop processing blocks - # in this case (attestations will get dropped) - doing so also allows us - # to benefit from more batching / larger network reads when under load. + # in this case - doing so also allows us to benefit from more batching / + # larger network reads when under load. idleTimeout = 10.milliseconds - # Attestation processing is fairly quick and therefore done in batches to - # avoid some of the `Future` overhead - attestationBatch = 16 - discard await idleAsync().withTimeout(idleTimeout) - # Avoid one more `await` when there's work to do - if not (blockFut.finished or aggregateFut.finished or attestationFut.finished): - trace "Waiting for processing work" - await blockFut or aggregateFut or attestationFut - - # Only run one task per idle iteration, in priority order: blocks are needed - # for all other processing - then come aggregates which are cheap to - # process but might have a big impact on fork choice - last come - # attestations which individually have the smallest effect on chain progress - if blockFut.finished: - self[].processBlock(blockFut.read()) - blockFut = self[].blocksQueue.popFirst() - elif aggregateFut.finished: - # aggregates will be dropped under heavy load on producer side - self[].processAggregate(aggregateFut.read()) - for i in 0.. 0: - msgValidator(SSZ.decode(decompressed, MsgType)) + let decoded = SSZ.decode(decompressed, MsgType) + decompressed = newSeq[byte](0) # release memory before validating + msgValidator(decoded) else: debug "Empty gossip data after decompression", topic, len = message.data.len @@ -1689,7 +1696,7 @@ proc addValidator*[MsgType](node: Eth2Node, debug "Gossip validation error", msg = err.msg, topic, len = message.data.len ValidationResult.Ignore - return res + return newValidationResultFuture(res) try: node.pubsub.addValidator(topic & "_snappy", execValidator) @@ -1699,31 +1706,28 @@ proc addAsyncValidator*[MsgType](node: Eth2Node, topic: string, msgValidator: proc(msg: MsgType): Future[ValidationResult] {.gcsafe, raises: [Defect].} ) = - proc execValidator( topic: string, message: GossipMsg): Future[ValidationResult] {.raises: [Defect].} = - inc nbc_gossip_messages_received trace "Validating incoming gossip message", len = message.data.len, topic - let decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE) - - if decompressed.len == 0: - debug "Empty gossip data after decompression", - topic, len = message.data.len - result.complete(ValidationResult.Ignore) - return - - let decoded = try: - SSZ.decode(decompressed, MsgType) - except: - error "SSZ decoding failure", - topic, len = message.data.len - result.complete(ValidationResult.Ignore) - return - - return msgValidator(decoded) + let res = + try: + var decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE) + if decompressed.len > 0: + let decoded = SSZ.decode(decompressed, MsgType) + decompressed = newSeq[byte](0) # release memory before validating + return msgValidator(decoded) # Reuses future from msgValidator + else: + debug "Empty gossip data after decompression", + topic, len = message.data.len + ValidationResult.Ignore + except CatchableError as err: + debug "Gossip validation error", + msg = err.msg, topic, len = message.data.len + ValidationResult.Ignore + return newValidationResultFuture(res) try: node.pubsub.addValidator(topic & "_snappy", execValidator) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 450a2836a..85779b913 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -1598,7 +1598,7 @@ proc handleValidatorExitCommand(config: BeaconNodeConf) {.async.} = validator_index: validatorIdx)) signedExit.signature = get_voluntary_exit_signature( - fork, genesisValidatorsRoot, signedExit.message, signingKey.get) + fork, genesisValidatorsRoot, signedExit.message, signingKey.get).toValidatorSig() template ask(prompt: string): string = try: diff --git a/beacon_chain/nimbus_signing_process.nim b/beacon_chain/nimbus_signing_process.nim index 5ad5a1e2e..060f2e591 100644 --- a/beacon_chain/nimbus_signing_process.nim +++ b/beacon_chain/nimbus_signing_process.nim @@ -33,4 +33,4 @@ programMain: let privKey = validators[ValidatorPubKey.fromHex(args[0]).get()] - echo blsSign(privKey, Eth2Digest.fromHex(args[1]).data) + echo blsSign(privKey, Eth2Digest.fromHex(args[1]).data).toValidatorSig() diff --git a/beacon_chain/spec/crypto.nim b/beacon_chain/spec/crypto.nim index b73091498..6b768552b 100644 --- a/beacon_chain/spec/crypto.nim +++ b/beacon_chain/spec/crypto.nim @@ -50,7 +50,7 @@ type # signatures lazily - this helps operations like comparisons and hashes to # be fast (which is important), makes loading blocks and states fast, and # allows invalid values in the SSZ byte stream, which is valid from an SSZ - # point of view - the invalid values are later processed to + # point of view - the invalid values are caught later ValidatorPubKey* = object blob*: array[RawPubKeySize, byte] @@ -69,10 +69,9 @@ type SomeSig* = TrustedSig | ValidatorSig CookedSig* = distinct blscurve.Signature ## \ - ## Allows loading in an atttestation or other message's signature once across - ## all its computations, rather than repeatedly re-loading it each time it is - ## referenced. This primarily currently serves the attestation pool. - + ## Cooked signatures are those that have been loaded successfully from a + ## ValidatorSig and are used to avoid expensive reloading as well as error + ## checking export AggregateSignature # API @@ -108,36 +107,26 @@ proc loadWithCache*(v: ValidatorPubKey): Option[blscurve.PublicKey] = else: none blscurve.PublicKey -proc load*(v: ValidatorSig): Option[blscurve.Signature] = +proc load*(v: ValidatorSig): Option[CookedSig] = ## Parse signature blob - this may fail var parsed: blscurve.Signature if fromBytes(parsed, v.blob): - some(parsed) + some(CookedSig(parsed)) else: - none(blscurve.Signature) - -func init*(agg: var AggregateSignature, sig: ValidatorSig) {.inline.}= - ## Initializes an aggregate signature context - ## This assumes that the signature is valid - agg.init(sig.load().get()) + none(CookedSig) func init*(agg: var AggregateSignature, sig: CookedSig) {.inline.}= ## Initializes an aggregate signature context agg.init(blscurve.Signature(sig)) -func init*(T: type AggregateSignature, sig: CookedSig | ValidatorSig): T = +func init*(T: type AggregateSignature, sig: CookedSig): T = result.init(sig) -proc aggregate*(agg: var AggregateSignature, sig: ValidatorSig) {.inline.}= - ## Aggregate two Validator Signatures - ## Both signatures must be valid - agg.aggregate(sig.load.get()) - proc aggregate*(agg: var AggregateSignature, sig: CookedSig) {.inline.}= - ## Aggregate two Validator Signatures + ## Aggregate two valid Validator Signatures agg.aggregate(blscurve.Signature(sig)) -func finish*(agg: AggregateSignature): CookedSig {.inline.}= +func finish*(agg: AggregateSignature): CookedSig {.inline.} = ## Canonicalize an AggregateSignature into a signature var sig: blscurve.Signature sig.finish(agg) @@ -146,7 +135,7 @@ func finish*(agg: AggregateSignature): CookedSig {.inline.}= # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#bls-signatures proc blsVerify*( pubkey: ValidatorPubKey, message: openArray[byte], - signature: ValidatorSig): bool = + signature: CookedSig): bool = ## Check that a signature is valid for a message ## under the provided public key. ## returns `true` if the signature is valid, `false` otherwise. @@ -155,19 +144,23 @@ proc blsVerify*( ## It is recommended to use the overload that accepts a proof-of-possession ## to enforce correct usage. let - parsedSig = signature.load() + parsedKey = pubkey.loadWithCache() + + # It may happen that signatures or keys fail to parse as invalid blobs may + # be passed around - for example, the deposit contract doesn't verify + # signatures, so the loading happens lazily at verification time instead! + parsedKey.isSome() and + parsedKey.get.verify(message, blscurve.Signature(signature)) + +proc blsVerify*( + pubkey: ValidatorPubKey, message: openArray[byte], + signature: ValidatorSig): bool = + let parsedSig = signature.load() if parsedSig.isNone(): false else: - let - parsedKey = pubkey.loadWithCache() - - # It may happen that signatures or keys fail to parse as invalid blobs may - # be passed around - for example, the deposit contract doesn't verify - # signatures, so the loading happens lazily at verification time instead! - parsedKey.isSome() and - parsedKey.get.verify(message, parsedSig.get()) + blsVerify(pubkey, message, parsedSig.get()) proc blsVerify*(sigSet: SignatureSet): bool = ## Unbatched verification @@ -179,15 +172,14 @@ proc blsVerify*(sigSet: SignatureSet): bool = sigSet.signature ) -func blsSign*(privkey: ValidatorPrivKey, message: openArray[byte]): ValidatorSig = +func blsSign*(privkey: ValidatorPrivKey, message: openArray[byte]): CookedSig = ## Computes a signature from a secret key and a message - let sig = SecretKey(privkey).sign(message) - ValidatorSig(blob: sig.exportRaw()) + CookedSig(SecretKey(privkey).sign(message)) proc blsFastAggregateVerify*( publicKeys: openArray[ValidatorPubKey], message: openArray[byte], - signature: ValidatorSig + signature: CookedSig ): bool = ## Verify the aggregate of multiple signatures on the same message ## This function is faster than AggregateVerify @@ -207,9 +199,6 @@ proc blsFastAggregateVerify*( # in blscurve which already exists internally # - or at network/databases/serialization boundaries we do not # allow invalid BLS objects to pollute consensus routines - let parsedSig = signature.load() - if not parsedSig.isSome(): - return false var unwrapped: seq[PublicKey] for pubkey in publicKeys: let realkey = pubkey.loadWithCache() @@ -217,7 +206,18 @@ proc blsFastAggregateVerify*( return false unwrapped.add realkey.get - fastAggregateVerify(unwrapped, message, parsedSig.get()) + fastAggregateVerify(unwrapped, message, blscurve.Signature(signature)) + +proc blsFastAggregateVerify*( + publicKeys: openArray[ValidatorPubKey], + message: openArray[byte], + signature: ValidatorSig + ): bool = + let parsedSig = signature.load() + if not parsedSig.isSome(): + false + else: + blsFastAggregateVerify(publicKeys, message, parsedSig.get()) proc toGaugeValue*(hash: Eth2Digest): int64 = # Only the last 8 bytes are taken into consideration in accordance @@ -252,7 +252,7 @@ template toRaw*(x: TrustedSig): auto = func toHex*(x: BlsCurveType): string = toHex(toRaw(x)) -func exportRaw*(x: CookedSig): ValidatorSig = +func toValidatorSig*(x: CookedSig): ValidatorSig = ValidatorSig(blob: blscurve.Signature(x).exportRaw()) func fromRaw*(T: type ValidatorPrivKey, bytes: openArray[byte]): BlsResult[T] = diff --git a/beacon_chain/spec/keystore.nim b/beacon_chain/spec/keystore.nim index 054128fdd..dc46e728d 100644 --- a/beacon_chain/spec/keystore.nim +++ b/beacon_chain/spec/keystore.nim @@ -761,5 +761,5 @@ proc prepareDeposit*(preset: RuntimePreset, pubkey: signingPubKey, withdrawal_credentials: makeWithdrawalCredentials(withdrawalPubKey)) - res.signature = preset.get_deposit_signature(res, signingKey) + res.signature = preset.get_deposit_signature(res, signingKey).toValidatorSig() return res diff --git a/beacon_chain/spec/signatures.nim b/beacon_chain/spec/signatures.nim index a62f3ba1b..43c0a4dd3 100644 --- a/beacon_chain/spec/signatures.nim +++ b/beacon_chain/spec/signatures.nim @@ -35,7 +35,7 @@ func compute_slot_root*( # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#aggregation-selection func get_slot_signature*( fork: Fork, genesis_validators_root: Eth2Digest, slot: Slot, - privkey: ValidatorPrivKey): ValidatorSig = + privkey: ValidatorPrivKey): CookedSig = blsSign(privKey, compute_slot_root(fork, genesis_validators_root, slot).data) proc verify_slot_signature*( @@ -60,7 +60,7 @@ func compute_epoch_root*( # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#randao-reveal func get_epoch_signature*( fork: Fork, genesis_validators_root: Eth2Digest, epoch: Epoch, - privkey: ValidatorPrivKey): ValidatorSig = + privkey: ValidatorPrivKey): CookedSig = blsSign(privKey, compute_epoch_root(fork, genesis_validators_root, epoch).data) proc verify_epoch_signature*( @@ -85,7 +85,7 @@ func compute_block_root*( # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#signature func get_block_signature*( fork: Fork, genesis_validators_root: Eth2Digest, slot: Slot, - root: Eth2Digest, privkey: ValidatorPrivKey): ValidatorSig = + root: Eth2Digest, privkey: ValidatorPrivKey): CookedSig = blsSign(privKey, compute_block_root(fork, genesis_validators_root, slot, root).data) proc verify_block_signature*( @@ -114,7 +114,7 @@ func compute_aggregate_and_proof_root*(fork: Fork, genesis_validators_root: Eth2 # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#broadcast-aggregate func get_aggregate_and_proof_signature*(fork: Fork, genesis_validators_root: Eth2Digest, aggregate_and_proof: AggregateAndProof, - privKey: ValidatorPrivKey): ValidatorSig = + privKey: ValidatorPrivKey): CookedSig = blsSign(privKey, compute_aggregate_and_proof_root(fork, genesis_validators_root, aggregate_and_proof).data) @@ -144,7 +144,7 @@ func compute_attestation_root*( func get_attestation_signature*( fork: Fork, genesis_validators_root: Eth2Digest, attestation_data: AttestationData, - privkey: ValidatorPrivKey): ValidatorSig = + privkey: ValidatorPrivKey): CookedSig = blsSign(privKey, compute_attestation_root(fork, genesis_validators_root, attestation_data).data) @@ -165,7 +165,7 @@ proc verify_attestation_signature*( # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#deposits func get_deposit_signature*(preset: RuntimePreset, deposit: DepositData, - privkey: ValidatorPrivKey): ValidatorSig = + privkey: ValidatorPrivKey): CookedSig = let deposit_message = deposit.getDepositMessage() # Fork-agnostic domain since deposits are valid across forks @@ -188,7 +188,7 @@ func get_voluntary_exit_signature*( fork: Fork, genesis_validators_root: Eth2Digest, voluntary_exit: VoluntaryExit, - privkey: ValidatorPrivKey): ValidatorSig = + privkey: ValidatorPrivKey): CookedSig = let domain = get_domain( fork, DOMAIN_VOLUNTARY_EXIT, voluntary_exit.epoch, genesis_validators_root) diff --git a/beacon_chain/spec/signatures_batch.nim b/beacon_chain/spec/signatures_batch.nim index b8e5d7c85..4d74ea60b 100644 --- a/beacon_chain/spec/signatures_batch.nim +++ b/beacon_chain/spec/signatures_batch.nim @@ -30,7 +30,7 @@ func `$`*(s: SignatureSet): string = # unlike when Nimbus did eager loading which ensured they were correct beforehand template loadOrExit(signature: ValidatorSig, failReturn: auto): - blscurve.Signature = + CookedSig = ## Load a BLS signature from a raw signature ## Exits the **caller** with false if the signature is invalid let sig = signature.load() @@ -51,11 +51,11 @@ func addSignatureSet[T]( sigs: var seq[SignatureSet], pubkey: blscurve.PublicKey, sszObj: T, - signature: ValidatorSig | blscurve.Signature, + signature: CookedSig, genesis_validators_root: Eth2Digest, fork: Fork, epoch: Epoch, - domain: DomainType): bool {.raises: [Defect].}= + domain: DomainType) = ## Add a new signature set triplet (pubkey, message, signature) ## to a collection of signature sets for batch verification. ## Can return false if `signature` wasn't deserialized to a valid BLS signature. @@ -68,20 +68,11 @@ func addSignatureSet[T]( ) ).data - when signature is ValidatorSig: - sigs.add(( - pubkey, - signing_root, - signature.loadOrExit(false) - )) - else: - sigs.add(( - pubkey, - signing_root, - signature - )) - - return true + sigs.add(( + pubkey, + signing_root, + blscurve.Signature(signature) + )) proc aggregateAttesters( aggPK: var blscurve.PublicKey, @@ -138,15 +129,14 @@ proc addIndexedAttestation( if not aggPK.aggregateAttesters(attestation, state): return false - if not sigs.addSignatureSet( + sigs.addSignatureSet( aggPK, attestation.data, - attestation.signature, + attestation.signature.loadOrExit(false), state.genesis_validators_root, state.fork, attestation.data.target.epoch, - DOMAIN_BEACON_ATTESTER): - return false + DOMAIN_BEACON_ATTESTER) return true proc addAttestation( @@ -155,39 +145,38 @@ proc addAttestation( state: BeaconState, cache: var StateCache ): bool = - result = false - + var inited = false var attestersAgg{.noInit.}: AggregatePublicKey for valIndex in state.get_attesting_indices( attestation.data, attestation.aggregation_bits, cache ): - if not result: # first iteration + if not inited: # first iteration attestersAgg.init(state.validators[valIndex] .pubkey.loadWithCacheOrExit(false)) - result = true + inited = true else: attestersAgg.aggregate(state.validators[valIndex] .pubkey.loadWithCacheOrExit(false)) - if not result: + if not inited: # There were no attesters return false var attesters{.noinit.}: blscurve.PublicKey attesters.finish(attestersAgg) - if not sigs.addSignatureSet( + sigs.addSignatureSet( attesters, attestation.data, - attestation.signature, + attestation.signature.loadOrExit(false), state.genesis_validators_root, state.fork, attestation.data.target.epoch, - DOMAIN_BEACON_ATTESTER): - return false - return true + DOMAIN_BEACON_ATTESTER) + + true # Public API # ------------------------------------------------------ @@ -197,7 +186,7 @@ proc addAttestation*( fork: Fork, genesis_validators_root: Eth2Digest, epochRef: auto, attestation: Attestation - ): tuple[valid: bool, sig: CookedSig] = + ): Option[CookedSig] = ## Add an attestation for batched BLS verification ## purposes ## This only verifies cryptography @@ -207,75 +196,38 @@ proc addAttestation*( ## In that case the seq[SignatureSet] is unmodified mixin get_attesting_indices, validator_keys, pubkey - let defaultFail = (false, default(CookedSig)) - - result = defaultFail - + var inited = false var attestersAgg{.noInit.}: AggregatePublicKey for valIndex in epochRef.get_attesting_indices( attestation.data, attestation.aggregation_bits): - if not result.valid: # first iteration + if not inited: # first iteration attestersAgg.init(epochRef.validator_keys[valIndex] - .loadWithCacheOrExit(defaultFail)) - result.valid = true + .loadWithCacheOrExit(none(CookedSig))) + inited = true else: attestersAgg.aggregate(epochRef.validator_keys[valIndex] - .loadWithCacheOrExit(defaultFail)) + .loadWithCacheOrExit(none(CookedSig))) - if not result.valid: + if not inited: # There were no attesters - return defaultFail + return none(CookedSig) var attesters{.noinit.}: blscurve.PublicKey attesters.finish(attestersAgg) - let cookedSig = attestation.signature.loadOrExit(defaultFail) + let cookedSig = attestation.signature.loadOrExit(none(CookedSig)) - return ( - sigs.addSignatureSet( + sigs.addSignatureSet( attesters, attestation.data, cookedSig, genesis_validators_root, fork, attestation.data.target.epoch, - DOMAIN_BEACON_ATTESTER), - CookedSig(cookedSig)) + DOMAIN_BEACON_ATTESTER) -proc addIndexedAttestation*( - sigs: var seq[SignatureSet], - fork: Fork, genesis_validators_root: Eth2Digest, - epochRef: auto, - attestation: IndexedAttestation, - ): bool = - ## Add an indexed attestation for batched BLS verification - ## purposes - ## This only verifies cryptography, checking that - ## the indices are sorted and unique is not checked for example. - ## - ## Returns true if the indexed attestations was added to the batching buffer - ## Returns false if saniy checks failed (non-empty, keys are valid) - ## In that case the seq[SignatureSet] is unmodified - if attestation.attesting_indices.len == 0: - # Aggregation spec requires non-empty collection - # - https://tools.ietf.org/html/draft-irtf-cfrg-bls-signature-04 - # Eth2 spec requires at least one attesting indice in slashing - # - https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#is_valid_indexed_attestation - return false - - var aggPK {.noInit.}: blscurve.PublicKey - if not aggPK.aggregateAttesters(attestation, epochRef): - return false - - return sigs.addSignatureSet( - aggPK, - attestation.data, - attestation.signature, - genesis_validators_root, - fork, - attestation.data.target.epoch, - DOMAIN_BEACON_ATTESTER) + some(CookedSig(cookedSig)) proc addSlotSignature*( sigs: var seq[SignatureSet], @@ -283,18 +235,19 @@ proc addSlotSignature*( slot: Slot, pubkey: ValidatorPubKey, signature: ValidatorSig): bool = - let epoch = compute_epoch_at_slot(slot) - return sigs.addSignatureSet( + sigs.addSignatureSet( pubkey.loadWithCacheOrExit(false), sszObj = slot, - signature, + signature.loadOrExit(false), genesis_validators_root, fork, epoch, DOMAIN_SELECTION_PROOF ) + true + proc addAggregateAndProofSignature*( sigs: var seq[SignatureSet], fork: Fork, genesis_validators_root: Eth2Digest, @@ -304,16 +257,18 @@ proc addAggregateAndProofSignature*( ): bool = let epoch = compute_epoch_at_slot(aggregate_and_proof.aggregate.data.slot) - return sigs.addSignatureSet( + sigs.addSignatureSet( pubkey.loadWithCacheOrExit(false), sszObj = aggregate_and_proof, - signature, + signature.loadOrExit(false), genesis_validators_root, fork, epoch, DOMAIN_AGGREGATE_AND_PROOF ) + true + proc collectSignatureSets*( sigs: var seq[SignatureSet], signed_block: SignedBeaconBlock, @@ -345,27 +300,25 @@ proc collectSignatureSets*( # 1. Block proposer # ---------------------------------------------------- - if not sigs.addSignatureSet( + sigs.addSignatureSet( pubkey, signed_block.message, - signed_block.signature, + signed_block.signature.loadOrExit(false), state.genesis_validators_root, state.fork, epoch, - DOMAIN_BEACON_PROPOSER): - return false + DOMAIN_BEACON_PROPOSER) # 2. Randao Reveal # ---------------------------------------------------- - if not sigs.addSignatureSet( + sigs.addSignatureSet( pubkey, epoch, - signed_block.message.body.randao_reveal, + signed_block.message.body.randao_reveal.loadOrExit(false), state.genesis_validators_root, state.fork, epoch, - DOMAIN_RANDAO): - return false + DOMAIN_RANDAO) # 3. Proposer slashings # ---------------------------------------------------- @@ -386,32 +339,30 @@ proc collectSignatureSets*( let header_1 = slashing.signed_header_1 let proposer1 = state.validators[header_1.message.proposer_index] let epoch1 = header_1.message.slot.compute_epoch_at_slot() - if not sigs.addSignatureSet( + sigs.addSignatureSet( proposer1.pubkey.loadWithCacheOrExit(false), header_1.message, - header_1.signature, + header_1.signature.loadOrExit(false), state.genesis_validators_root, state.fork, epoch1, DOMAIN_BEACON_PROPOSER - ): - return false + ) # Conflicting block 2 block: let header_2 = slashing.signed_header_2 let proposer2 = state.validators[header_2.message.proposer_index] let epoch2 = header_2.message.slot.compute_epoch_at_slot() - if not sigs.addSignatureSet( + sigs.addSignatureSet( proposer2.pubkey.loadWithCacheOrExit(false), header_2.message, - header_2.signature, + header_2.signature.loadOrExit(false), state.genesis_validators_root, state.fork, epoch2, DOMAIN_BEACON_PROPOSER - ): - return false + ) # 4. Attester slashings # ---------------------------------------------------- @@ -466,15 +417,14 @@ proc collectSignatureSets*( # fixed in 1.4.2 template volex: untyped = signed_block.message.body.voluntary_exits[i] - if not sigs.addSignatureSet( + sigs.addSignatureSet( state.validators[volex.message.validator_index] .pubkey.loadWithCacheOrExit(false), volex.message, - volex.signature, + volex.signature.loadOrExit(false), state.genesis_validators_root, state.fork, volex.message.epoch, - DOMAIN_VOLUNTARY_EXIT): - return false + DOMAIN_VOLUNTARY_EXIT) return true diff --git a/beacon_chain/validators/validator_pool.nim b/beacon_chain/validators/validator_pool.nim index e3cf86dbb..446f94f92 100644 --- a/beacon_chain/validators/validator_pool.nim +++ b/beacon_chain/validators/validator_pool.nim @@ -25,8 +25,9 @@ func init*(T: type ValidatorPool, ## `genesis_validators_root` is used as an unique ID for the ## blockchain ## `backend` is the KeyValue Store backend - result.validators = initTable[ValidatorPubKey, AttachedValidator]() - result.slashingProtection = slashingProtectionDB + T( + slashingProtection: slashingProtectionDB + ) template count*(pool: ValidatorPool): int = pool.validators.len @@ -72,23 +73,23 @@ proc signWithRemoteValidator(v: AttachedValidator, data: Eth2Digest): proc signBlockProposal*(v: AttachedValidator, fork: Fork, genesis_validators_root: Eth2Digest, slot: Slot, blockRoot: Eth2Digest): Future[ValidatorSig] {.async.} = - if v.kind == inProcess: - result = get_block_signature( - fork, genesis_validators_root, slot, blockRoot, v.privKey) + return if v.kind == inProcess: + get_block_signature( + fork, genesis_validators_root, slot, blockRoot, v.privKey).toValidatorSig() else: let root = compute_block_root(fork, genesis_validators_root, slot, blockRoot) - result = await signWithRemoteValidator(v, root) + await signWithRemoteValidator(v, root) proc signAttestation*(v: AttachedValidator, attestation: AttestationData, fork: Fork, genesis_validators_root: Eth2Digest): Future[ValidatorSig] {.async.} = - if v.kind == inProcess: - result = get_attestation_signature( - fork, genesis_validators_root, attestation, v.privKey) + return if v.kind == inProcess: + get_attestation_signature( + fork, genesis_validators_root, attestation, v.privKey).toValidatorSig() else: let root = compute_attestation_root(fork, genesis_validators_root, attestation) - result = await signWithRemoteValidator(v, root) + await signWithRemoteValidator(v, root) proc produceAndSignAttestation*(validator: AttachedValidator, attestationData: AttestationData, @@ -107,36 +108,36 @@ proc signAggregateAndProof*(v: AttachedValidator, aggregate_and_proof: AggregateAndProof, fork: Fork, genesis_validators_root: Eth2Digest): Future[ValidatorSig] {.async.} = - if v.kind == inProcess: - result = get_aggregate_and_proof_signature( - fork, genesis_validators_root, aggregate_and_proof, v.privKey) + return if v.kind == inProcess: + get_aggregate_and_proof_signature( + fork, genesis_validators_root, aggregate_and_proof, v.privKey).toValidatorSig() else: let root = compute_aggregate_and_proof_root( fork, genesis_validators_root, aggregate_and_proof) - result = await signWithRemoteValidator(v, root) + await signWithRemoteValidator(v, root) # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#randao-reveal func genRandaoReveal*(k: ValidatorPrivKey, fork: Fork, - genesis_validators_root: Eth2Digest, slot: Slot): ValidatorSig = + genesis_validators_root: Eth2Digest, slot: Slot): CookedSig = get_epoch_signature( fork, genesis_validators_root, slot.compute_epoch_at_slot, k) proc genRandaoReveal*(v: AttachedValidator, fork: Fork, genesis_validators_root: Eth2Digest, slot: Slot): Future[ValidatorSig] {.async.} = - if v.kind == inProcess: - return genRandaoReveal(v.privKey, fork, genesis_validators_root, slot) + return if v.kind == inProcess: + genRandaoReveal(v.privKey, fork, genesis_validators_root, slot).toValidatorSig() else: let root = compute_epoch_root( fork, genesis_validators_root, slot.compute_epoch_at_slot) - result = await signWithRemoteValidator(v, root) + await signWithRemoteValidator(v, root) proc getSlotSig*(v: AttachedValidator, fork: Fork, genesis_validators_root: Eth2Digest, slot: Slot ): Future[ValidatorSig] {.async.} = - if v.kind == inProcess: - result = get_slot_signature( - fork, genesis_validators_root, slot, v.privKey) + return if v.kind == inProcess: + get_slot_signature( + fork, genesis_validators_root, slot, v.privKey).toValidatorSig() else: let root = compute_slot_root(fork, genesis_validators_root, slot) - result = await signWithRemoteValidator(v, root) + await signWithRemoteValidator(v, root) diff --git a/research/block_sim.nim b/research/block_sim.nim index d64244105..e84c9923b 100644 --- a/research/block_sim.nim +++ b/research/block_sim.nim @@ -117,8 +117,8 @@ cli do(slots = SLOTS_PER_EPOCH * 5, Attestation( data: data, aggregation_bits: aggregation_bits, - signature: sig - ), @[validatorIdx], sig.load.get().CookedSig, data.slot) + signature: sig.toValidatorSig() + ), [validatorIdx], sig, data.slot) proc proposeBlock(slot: Slot) = if rand(r, 1.0) > blockRatio: @@ -141,7 +141,8 @@ cli do(slots = SLOTS_PER_EPOCH * 5, hashedState, proposerIdx, head.root, - privKey.genRandaoReveal(state.fork, state.genesis_validators_root, slot), + privKey.genRandaoReveal(state.fork, state.genesis_validators_root, + slot).toValidatorSig(), eth1ProposalData.vote, default(GraffitiBytes), attPool.getAttestationsForBlock(state, cache), @@ -164,7 +165,7 @@ cli do(slots = SLOTS_PER_EPOCH * 5, newBlock.signature = withTimerRet(timers[tSignBlock]): get_block_signature( state.fork, state.genesis_validators_root, newBlock.message.slot, - blockRoot, privKey) + blockRoot, privKey).toValidatorSig() let added = chainDag.addRawBlock(quarantine, newBlock) do ( blckRef: BlockRef, signedBlock: TrustedSignedBeaconBlock, diff --git a/research/state_sim.nim b/research/state_sim.nim index afa9af801..1e0570630 100644 --- a/research/state_sim.nim +++ b/research/state_sim.nim @@ -131,7 +131,7 @@ cli do(slots = SLOTS_PER_EPOCH * 5, attestation = makeAttestation(state[].data, latest_block_root, scas, target_slot, i.CommitteeIndex, v, cache, flags) - agg.init(attestation.signature) + agg.init(attestation.signature.load.get()) first = false else: let att2 = @@ -140,8 +140,8 @@ cli do(slots = SLOTS_PER_EPOCH * 5, if not att2.aggregation_bits.overlaps(attestation.aggregation_bits): attestation.aggregation_bits.incl(att2.aggregation_bits) if skipBlsValidation notin flags: - agg.aggregate(att2.signature) - attestation.signature = agg.finish().exportRaw() + agg.aggregate(att2.signature.load.get()) + attestation.signature = agg.finish().toValidatorSig() if not first: # add the attestation if any of the validators attested, as given diff --git a/tests/all_tests.nim b/tests/all_tests.nim index fdd4b36b9..18845d773 100644 --- a/tests/all_tests.nim +++ b/tests/all_tests.nim @@ -21,6 +21,7 @@ import # Unit test ./test_discovery, ./test_eth1_monitor, ./test_exit_pool, + ./test_gossip_validation, ./test_helpers, ./test_honest_validator, ./test_interop, diff --git a/tests/mocking/mock_attestations.nim b/tests/mocking/mock_attestations.nim index a5740d10c..47906c8f9 100644 --- a/tests/mocking/mock_attestations.nim +++ b/tests/mocking/mock_attestations.nim @@ -78,7 +78,7 @@ proc signMockAttestation*(state: BeaconState, attestation: var Attestation) = agg.aggregate(sig) if first_iter != true: - attestation.signature = agg.finish().exportRaw() + attestation.signature = agg.finish().toValidatorSig() # Otherwise no participants so zero sig proc mockAttestationImpl( diff --git a/tests/mocking/mock_blocks.nim b/tests/mocking/mock_blocks.nim index 50ab7c0ee..73b4b87de 100644 --- a/tests/mocking/mock_blocks.nim +++ b/tests/mocking/mock_blocks.nim @@ -8,7 +8,7 @@ import options, # Specs - ../../beacon_chain/spec/[datatypes, helpers, signatures, validator], + ../../beacon_chain/spec/[crypto, datatypes, helpers, signatures, validator], # Internals ../../beacon_chain/ssz, # Mock helpers @@ -28,11 +28,11 @@ proc signMockBlockImpl( signedBlock.message.body.randao_reveal = get_epoch_signature( state.fork, state.genesis_validators_root, block_slot.compute_epoch_at_slot, - privkey) + privkey).toValidatorSig() signedBlock.root = hash_tree_root(signedBlock.message) signedBlock.signature = get_block_signature( state.fork, state.genesis_validators_root, block_slot, - signedBlock.root, privkey) + signedBlock.root, privkey).toValidatorSig() proc signMockBlock*(state: BeaconState, signedBlock: var SignedBeaconBlock) = signMockBlockImpl(state, signedBlock) diff --git a/tests/mocking/mock_deposits.nim b/tests/mocking/mock_deposits.nim index 6c02fd6cf..cf2c93b38 100644 --- a/tests/mocking/mock_deposits.nim +++ b/tests/mocking/mock_deposits.nim @@ -44,7 +44,7 @@ func mockDepositData( ): DepositData = var ret = mockDepositData(pubkey, amount) if skipBlsValidation notin flags: - ret.signature = defaultRuntimePreset.get_deposit_signature(ret, privkey) + ret.signature = defaultRuntimePreset.get_deposit_signature(ret, privkey).toValidatorSig() ret template mockGenesisDepositsImpl( diff --git a/tests/test_attestation_pool.nim b/tests/test_attestation_pool.nim index f809c7d14..9c6f9643b 100644 --- a/tests/test_attestation_pool.nim +++ b/tests/test_attestation_pool.nim @@ -15,14 +15,14 @@ import stew/byteutils, eth/keys, # Internal - ../beacon_chain/[beacon_node_types, extras, beacon_clock], - ../beacon_chain/gossip_processing/[gossip_validation, batch_validation], + ../beacon_chain/[beacon_node_types, extras], + ../beacon_chain/gossip_processing/[gossip_validation], ../beacon_chain/fork_choice/[fork_choice_types, fork_choice], ../beacon_chain/consensus_object_pools/[ block_quarantine, blockchain_dag, block_clearance, attestation_pool], ../beacon_chain/ssz/merkleization, ../beacon_chain/spec/[crypto, datatypes, digest, validator, state_transition, - helpers, beaconstate, presets, network], + helpers, beaconstate, presets], # Test utilities ./testutil, ./testblockutil @@ -41,12 +41,12 @@ func combine(tgt: var Attestation, src: Attestation) = tgt.aggregation_bits.incl(src.aggregation_bits) var agg {.noInit.}: AggregateSignature - agg.init(tgt.signature) - agg.aggregate(src.signature) - tgt.signature = agg.finish().exportRaw() + agg.init(tgt.signature.load().get()) + agg.aggregate(src.signature.load.get()) + tgt.signature = agg.finish().toValidatorSig() func loadSig(a: Attestation): CookedSig = - a.signature.load.get().CookedSig + a.signature.load.get() proc pruneAtFinalization(dag: ChainDAGRef, attPool: AttestationPool) = if dag.needStateCachesAndForkChoicePruning(): @@ -560,80 +560,3 @@ suiteReport "Attestation pool processing" & preset(): pool[].addForkChoice(epochRef, blckRef, signedBlock.message, blckRef.slot) doAssert: b10Add_clone.error == (ValidationResult.Ignore, Duplicate) - - -suiteReport "Attestation validation " & preset(): - setup: - # Genesis state that results in 3 members per committee - var - chainDag = init(ChainDAGRef, defaultRuntimePreset, makeTestDB(SLOTS_PER_EPOCH * 3)) - quarantine = QuarantineRef.init(keys.newRng()) - pool = newClone(AttestationPool.init(chainDag, quarantine)) - state = newClone(chainDag.headState) - cache = StateCache() - batchCrypto = BatchCrypto.new(keys.newRng()) - # Slot 0 is a finalized slot - won't be making attestations for it.. - check: - process_slots(state.data, getStateField(state, slot) + 1, cache) - - timedTest "Validation sanity": - # TODO: refactor tests to avoid skipping BLS validation - chainDag.updateFlags.incl {skipBLSValidation} - - var - cache: StateCache - for blck in makeTestBlocks( - chainDag.headState.data, chainDag.head.root, cache, - int(SLOTS_PER_EPOCH * 5), false): - let added = chainDag.addRawBlock(quarantine, blck) do ( - blckRef: BlockRef, signedBlock: TrustedSignedBeaconBlock, - epochRef: EpochRef, state: HashedBeaconState): - # Callback add to fork choice if valid - pool[].addForkChoice(epochRef, blckRef, signedBlock.message, blckRef.slot) - - check: added.isOk() - chainDag.updateHead(added[], quarantine) - pruneAtFinalization(chainDag, pool[]) - - var - # Create an attestation for slot 1! - beacon_committee = get_beacon_committee( - chainDag.headState.data.data, chainDag.head.slot, 0.CommitteeIndex, cache) - attestation = makeAttestation( - chainDag.headState.data.data, chainDag.head.root, beacon_committee[0], cache) - - committees_per_slot = - get_committee_count_per_slot(chainDag.headState.data.data, - attestation.data.slot.epoch, cache) - - subnet = compute_subnet_for_attestation( - committees_per_slot, - attestation.data.slot, attestation.data.index.CommitteeIndex) - - beaconTime = attestation.data.slot.toBeaconTime() - - check: - validateAttestation(pool, batchCrypto, attestation, beaconTime, subnet, true).waitFor().isOk - - # Same validator again - validateAttestation(pool, batchCrypto, attestation, beaconTime, subnet, true).waitFor().error()[0] == - ValidationResult.Ignore - - pool[].nextAttestationEpoch.setLen(0) # reset for test - check: - # Wrong subnet - validateAttestation(pool, batchCrypto, attestation, beaconTime, subnet + 1, true).waitFor().isErr - - pool[].nextAttestationEpoch.setLen(0) # reset for test - check: - # Too far in the future - validateAttestation( - pool, batchCrypto, attestation, beaconTime - 1.seconds, subnet + 1, true).waitFor().isErr - - pool[].nextAttestationEpoch.setLen(0) # reset for test - check: - # Too far in the past - validateAttestation( - pool, batchCrypto, attestation, - beaconTime - (SECONDS_PER_SLOT * SLOTS_PER_EPOCH - 1).int.seconds, - subnet + 1, true).waitFor().isErr diff --git a/tests/test_gossip_validation.nim b/tests/test_gossip_validation.nim new file mode 100644 index 000000000..c70a178af --- /dev/null +++ b/tests/test_gossip_validation.nim @@ -0,0 +1,149 @@ +# beacon_chain +# Copyright (c) 2018-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.used.} + +import + # Status lib + unittest2, + chronicles, chronos, + eth/keys, + # Internal + ../beacon_chain/[beacon_node_types, extras, beacon_clock], + ../beacon_chain/gossip_processing/[gossip_validation, batch_validation], + ../beacon_chain/fork_choice/[fork_choice_types, fork_choice], + ../beacon_chain/consensus_object_pools/[ + block_quarantine, blockchain_dag, block_clearance, attestation_pool], + ../beacon_chain/ssz/merkleization, + ../beacon_chain/spec/[crypto, datatypes, digest, validator, state_transition, + helpers, presets, network], + # Test utilities + ./testutil, ./testblockutil + +proc pruneAtFinalization(dag: ChainDAGRef, attPool: AttestationPool) = + if dag.needStateCachesAndForkChoicePruning(): + dag.pruneStateCachesDAG() + # pool[].prune() # We test logic without att_1_0 pool / fork choice pruning + +suiteReport "Gossip validation " & preset(): + setup: + # Genesis state that results in 3 members per committee + var + chainDag = init(ChainDAGRef, defaultRuntimePreset, makeTestDB(SLOTS_PER_EPOCH * 3)) + quarantine = QuarantineRef.init(keys.newRng()) + pool = newClone(AttestationPool.init(chainDag, quarantine)) + state = newClone(chainDag.headState) + cache = StateCache() + batchCrypto = BatchCrypto.new(keys.newRng(), eager = proc(): bool = false) + # Slot 0 is a finalized slot - won't be making attestations for it.. + check: + process_slots(state.data, getStateField(state, slot) + 1, cache) + + timedTest "Validation sanity": + # TODO: refactor tests to avoid skipping BLS validation + chainDag.updateFlags.incl {skipBLSValidation} + + var + cache: StateCache + for blck in makeTestBlocks( + chainDag.headState.data, chainDag.head.root, cache, + int(SLOTS_PER_EPOCH * 5), false): + let added = chainDag.addRawBlock(quarantine, blck) do ( + blckRef: BlockRef, signedBlock: TrustedSignedBeaconBlock, + epochRef: EpochRef, state: HashedBeaconState): + # Callback add to fork choice if valid + pool[].addForkChoice(epochRef, blckRef, signedBlock.message, blckRef.slot) + + check: added.isOk() + chainDag.updateHead(added[], quarantine) + pruneAtFinalization(chainDag, pool[]) + + var + # Create attestations for slot 1 + beacon_committee = get_beacon_committee( + chainDag.headState.data.data, chainDag.head.slot, 0.CommitteeIndex, cache) + att_1_0 = makeAttestation( + chainDag.headState.data.data, chainDag.head.root, beacon_committee[0], cache) + att_1_1 = makeAttestation( + chainDag.headState.data.data, chainDag.head.root, beacon_committee[1], cache) + + committees_per_slot = + get_committee_count_per_slot(chainDag.headState.data.data, + att_1_0.data.slot.epoch, cache) + + subnet = compute_subnet_for_attestation( + committees_per_slot, + att_1_0.data.slot, att_1_0.data.index.CommitteeIndex) + + beaconTime = att_1_0.data.slot.toBeaconTime() + + check: + validateAttestation(pool, batchCrypto, att_1_0, beaconTime, subnet, true).waitFor().isOk + + # Same validator again + validateAttestation(pool, batchCrypto, att_1_0, beaconTime, subnet, true).waitFor().error()[0] == + ValidationResult.Ignore + + pool[].nextAttestationEpoch.setLen(0) # reset for test + check: + # Wrong subnet + validateAttestation(pool, batchCrypto, att_1_0, beaconTime, subnet + 1, true).waitFor().isErr + + pool[].nextAttestationEpoch.setLen(0) # reset for test + check: + # Too far in the future + validateAttestation( + pool, batchCrypto, att_1_0, beaconTime - 1.seconds, subnet + 1, true).waitFor().isErr + + pool[].nextAttestationEpoch.setLen(0) # reset for test + check: + # Too far in the past + validateAttestation( + pool, batchCrypto, att_1_0, + beaconTime - (SECONDS_PER_SLOT * SLOTS_PER_EPOCH - 1).int.seconds, + subnet + 1, true).waitFor().isErr + + block: + var broken = att_1_0 + broken.signature.blob[0] += 1 + pool[].nextAttestationEpoch.setLen(0) # reset for test + check: + # Invalid signature + validateAttestation( + pool, batchCrypto, broken, beaconTime, subnet, true).waitFor(). + error()[0] == ValidationResult.Reject + + block: + var broken = att_1_0 + broken.signature.blob[5] += 1 + pool[].nextAttestationEpoch.setLen(0) # reset for test + # One invalid, one valid (batched) + let + fut_1_0 = validateAttestation( + pool, batchCrypto, broken, beaconTime, subnet, true) + fut_1_1 = validateAttestation( + pool, batchCrypto, att_1_1, beaconTime, subnet, true) + + check: + fut_1_0.waitFor().error()[0] == ValidationResult.Reject + fut_1_1.waitFor().isOk() + + block: + var broken = att_1_0 + # This shouldn't deserialize, which is a different way to break it + broken.signature.blob = default(type broken.signature.blob) + pool[].nextAttestationEpoch.setLen(0) # reset for test + # One invalid, one valid (batched) + let + fut_1_0 = validateAttestation( + pool, batchCrypto, broken, beaconTime, subnet, true) + fut_1_1 = validateAttestation( + pool, batchCrypto, att_1_1, beaconTime, subnet, true) + + check: + fut_1_0.waitFor().error()[0] == ValidationResult.Reject + fut_1_1.waitFor().isOk() diff --git a/tests/test_interop.nim b/tests/test_interop.nim index 1dfcc78c9..4964e82eb 100644 --- a/tests/test_interop.nim +++ b/tests/test_interop.nim @@ -136,7 +136,7 @@ suiteReport "Interop": check: # TODO re-enable - true or dep.sig == computed_sig + true or dep.sig == computed_sig.toValidatorSig() timedTest "Interop genesis": # Check against https://github.com/protolambda/zcli: diff --git a/tests/testblockutil.nim b/tests/testblockutil.nim index ee9edd4b9..c43fa6dfb 100644 --- a/tests/testblockutil.nim +++ b/tests/testblockutil.nim @@ -51,7 +51,7 @@ func makeDeposit*(i: int, flags: UpdateFlags = {}): DepositData = if skipBLSValidation notin flags: result.signature = get_deposit_signature( - defaultRuntimePreset, result, privkey) + defaultRuntimePreset, result, privkey).toValidatorSig() proc makeInitialDeposits*( n = SLOTS_PER_EPOCH, flags: UpdateFlags = {}): seq[DepositData] = @@ -68,7 +68,7 @@ func signBlock( signature: if skipBlsValidation notin flags: get_block_signature( - fork, genesis_validators_root, blck.slot, root, privKey) + fork, genesis_validators_root, blck.slot, root, privKey).toValidatorSig() else: ValidatorSig() ) @@ -93,7 +93,8 @@ proc addTestBlock*( randao_reveal = if skipBlsValidation notin flags: privKey.genRandaoReveal( - state.data.fork, state.data.genesis_validators_root, state.data.slot) + state.data.fork, state.data.genesis_validators_root, state.data.slot). + toValidatorSig() else: ValidatorSig() @@ -167,7 +168,7 @@ proc makeAttestation*( sig = if skipBLSValidation notin flags: get_attestation_signature(state.fork, state.genesis_validators_root, - data, hackPrivKey(validator)) + data, hackPrivKey(validator)).toValidatorSig() else: ValidatorSig() @@ -219,13 +220,12 @@ proc makeFullAttestations*( # Initial attestation var attestation = Attestation( aggregation_bits: CommitteeValidatorsBits.init(committee.len), - data: data, - signature: get_attestation_signature( - state.fork, state.genesis_validators_root, data, - hackPrivKey(state.validators[committee[0]])) - ) + data: data) + var agg {.noInit.}: AggregateSignature - agg.init(attestation.signature) + agg.init(get_attestation_signature( + state.fork, state.genesis_validators_root, data, + hackPrivKey(state.validators[committee[0]]))) # Aggregate the remainder attestation.aggregation_bits.setBit 0 @@ -237,7 +237,7 @@ proc makeFullAttestations*( hackPrivKey(state.validators[committee[j]]) )) - attestation.signature = agg.finish().exportRaw() + attestation.signature = agg.finish().toValidatorSig() result.add attestation iterator makeTestBlocks*(