remove attestation/aggregate queue (#2519)

With the introduction of batching and lazy attestation aggregation, it
no longer makes sense to enqueue attestations between the signature
check and adding them to the attestation pool - this only takes up
valuable CPU without any real benefit.

* add successfully validated attestations to attestion pool directly
* avoid copying participant list around for single-vote attestations,
pass single validator index instead
* release decompressed gossip memory earlier, specially during async
message validation
* use cooked signatures in a few more places to avoid reloads and errors
* remove some Defect-raising versions of signature-loading
* release decompressed data memory before validating message
This commit is contained in:
Jacek Sieka 2021-04-26 22:39:44 +02:00 committed by GitHub
parent 8a3c070337
commit 7dba1b37dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 662 additions and 722 deletions

View File

@ -15,11 +15,6 @@ AllTests-mainnet
+ Working with aggregates [Preset: mainnet] OK
```
OK: 11/11 Fail: 0/11 Skip: 0/11
## Attestation validation [Preset: mainnet]
```diff
+ Validation sanity OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## Beacon chain DB [Preset: mainnet]
```diff
+ empty database [Preset: mainnet] OK
@ -98,6 +93,11 @@ OK: 3/3 Fail: 0/3 Skip: 0/3
+ fork_choice - testing with votes OK
```
OK: 4/4 Fail: 0/4 Skip: 0/4
## Gossip validation [Preset: mainnet]
```diff
+ Validation sanity OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## Honest validator
```diff
+ General pubsub topics OK

View File

@ -95,11 +95,12 @@ proc init*(T: type AttestationPool, chainDag: ChainDAGRef, quarantine: Quarantin
)
proc addForkChoiceVotes(
pool: var AttestationPool, slot: Slot, participants: seq[ValidatorIndex],
block_root: Eth2Digest, wallSlot: Slot) =
pool: var AttestationPool, slot: Slot,
attesting_indices: openArray[ValidatorIndex], block_root: Eth2Digest,
wallSlot: Slot) =
# Add attestation votes to fork choice
if (let v = pool.forkChoice.on_attestation(
pool.chainDag, slot, block_root, participants, wallSlot);
pool.chainDag, slot, block_root, attesting_indices, wallSlot);
v.isErr):
# This indicates that the fork choice and the chain dag are out of sync -
# this is most likely the result of a bug, but we'll try to keep going -
@ -154,7 +155,7 @@ func toAttestation(entry: AttestationEntry, validation: Validation): Attestation
Attestation(
aggregation_bits: validation.aggregation_bits,
data: entry.data,
signature: validation.aggregate_signature.finish().exportRaw()
signature: validation.aggregate_signature.finish().toValidatorSig()
)
func updateAggregates(entry: var AttestationEntry) =
@ -262,7 +263,7 @@ proc addAttestation(entry: var AttestationEntry,
proc addAttestation*(pool: var AttestationPool,
attestation: Attestation,
participants: seq[ValidatorIndex],
attesting_indices: openArray[ValidatorIndex],
signature: CookedSig,
wallSlot: Slot) =
## Add an attestation to the pool, assuming it's been validated already.
@ -273,7 +274,7 @@ proc addAttestation*(pool: var AttestationPool,
logScope:
attestation = shortLog(attestation)
doAssert attestation.signature == signature.exportRaw(),
doAssert attestation.signature == signature.toValidatorSig(),
"Deserialized signature must match the one in the attestation"
updateCurrent(pool, wallSlot)
@ -303,8 +304,8 @@ proc addAttestation*(pool: var AttestationPool,
return
pool.addForkChoiceVotes(
attestation.data.slot, participants, attestation.data.beacon_block_root,
wallSlot)
attestation.data.slot, attesting_indices,
attestation.data.beacon_block_root, wallSlot)
proc addForkChoice*(pool: var AttestationPool,
epochRef: EpochRef,
@ -343,7 +344,7 @@ iterator attestations*(pool: AttestationPool, slot: Option[Slot],
for index, signature in entry.singles:
singleAttestation.aggregation_bits.setBit(index)
singleAttestation.signature = signature.exportRaw()
singleAttestation.signature = signature.toValidatorSig()
yield singleAttestation
singleAttestation.aggregation_bits.clearBit(index)

View File

@ -78,6 +78,27 @@ iterator get_attesting_indices*(epochRef: EpochRef,
yield index
inc i
func get_attesting_indices_one*(epochRef: EpochRef,
data: AttestationData,
bits: CommitteeValidatorsBits):
Option[ValidatorIndex] =
# A variation on get_attesting_indices that returns the validator index only
# if only one validator index is set
if bits.lenu64 != get_beacon_committee_len(epochRef, data.slot, data.index.CommitteeIndex):
trace "get_attesting_indices: inconsistent aggregation and committee length"
none(ValidatorIndex)
else:
var res = none(ValidatorIndex)
var i = 0
for index in get_beacon_committee(epochRef, data.slot, data.index.CommitteeIndex):
if bits[i]:
if res.isNone():
res = some(index)
else:
return none(ValidatorIndex)
inc i
res
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#get_attesting_indices
func get_attesting_indices*(epochRef: EpochRef,
data: AttestationData,

View File

@ -40,9 +40,6 @@ func compute_deltas(
old_balances: openArray[Gwei],
new_balances: openArray[Gwei]
): FcResult[void]
# TODO: raises [Defect] - once https://github.com/nim-lang/Nim/issues/12862 is fixed
# https://github.com/status-im/nimbus-eth2/pull/865#pullrequestreview-389117232
# Fork choice routines
# ----------------------------------------------------------------------
@ -71,7 +68,8 @@ proc init*(T: type ForkChoice,
epoch = epochRef.epoch, blck = shortLog(blck)
let
justified = BalanceCheckpoint(blck: blck, epoch: epochRef.epoch, balances: epochRef.effective_balances)
justified = BalanceCheckpoint(
blck: blck, epoch: epochRef.epoch, balances: epochRef.effective_balances)
finalized = Checkpoint(root: blck.root, epoch: epochRef.epoch)
best_justified = Checkpoint(
root: justified.blck.root, epoch: justified.epoch)
@ -176,7 +174,7 @@ proc on_attestation*(
dag: ChainDAGRef,
attestation_slot: Slot,
beacon_block_root: Eth2Digest,
attesting_indices: seq[ValidatorIndex],
attesting_indices: openArray[ValidatorIndex],
wallSlot: Slot
): FcResult[void] =
? self.update_time(dag, wallSlot)
@ -188,12 +186,14 @@ proc on_attestation*(
for validator_index in attesting_indices:
# attestation_slot and target epoch must match, per attestation rules
self.backend.process_attestation(
validator_index.ValidatorIndex, beacon_block_root,
attestation_slot.epoch)
validator_index, beacon_block_root, attestation_slot.epoch)
else:
# Spec:
# Attestations can only affect the fork choice of subsequent slots.
# Delay consideration in the fork choice until their slot is in the past.
self.queuedAttestations.add(QueuedAttestation(
slot: attestation_slot,
attesting_indices: attesting_indices,
attesting_indices: @attesting_indices,
block_root: beacon_block_root))
ok()

View File

@ -34,40 +34,18 @@ type
## Fork Choice Error Kinds
fcFinalizedNodeUnknown
fcJustifiedNodeUnknown
fcInvalidFinalizedRootChange
fcInvalidNodeIndex
fcInvalidParentIndex
fcInvalidBestChildIndex
fcInvalidJustifiedIndex
fcInvalidBestDescendant
fcInvalidParentDelta
fcInvalidNodeDelta
fcDeltaUnderflow
fcIndexUnderflow
fcInvalidDeltaLen
fcRevertedFinalizedEpoch
fcInvalidBestNode
fcInconsistentTick
# -------------------------
# TODO: Extra error modes beyond Proto/Lighthouse to be reviewed
fcUnknownParent
fcPruningFromOutdatedFinalizedRoot
AttErrorKind* = enum
attFromFuture
attFromPast
attBadTargetEpoch
attUnkownTarget
attUnknownBlock
attWrongTarget
attFutureSlot
FcUnderflowKind* = enum
## Fork Choice Overflow Kinds
fcUnderflowIndices = "Indices Overflow"
fcUnderflowBestChild = "Best Child Overflow"
fcUnderflowBestDescendant = "Best Descendant Overflow"
Index* = int
Delta* = int64
## Delta balances
@ -77,26 +55,18 @@ type
of fcFinalizedNodeUnknown,
fcJustifiedNodeUnknown:
blockRoot*: Eth2Digest
of fcInvalidFinalizedRootChange,
fcInconsistentTick:
of fcInconsistentTick:
discard
of fcInvalidNodeIndex,
fcInvalidParentIndex,
fcInvalidBestChildIndex,
fcInvalidJustifiedIndex,
fcInvalidBestDescendant,
fcInvalidParentDelta,
fcInvalidNodeDelta,
fcDeltaUnderflow:
index*: Index
of fcIndexUnderflow:
underflowKind*: FcUnderflowKind
of fcInvalidDeltaLen:
deltasLen*: int
indicesLen*: int
of fcRevertedFinalizedEpoch:
currentFinalizedEpoch*: Epoch
new_finalized_epoch*: Epoch
of fcInvalidBestNode:
startRoot*: Eth2Digest
justifiedEpoch*: Epoch

View File

@ -5,6 +5,8 @@
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
# Status
chronicles, chronos,
@ -27,29 +29,51 @@ logScope:
# Batched gossip validation
# ----------------------------------------------------------------
{.push raises: [Defect].}
type
BatchResult* {.pure.} = enum
Valid
Invalid
Timeout
Eager = proc(): bool {.gcsafe, raises: [Defect].} ##\
## Callback that returns true if eager processing should be done to lower
## latency at the expense of spending more cycles validating things, creating
## a crude timesharing priority mechanism.
Batch* = object
created: Moment
pendingBuffer: seq[SignatureSet]
resultsBuffer: seq[Future[BatchResult]]
BatchCrypto* = object
# The buffers are bounded by BatchedCryptoSize (16) which was chosen:
# Each batch is bounded by BatchedCryptoSize (16) which was chosen:
# - based on "nimble bench" in nim-blscurve
# so that low power devices like Raspberry Pi 4 can process
# that many batched verifications within 20ms
# - based on the accumulation rate of attestations and aggregates
# in large instances which were 12000 per slot (12s)
# hence 1 per ms (but the pattern is bursty around the 4s mark)
pendingBuffer: seq[SignatureSet]
resultsBuffer: seq[Future[Result[void, cstring]]]
# The number of batches is bounded by time - batch validation is skipped if
# we can't process them in the time that one slot takes, and we return
# timeout instead which prevents the gossip layer from forwarding the
# batch.
batches: seq[ref Batch]
eager: Eager ##\
## Eager is used to enable eager processing of attestations when it's
## prudent to do so (instead of leaving the CPU for other, presumably more
## important work like block processing)
sigVerifCache: BatchedBLSVerifierCache ##\
## A cache for batch BLS signature verification contexts
rng: ref BrHmacDrbgContext ##\
## A reference to the Nimbus application-wide RNG
pruneTime: Moment ## :ast time we had to prune something
const
# We cap waiting for an idle slot in case there's a lot of network traffic
# taking up all CPU - we don't want to _completely_ stop processing blocks
# in this case (attestations will get dropped) - doing so also allows us
# to benefit from more batching / larger network reads when under load.
# taking up all CPU - we don't want to _completely_ stop processing
# attestations - doing so also allows us to benefit from more batching /
# larger network reads when under load.
BatchAttAccumTime = 10.milliseconds
# Threshold for immediate trigger of batch verification.
@ -60,145 +84,174 @@ const
# but not too big as we need to redo checks one-by-one if one failed.
BatchedCryptoSize = 16
proc new*(T: type BatchCrypto, rng: ref BrHmacDrbgContext): ref BatchCrypto =
(ref BatchCrypto)(rng: rng)
proc new*(
T: type BatchCrypto, rng: ref BrHmacDrbgContext, eager: Eager): ref BatchCrypto =
(ref BatchCrypto)(rng: rng, eager: eager, pruneTime: Moment.now())
func clear(batchCrypto: var BatchCrypto) =
## Empty the crypto-pending attestations & aggregate queues
batchCrypto.pendingBuffer.setLen(0)
batchCrypto.resultsBuffer.setLen(0)
func len(batch: Batch): int =
doAssert batch.resultsBuffer.len() == batch.pendingBuffer.len()
batch.resultsBuffer.len()
proc done(batchCrypto: var BatchCrypto, idx: int) =
## Send signal to [Attestation/Aggregate]Validator
## that the attestation was crypto-verified (and so gossip validated)
## with success
batchCrypto.resultsBuffer[idx].complete(Result[void, cstring].ok())
func full(batch: Batch): bool =
batch.len() >= BatchedCryptoSize
proc fail(batchCrypto: var BatchCrypto, idx: int, error: cstring) =
## Send signal to [Attestation/Aggregate]Validator
## that the attestation was NOT crypto-verified (and so NOT gossip validated)
batchCrypto.resultsBuffer[idx].complete(Result[void, cstring].err(error))
proc clear(batch: var Batch) =
batch.pendingBuffer.setLen(0)
batch.resultsBuffer.setLen(0)
proc complete(batchCrypto: var BatchCrypto, idx: int, res: Result[void, cstring]) =
## Send signal to [Attestation/Aggregate]Validator
batchCrypto.resultsBuffer[idx].complete(res)
proc skip(batch: var Batch) =
for res in batch.resultsBuffer.mitems():
res.complete(BatchResult.Timeout)
batch.clear() # release memory early
proc processBufferedCrypto(self: var BatchCrypto) =
## Drain all attestations waiting for crypto verifications
proc pruneBatchQueue(batchCrypto: ref BatchCrypto) =
let
now = Moment.now()
doAssert self.pendingBuffer.len ==
self.resultsBuffer.len
# If batches haven't been processed for more than 12 seconds
while batchCrypto.batches.len() > 0:
if batchCrypto.batches[0][].created + SECONDS_PER_SLOT.int64.seconds > now:
break
if batchCrypto.pruneTime + SECONDS_PER_SLOT.int64.seconds > now:
notice "Batch queue pruned, skipping attestation validation",
batches = batchCrypto.batches.len()
batchCrypto.pruneTime = Moment.now()
batchCrypto.batches[0][].skip()
batchCrypto.batches.delete(0)
if self.pendingBuffer.len == 0:
proc processBatch(batchCrypto: ref BatchCrypto) =
## Process one batch, if there is any
# Pruning the queue here makes sure we catch up with processing if need be
batchCrypto.pruneBatchQueue() # Skip old batches
if batchCrypto[].batches.len() == 0:
# No more batches left, they might have been eagerly processed or pruned
return
let
batch = batchCrypto[].batches[0]
batchSize = batch[].len()
batchCrypto[].batches.del(0)
if batchSize == 0:
# Nothing to do in this batch, can happen when a batch is created without
# there being any signatures successfully added to it
return
trace "batch crypto - starting",
batchSize = self.pendingBuffer.len
batchSize
let startTime = Moment.now()
var secureRandomBytes: array[32, byte]
self.rng[].brHmacDrbgGenerate(secureRandomBytes)
batchCrypto[].rng[].brHmacDrbgGenerate(secureRandomBytes)
# TODO: For now only enable serial batch verification
let ok = batchVerifySerial(
self.sigVerifCache,
self.pendingBuffer,
batchCrypto.sigVerifCache,
batch.pendingBuffer,
secureRandomBytes)
let stopTime = Moment.now()
debug "batch crypto - finished",
batchSize = self.pendingBuffer.len,
trace "batch crypto - finished",
batchSize,
cryptoVerified = ok,
dur = stopTime - startTime
if ok:
for i in 0 ..< self.resultsBuffer.len:
self.done(i)
for res in batch.resultsBuffer.mitems():
res.complete(BatchResult.Valid)
else:
# Batched verification failed meaning that some of the signature checks
# failed, but we don't know which ones - check each signature separately
# instead
debug "batch crypto - failure, falling back",
batchSize = self.pendingBuffer.len
for i in 0 ..< self.pendingBuffer.len:
let ok = blsVerify self.pendingBuffer[i]
if ok:
self.done(i)
else:
self.fail(i, "batch crypto verification: invalid signature")
batchSize
for i, res in batch.resultsBuffer.mpairs():
let ok = blsVerify batch[].pendingBuffer[i]
res.complete(if ok: BatchResult.Valid else: BatchResult.Invalid)
self.clear()
batch[].clear() # release memory early
proc deferCryptoProcessing(self: ref BatchCrypto, idleTimeout: Duration) {.async.} =
## Process pending crypto check:
## - if time threshold is reached
## - or if networking is idle
proc deferCryptoProcessing(batchCrypto: ref BatchCrypto) {.async.} =
## Process pending crypto check after some time has passed - the time is
## chosen such that there's time to fill the batch but not so long that
## latency across the network is negatively affected
await sleepAsync(BatchAttAccumTime)
# TODO: how to cancel the scheduled `deferCryptoProcessing(BatchAttAccumTime)` ?
# when the buffer size threshold is reached?
# In practice this only happens when we receive a burst of attestations/aggregates.
# Though it's possible to reach the threshold 9ms in,
# and have only 1ms left for further accumulation.
await sleepAsync(idleTimeout)
self[].processBufferedCrypto()
# Take the first batch in the queue and process it - if eager processing has
# stolen it already, that's fine
batchCrypto.processBatch()
proc schedule(batchCrypto: ref BatchCrypto, fut: Future[Result[void, cstring]], checkThreshold = true) =
## Schedule a cryptocheck for processing
##
## The buffer is processed:
## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize)
## - when there are no network events (idleAsync)
## - otherwise after 10ms (BatchAttAccumTime)
proc getBatch(batchCrypto: ref BatchCrypto): (ref Batch, bool) =
# Get a batch suitable for attestation processing - in particular, attestation
# batches might be skipped
batchCrypto.pruneBatchQueue()
# Note: use the resultsBuffer size to detect the first item
# as pendingBuffer is appended to 3 by 3 in case of aggregates
if batchCrypto.batches.len() == 0 or
batchCrypto.batches[^1][].full():
# There are no batches in progress - start a new batch and schedule a
# deferred task to eventually handle it
let batch = (ref Batch)(created: Moment.now())
batchCrypto[].batches.add(batch)
(batch, true)
else:
let batch = batchCrypto[].batches[^1]
# len will be 0 when the batch was created but nothing added to it
# because of early failures
(batch, batch[].len() == 0)
proc scheduleBatch(batchCrypto: ref BatchCrypto, fresh: bool) =
if fresh:
# Every time we start a new round of batching, we need to launch a deferred
# task that will compute the result of the batch eventually in case the
# batch is never filled or eager processing is blocked
asyncSpawn batchCrypto.deferCryptoProcessing()
batchCrypto.resultsBuffer.add fut
if batchCrypto.resultsBuffer.len == 1:
# First attestation to be scheduled in the batch
# wait for an idle time or up to 10ms before processing
trace "batch crypto - scheduling next",
deadline = BatchAttAccumTime
asyncSpawn batchCrypto.deferCryptoProcessing(BatchAttAccumTime)
elif checkThreshold and
batchCrypto.resultsBuffer.len >= BatchedCryptoSize:
# Reached the max buffer size, process immediately
# TODO: how to cancel the scheduled `deferCryptoProcessing(BatchAttAccumTime)` ?
batchCrypto[].processBufferedCrypto()
if batchCrypto.batches.len() > 0 and
batchCrypto.batches[0][].full() and
batchCrypto.eager():
# If there's a full batch, process it eagerly assuming the callback allows
batchCrypto.processBatch()
proc scheduleAttestationCheck*(
batchCrypto: ref BatchCrypto,
fork: Fork, genesis_validators_root: Eth2Digest,
epochRef: EpochRef,
attestation: Attestation
): Option[(Future[Result[void, cstring]], CookedSig)] =
): Option[(Future[BatchResult], CookedSig)] =
## Schedule crypto verification of an attestation
##
## The buffer is processed:
## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize)
## - when there are no network events (idleAsync)
## - when eager processing is enabled and the batch is full
## - otherwise after 10ms (BatchAttAccumTime)
##
## This returns None if crypto sanity checks failed
## and a future with the deferred attestation check otherwise.
doAssert batchCrypto.pendingBuffer.len < BatchedCryptoSize
##
let (batch, fresh) = batchCrypto.getBatch()
let (sanity, sig) = batchCrypto
.pendingBuffer
.addAttestation(
fork, genesis_validators_root, epochRef,
attestation
)
if not sanity:
return none((Future[Result[void, cstring]], CookedSig))
doAssert batch.pendingBuffer.len < BatchedCryptoSize
let fut = newFuture[Result[void, cstring]](
let sig = batch
.pendingBuffer
.addAttestation(
fork, genesis_validators_root, epochRef,
attestation
)
if not sig.isSome():
return none((Future[BatchResult], CookedSig))
let fut = newFuture[BatchResult](
"batch_validation.scheduleAttestationCheck"
)
batchCrypto.schedule(fut)
batch[].resultsBuffer.add(fut)
return some((fut, sig))
batchCrypto.scheduleBatch(fresh)
return some((fut, sig.get()))
proc scheduleAggregateChecks*(
batchCrypto: ref BatchCrypto,
@ -207,7 +260,7 @@ proc scheduleAggregateChecks*(
signedAggregateAndProof: SignedAggregateAndProof
): Option[(
tuple[slotCheck, aggregatorCheck, aggregateCheck:
Future[Result[void, cstring]]],
Future[BatchResult]],
CookedSig)] =
## Schedule crypto verification of an aggregate
##
@ -217,71 +270,74 @@ proc scheduleAggregateChecks*(
## - is_valid_indexed_attestation
##
## The buffer is processed:
## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize)
## - when there are no network events (idleAsync)
## - when eager processing is enabled and the batch is full
## - otherwise after 10ms (BatchAttAccumTime)
##
## This returns None if crypto sanity checks failed
## and 2 futures with the deferred aggregate checks otherwise.
doAssert batchCrypto.pendingBuffer.len < BatchedCryptoSize
## This returns None if the signatures could not be loaded.
## and 3 futures with the deferred aggregate checks otherwise.
let (batch, fresh) = batchCrypto.getBatch()
doAssert batch[].pendingBuffer.len < BatchedCryptoSize
template aggregate_and_proof: untyped = signedAggregateAndProof.message
template aggregate: untyped = aggregate_and_proof.aggregate
type R = (
tuple[slotCheck, aggregatorCheck, aggregateCheck:
Future[Result[void, cstring]]],
Future[BatchResult]],
CookedSig)
# Enqueue in the buffer
# ------------------------------------------------------
let aggregator = epochRef.validator_keys[aggregate_and_proof.aggregator_index]
block:
let sanity = batchCrypto
.pendingBuffer
.addSlotSignature(
fork, genesis_validators_root,
aggregate.data.slot,
aggregator,
aggregate_and_proof.selection_proof
)
if not sanity:
if not batch
.pendingBuffer
.addSlotSignature(
fork, genesis_validators_root,
aggregate.data.slot,
aggregator,
aggregate_and_proof.selection_proof
):
return none(R)
block:
let sanity = batchCrypto
.pendingBuffer
.addAggregateAndProofSignature(
fork, genesis_validators_root,
aggregate_and_proof,
aggregator,
signed_aggregate_and_proof.signature
)
if not sanity:
return none(R)
let (sanity, sig) = batchCrypto
.pendingBuffer
.addAttestation(
fork, genesis_validators_root, epochRef,
aggregate
)
if not sanity:
return none(R)
let futSlot = newFuture[Result[void, cstring]](
let futSlot = newFuture[BatchResult](
"batch_validation.scheduleAggregateChecks.slotCheck"
)
let futAggregator = newFuture[Result[void, cstring]](
batch.resultsBuffer.add(futSlot)
block:
if not batch
.pendingBuffer
.addAggregateAndProofSignature(
fork, genesis_validators_root,
aggregate_and_proof,
aggregator,
signed_aggregate_and_proof.signature
):
batchCrypto.scheduleBatch(fresh)
return none(R)
let futAggregator = newFuture[BatchResult](
"batch_validation.scheduleAggregateChecks.aggregatorCheck"
)
let futAggregate = newFuture[Result[void, cstring]](
batch.resultsBuffer.add(futAggregator)
let sig = batch
.pendingBuffer
.addAttestation(
fork, genesis_validators_root, epochRef,
aggregate
)
if not sig.isSome():
batchCrypto.scheduleBatch(fresh)
return none(R)
let futAggregate = newFuture[BatchResult](
"batch_validation.scheduleAggregateChecks.aggregateCheck"
)
batch.resultsBuffer.add(futAggregate)
batchCrypto.schedule(futSlot, checkThreshold = false)
batchCrypto.schedule(futAggregator, checkThreshold = false)
batchCrypto.schedule(futAggregate)
batchCrypto.scheduleBatch(fresh)
return some(((futSlot, futAggregator, futAggregate), sig))
return some(((futSlot, futAggregator, futAggregate), sig.get()))

View File

@ -98,7 +98,11 @@ proc new*(T: type Eth2Processor,
exitPool: exitPool,
validatorPool: validatorPool,
quarantine: quarantine,
batchCrypto: BatchCrypto.new(rng = rng)
batchCrypto: BatchCrypto.new(
rng = rng,
# Only run eager attestation signature verification if we're not
# processing blocks in order to give priority to block processing
eager = proc(): bool = not verifQueues[].hasBlocks())
)
# Gossip Management
@ -219,12 +223,14 @@ proc attestationValidator*(
beacon_attestations_received.inc()
beacon_attestation_delay.observe(delay.toFloatSeconds())
let (attestation_index, sig) = v.get()
self[].checkForPotentialDoppelganger(
attestation.data, v.value.attestingIndices, wallSlot)
attestation.data, [attestation_index], wallSlot)
trace "Attestation validated"
let (attestingIndices, sig) = v.get()
self.verifQueues[].addAttestation(attestation, attestingIndices, sig)
self.attestationPool[].addAttestation(
attestation, [attestation_index], sig, wallSlot)
return ValidationResult.Accept
@ -264,8 +270,10 @@ proc aggregateValidator*(
beacon_aggregates_received.inc()
beacon_aggregate_delay.observe(delay.toFloatSeconds())
let (attesting_indices, sig) = v.get()
self[].checkForPotentialDoppelganger(
signedAggregateAndProof.message.aggregate.data, v.value.attestingIndices,
signedAggregateAndProof.message.aggregate.data, attesting_indices,
wallSlot)
trace "Aggregate validated",
@ -273,9 +281,8 @@ proc aggregateValidator*(
selection_proof = signedAggregateAndProof.message.selection_proof,
wallSlot
let (attestingIndices, sig) = v.get()
self.verifQueues[].addAggregate(
signedAggregateAndProof, attestingIndices, sig)
self.attestationPool[].addAttestation(
signedAggregateAndProof.message.aggregate, attesting_indices, sig, wallSlot)
return ValidationResult.Accept

View File

@ -22,12 +22,6 @@ import
declareHistogram beacon_store_block_duration_seconds,
"storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
declareCounter beacon_attestations_dropped_queue_full,
"Number of attestations dropped because queue is full"
declareCounter beacon_aggregates_dropped_queue_full,
"Number of aggregates dropped because queue is full"
type
SyncBlock* = object
blk*: SignedBeaconBlock
@ -75,15 +69,6 @@ type
# Producers
# ----------------------------------------------------------------
blocksQueue*: AsyncQueue[BlockEntry] # Exported for "test_sync_manager"
# TODO:
# is there a point to separate
# attestations & aggregates here?
attestationsQueue: AsyncQueue[AttestationEntry]
attestationsDropped: int
attestationsDropTime: tuple[afterGenesis: bool, slot: Slot]
aggregatesQueue: AsyncQueue[AggregateEntry]
aggregatesDropped: int
aggregatesDropTime: tuple[afterGenesis: bool, slot: Slot]
# Consumer
# ----------------------------------------------------------------
@ -107,21 +92,8 @@ proc new*(T: type VerifQueueManager,
getWallTime: getWallTime,
blocksQueue: newAsyncQueue[BlockEntry](1),
# limit to the max number of aggregates we expect to see in one slot
aggregatesQueue: newAsyncQueue[AggregateEntry](
(TARGET_AGGREGATORS_PER_COMMITTEE * MAX_COMMITTEES_PER_SLOT).int),
# This queue is a bit harder to bound reasonably - we want to get a good
# spread of votes across committees - ideally at least TARGET_COMMITTEE_SIZE
# per committee - assuming randomness in vote arrival, this limit should
# cover that but of course, when votes arrive depends on a number of
# factors that are not entire random
attestationsQueue: newAsyncQueue[AttestationEntry](
(TARGET_COMMITTEE_SIZE * MAX_COMMITTEES_PER_SLOT).int),
blocksQueue: newAsyncQueue[BlockEntry](),
consensusManager: consensusManager,
attestationsDropTime: getWallTime().toSlot(),
aggregatesDropTime: getWallTime().toSlot(),
)
# Sync callbacks
@ -145,86 +117,25 @@ proc complete*(blk: SyncBlock, res: Result[void, BlockError]) =
if blk.resfut != nil:
blk.resfut.complete(res)
proc hasBlocks*(self: VerifQueueManager): bool =
self.blocksQueue.len() > 0
# Enqueue
# ------------------------------------------------------------------------------
proc addBlock*(self: var VerifQueueManager, syncBlock: SyncBlock) =
## Enqueue a Gossip-validated block for consensus verification
# Backpressure:
# If no item can be enqueued because buffer is full,
# we suspend here.
# There is no backpressure here - producers must wait for the future in the
# SyncBlock to constrain their own processing
# Producers:
# - Gossip (when synced)
# - SyncManager (during sync)
# - RequestManager (missing ancestor blocks)
# addLast doesn't fail
asyncSpawn(self.blocksQueue.addLast(BlockEntry(v: syncBlock)))
proc addAttestation*(
self: var VerifQueueManager, att: Attestation,
att_indices: seq[ValidatorIndex], sig: CookedSig) =
## Enqueue a Gossip-validated attestation for consensus verification
# Backpressure:
# If buffer is full, the oldest attestation is dropped and the newest is enqueued
# Producer:
# - Gossip (when synced)
while self.attestationsQueue.full():
self.attestationsDropped += 1
beacon_attestations_dropped_queue_full.inc()
try:
discard self.attestationsQueue.popFirstNoWait()
except AsyncQueueEmptyError as exc:
raiseAssert "If queue is full, we have at least one item! " & exc.msg
if self.attestationsDropped > 0:
let now = self.getWallTime().toSlot() # Print notice once per slot
if now != self.attestationsDropTime:
notice "Queue full, attestations dropped",
count = self.attestationsDropped
self.attestationsDropTime = now
self.attestationsDropped = 0
try:
self.attestationsQueue.addLastNoWait(
AttestationEntry(v: att, attesting_indices: att_indices, sig: sig))
except AsyncQueueFullError as exc:
raiseAssert "We just checked that queue is not full! " & exc.msg
proc addAggregate*(
self: var VerifQueueManager, agg: SignedAggregateAndProof,
att_indices: seq[ValidatorIndex], sig: CookedSig) =
## Enqueue a Gossip-validated aggregate attestation for consensus verification
# Backpressure:
# If buffer is full, the oldest aggregate is dropped and the newest is enqueued
# Producer:
# - Gossip (when synced)
while self.aggregatesQueue.full():
self.aggregatesDropped += 1
beacon_aggregates_dropped_queue_full.inc()
try:
discard self.aggregatesQueue.popFirstNoWait()
except AsyncQueueEmptyError as exc:
raiseAssert "We just checked that queue is not full! " & exc.msg
if self.aggregatesDropped > 0:
let now = self.getWallTime().toSlot() # Print notice once per slot
if now != self.aggregatesDropTime:
notice "Queue full, aggregates dropped",
count = self.aggregatesDropped
self.aggregatesDropTime = now
self.aggregatesDropped = 0
try:
self.aggregatesQueue.addLastNoWait(AggregateEntry(
v: agg.message.aggregate,
attesting_indices: att_indices,
sig: sig))
except AsyncQueueFullError as exc:
raiseAssert "We just checked that queue is not full! " & exc.msg
# addLast doesn't fail with unbounded queues, but we'll add asyncSpawn as a
# sanity check
asyncSpawn self.blocksQueue.addLast(BlockEntry(v: syncBlock))
# Storage
# ------------------------------------------------------------------------------
@ -272,40 +183,6 @@ proc storeBlock(
# Event Loop
# ------------------------------------------------------------------------------
proc processAttestation(
self: var VerifQueueManager, entry: AttestationEntry) =
logScope:
signature = shortLog(entry.v.signature)
let
wallTime = self.getWallTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
error "Processing attestation before genesis, clock turned back?"
quit 1
trace "Processing attestation"
self.consensusManager.attestationPool[].addAttestation(
entry.v, entry.attesting_indices, entry.sig, wallSlot)
proc processAggregate(
self: var VerifQueueManager, entry: AggregateEntry) =
logScope:
signature = shortLog(entry.v.signature)
let
wallTime = self.getWallTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
error "Processing aggregate before genesis, clock turned back?"
quit 1
trace "Processing aggregate"
self.consensusManager.attestationPool[].addAttestation(
entry.v, entry.attesting_indices, entry.sig, wallSlot)
proc processBlock(self: var VerifQueueManager, entry: BlockEntry) =
logScope:
blockRoot = shortLog(entry.v.blk.root)
@ -360,67 +237,18 @@ proc processBlock(self: var VerifQueueManager, entry: BlockEntry) =
entry.v.resFut.complete(Result[void, BlockError].err(res.error()))
proc runQueueProcessingLoop*(self: ref VerifQueueManager) {.async.} =
# Blocks in eth2 arrive on a schedule for every slot:
#
# * Block arrives at time 0
# * Attestations arrives at time 4
# * Aggregate arrives at time 8
var
blockFut = self[].blocksQueue.popFirst()
aggregateFut = self[].aggregatesQueue.popFirst()
attestationFut = self[].attestationsQueue.popFirst()
# TODO:
# revisit `idleTimeout`
# and especially `attestationBatch` in light of batch validation
# in particular we might want `attestationBatch` to drain both attestation & aggregates
while true:
# Cooperative concurrency: one idle calculation step per loop - because
# Cooperative concurrency: one block per loop iteration - because
# we run both networking and CPU-heavy things like block processing
# on the same thread, we need to make sure that there is steady progress
# on the networking side or we get long lockups that lead to timeouts.
const
# We cap waiting for an idle slot in case there's a lot of network traffic
# taking up all CPU - we don't want to _completely_ stop processing blocks
# in this case (attestations will get dropped) - doing so also allows us
# to benefit from more batching / larger network reads when under load.
# in this case - doing so also allows us to benefit from more batching /
# larger network reads when under load.
idleTimeout = 10.milliseconds
# Attestation processing is fairly quick and therefore done in batches to
# avoid some of the `Future` overhead
attestationBatch = 16
discard await idleAsync().withTimeout(idleTimeout)
# Avoid one more `await` when there's work to do
if not (blockFut.finished or aggregateFut.finished or attestationFut.finished):
trace "Waiting for processing work"
await blockFut or aggregateFut or attestationFut
# Only run one task per idle iteration, in priority order: blocks are needed
# for all other processing - then come aggregates which are cheap to
# process but might have a big impact on fork choice - last come
# attestations which individually have the smallest effect on chain progress
if blockFut.finished:
self[].processBlock(blockFut.read())
blockFut = self[].blocksQueue.popFirst()
elif aggregateFut.finished:
# aggregates will be dropped under heavy load on producer side
self[].processAggregate(aggregateFut.read())
for i in 0..<attestationBatch: # process a few at a time - this is fairly fast
if self[].aggregatesQueue.empty():
break
self[].processAggregate(self[].aggregatesQueue.popFirstNoWait())
aggregateFut = self[].aggregatesQueue.popFirst()
elif attestationFut.finished:
# attestations will be dropped under heavy load on producer side
self[].processAttestation(attestationFut.read())
for i in 0..<attestationBatch: # process a few at a time - this is fairly fast
if self[].attestationsQueue.empty():
break
self[].processAttestation(self[].attestationsQueue.popFirstNoWait())
attestationFut = self[].attestationsQueue.popFirst()
self[].processBlock(await self[].blocksQueue.popFirst())

View File

@ -9,9 +9,9 @@
import
# Standard library
std/[sequtils, intsets, deques],
std/[intsets, deques],
# Status
chronicles, chronos,
chronicles, chronos, metrics,
stew/results,
# Internals
../spec/[
@ -32,6 +32,12 @@ export ValidationResult
logScope:
topics = "gossip_checks"
declareCounter beacon_attestations_dropped_queue_full,
"Number of attestations dropped because queue is full"
declareCounter beacon_aggregates_dropped_queue_full,
"Number of aggregates dropped because queue is full"
# Internal checks
# ----------------------------------------------------------------
@ -149,7 +155,6 @@ func check_attestation_subnet(
# Gossip Validation
# ----------------------------------------------------------------
{.pop.} # async can raises anything
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id
proc validateAttestation*(
@ -158,7 +163,7 @@ proc validateAttestation*(
attestation: Attestation,
wallTime: BeaconTime,
attestation_subnet: uint64, checksExpensive: bool):
Future[Result[tuple[attestingIndices: seq[ValidatorIndex], sig: CookedSig],
Future[Result[tuple[attesting_index: ValidatorIndex, sig: CookedSig],
(ValidationResult, cstring)]] {.async.} =
# Some of the checks below have been reordered compared to the spec, to
# perform the cheap checks first - in particular, we want to avoid loading
@ -247,13 +252,13 @@ proc validateAttestation*(
fork = getStateField(pool.chainDag.headState, fork)
genesis_validators_root =
getStateField(pool.chainDag.headState, genesis_validators_root)
attesting_indices = get_attesting_indices(
attesting_index = get_attesting_indices_one(
epochRef, attestation.data, attestation.aggregation_bits)
# The number of aggregation bits matches the committee size, which ensures
# this condition holds.
doAssert attesting_indices.len == 1, "Per bits check above"
let validator_index = toSeq(attesting_indices)[0]
doAssert attesting_index.isSome(), "We've checked bits length and one count already"
let validator_index = attesting_index.get()
# There has been no other valid attestation seen on an attestation subnet
# that has an identical `attestation.data.target.epoch` and participating
@ -271,7 +276,7 @@ proc validateAttestation*(
# TODO this means that (a) this becomes an "expensive" check and (b) it is
# doing in-principle unnecessary work, since this should be known from the
# attestation creation.
return ok((attesting_indices, attestation.signature.load.get().CookedSig))
return ok((validator_index, attestation.signature.load.get().CookedSig))
# The signature of attestation is valid.
block:
@ -295,9 +300,16 @@ proc validateAttestation*(
# Await the crypto check
let
(cryptoFut, sig) = deferredCrypto.get()
cryptoChecked = await cryptoFut
if cryptoChecked.isErr():
return err((ValidationResult.Reject, cryptoChecked.error))
var x = (await cryptoFut)
case x
of BatchResult.Invalid:
return err((ValidationResult.Reject, cstring("validateAttestation: invalid signature")))
of BatchResult.Timeout:
beacon_attestations_dropped_queue_full.inc()
return err((ValidationResult.Ignore, cstring("validateAttestation: timeout checking signature")))
of BatchResult.Valid:
discard # keep going only in this case
# Only valid attestations go in the list, which keeps validator_index
# in range
@ -306,7 +318,7 @@ proc validateAttestation*(
pool.nextAttestationEpoch[validator_index].subnet =
attestation.data.target.epoch + 1
return ok((attesting_indices, sig))
return ok((validator_index, sig))
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_aggregate_and_proof
proc validateAggregate*(
@ -430,28 +442,46 @@ proc validateAggregate*(
)
if deferredCrypto.isNone():
return err((ValidationResult.Reject,
cstring("validateAttestation: crypto sanity checks failure")))
cstring("validateAggregate: crypto sanity checks failure")))
# [REJECT] aggregate_and_proof.selection_proof
let
(cryptoFuts, sig) = deferredCrypto.get()
slotChecked = await cryptoFuts.slotCheck
if slotChecked.isErr():
return err((ValidationResult.Reject, cstring(
"Selection_proof signature verification failed")))
block:
# [REJECT] aggregate_and_proof.selection_proof
var x = await cryptoFuts.slotCheck
case x
of BatchResult.Invalid:
return err((ValidationResult.Reject, cstring("validateAggregate: invalid slot signature")))
of BatchResult.Timeout:
beacon_aggregates_dropped_queue_full.inc()
return err((ValidationResult.Reject, cstring("validateAggregate: timeout checking slot signature")))
of BatchResult.Valid:
discard
block:
# [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
let aggregatorChecked = await cryptoFuts.aggregatorCheck
if aggregatorChecked.isErr():
return err((ValidationResult.Reject, cstring(
"signed_aggregate_and_proof aggregator signature verification failed")))
var x = await cryptoFuts.aggregatorCheck
case x
of BatchResult.Invalid:
return err((ValidationResult.Reject, cstring("validateAggregate: invalid aggregator signature")))
of BatchResult.Timeout:
beacon_aggregates_dropped_queue_full.inc()
return err((ValidationResult.Reject, cstring("validateAggregate: timeout checking aggregator signature")))
of BatchResult.Valid:
discard
block:
# [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
let aggregateChecked = await cryptoFuts.aggregateCheck
if aggregateChecked.isErr():
return err((ValidationResult.Reject, cstring(
"signed_aggregate_and_proof aggregate attester signatures verification failed")))
var x = await cryptoFuts.aggregateCheck
case x
of BatchResult.Invalid:
return err((ValidationResult.Reject, cstring("validateAggregate: invalid aggregate signature")))
of BatchResult.Timeout:
beacon_aggregates_dropped_queue_full.inc()
return err((ValidationResult.Reject, cstring("validateAggregate: timeout checking aggregate signature")))
of BatchResult.Valid:
discard
# The following rule follows implicitly from that we clear out any
# unviable blocks from the chain dag:
@ -474,8 +504,6 @@ proc validateAggregate*(
return ok((attesting_indices, sig))
{.push raises: [Defect].}
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_block
proc isValidBeaconBlock*(
dag: ChainDAGRef, quarantine: var QuarantineRef,

View File

@ -53,5 +53,5 @@ func makeDeposit*(
withdrawal_credentials: makeWithdrawalCredentials(pubkey))
if skipBLSValidation notin flags:
result.signature = preset.get_deposit_signature(result, privkey)
result.signature = preset.get_deposit_signature(result, privkey).toValidatorSig()

View File

@ -1665,22 +1665,29 @@ proc setValidTopics*(node: Eth2Node, topics: openArray[string]) =
proc(topic: string): bool {.gcsafe, raises: [Defect].} =
topic in node.validTopics
proc newValidationResultFuture(v: ValidationResult): Future[ValidationResult] =
let res = newFuture[ValidationResult]("eth2_network.execValidator")
res.complete(v)
res
proc addValidator*[MsgType](node: Eth2Node,
topic: string,
msgValidator: proc(msg: MsgType):
ValidationResult {.gcsafe, raises: [Defect].} ) =
# Validate messages as soon as subscribed
proc execValidator(
topic: string, message: GossipMsg): Future[ValidationResult] {.async.} =
topic: string, message: GossipMsg): Future[ValidationResult] {.raises: [Defect].} =
inc nbc_gossip_messages_received
trace "Validating incoming gossip message",
len = message.data.len, topic
let res =
try:
let decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
var decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
if decompressed.len > 0:
msgValidator(SSZ.decode(decompressed, MsgType))
let decoded = SSZ.decode(decompressed, MsgType)
decompressed = newSeq[byte](0) # release memory before validating
msgValidator(decoded)
else:
debug "Empty gossip data after decompression",
topic, len = message.data.len
@ -1689,7 +1696,7 @@ proc addValidator*[MsgType](node: Eth2Node,
debug "Gossip validation error",
msg = err.msg, topic, len = message.data.len
ValidationResult.Ignore
return res
return newValidationResultFuture(res)
try:
node.pubsub.addValidator(topic & "_snappy", execValidator)
@ -1699,31 +1706,28 @@ proc addAsyncValidator*[MsgType](node: Eth2Node,
topic: string,
msgValidator: proc(msg: MsgType):
Future[ValidationResult] {.gcsafe, raises: [Defect].} ) =
proc execValidator(
topic: string, message: GossipMsg): Future[ValidationResult] {.raises: [Defect].} =
inc nbc_gossip_messages_received
trace "Validating incoming gossip message",
len = message.data.len, topic
let decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
if decompressed.len == 0:
debug "Empty gossip data after decompression",
topic, len = message.data.len
result.complete(ValidationResult.Ignore)
return
let decoded = try:
SSZ.decode(decompressed, MsgType)
except:
error "SSZ decoding failure",
topic, len = message.data.len
result.complete(ValidationResult.Ignore)
return
return msgValidator(decoded)
let res =
try:
var decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
if decompressed.len > 0:
let decoded = SSZ.decode(decompressed, MsgType)
decompressed = newSeq[byte](0) # release memory before validating
return msgValidator(decoded) # Reuses future from msgValidator
else:
debug "Empty gossip data after decompression",
topic, len = message.data.len
ValidationResult.Ignore
except CatchableError as err:
debug "Gossip validation error",
msg = err.msg, topic, len = message.data.len
ValidationResult.Ignore
return newValidationResultFuture(res)
try:
node.pubsub.addValidator(topic & "_snappy", execValidator)

View File

@ -1598,7 +1598,7 @@ proc handleValidatorExitCommand(config: BeaconNodeConf) {.async.} =
validator_index: validatorIdx))
signedExit.signature = get_voluntary_exit_signature(
fork, genesisValidatorsRoot, signedExit.message, signingKey.get)
fork, genesisValidatorsRoot, signedExit.message, signingKey.get).toValidatorSig()
template ask(prompt: string): string =
try:

View File

@ -33,4 +33,4 @@ programMain:
let privKey = validators[ValidatorPubKey.fromHex(args[0]).get()]
echo blsSign(privKey, Eth2Digest.fromHex(args[1]).data)
echo blsSign(privKey, Eth2Digest.fromHex(args[1]).data).toValidatorSig()

View File

@ -50,7 +50,7 @@ type
# signatures lazily - this helps operations like comparisons and hashes to
# be fast (which is important), makes loading blocks and states fast, and
# allows invalid values in the SSZ byte stream, which is valid from an SSZ
# point of view - the invalid values are later processed to
# point of view - the invalid values are caught later
ValidatorPubKey* = object
blob*: array[RawPubKeySize, byte]
@ -69,10 +69,9 @@ type
SomeSig* = TrustedSig | ValidatorSig
CookedSig* = distinct blscurve.Signature ## \
## Allows loading in an atttestation or other message's signature once across
## all its computations, rather than repeatedly re-loading it each time it is
## referenced. This primarily currently serves the attestation pool.
## Cooked signatures are those that have been loaded successfully from a
## ValidatorSig and are used to avoid expensive reloading as well as error
## checking
export AggregateSignature
# API
@ -108,36 +107,26 @@ proc loadWithCache*(v: ValidatorPubKey): Option[blscurve.PublicKey] =
else:
none blscurve.PublicKey
proc load*(v: ValidatorSig): Option[blscurve.Signature] =
proc load*(v: ValidatorSig): Option[CookedSig] =
## Parse signature blob - this may fail
var parsed: blscurve.Signature
if fromBytes(parsed, v.blob):
some(parsed)
some(CookedSig(parsed))
else:
none(blscurve.Signature)
func init*(agg: var AggregateSignature, sig: ValidatorSig) {.inline.}=
## Initializes an aggregate signature context
## This assumes that the signature is valid
agg.init(sig.load().get())
none(CookedSig)
func init*(agg: var AggregateSignature, sig: CookedSig) {.inline.}=
## Initializes an aggregate signature context
agg.init(blscurve.Signature(sig))
func init*(T: type AggregateSignature, sig: CookedSig | ValidatorSig): T =
func init*(T: type AggregateSignature, sig: CookedSig): T =
result.init(sig)
proc aggregate*(agg: var AggregateSignature, sig: ValidatorSig) {.inline.}=
## Aggregate two Validator Signatures
## Both signatures must be valid
agg.aggregate(sig.load.get())
proc aggregate*(agg: var AggregateSignature, sig: CookedSig) {.inline.}=
## Aggregate two Validator Signatures
## Aggregate two valid Validator Signatures
agg.aggregate(blscurve.Signature(sig))
func finish*(agg: AggregateSignature): CookedSig {.inline.}=
func finish*(agg: AggregateSignature): CookedSig {.inline.} =
## Canonicalize an AggregateSignature into a signature
var sig: blscurve.Signature
sig.finish(agg)
@ -146,7 +135,7 @@ func finish*(agg: AggregateSignature): CookedSig {.inline.}=
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#bls-signatures
proc blsVerify*(
pubkey: ValidatorPubKey, message: openArray[byte],
signature: ValidatorSig): bool =
signature: CookedSig): bool =
## Check that a signature is valid for a message
## under the provided public key.
## returns `true` if the signature is valid, `false` otherwise.
@ -155,19 +144,23 @@ proc blsVerify*(
## It is recommended to use the overload that accepts a proof-of-possession
## to enforce correct usage.
let
parsedSig = signature.load()
parsedKey = pubkey.loadWithCache()
# It may happen that signatures or keys fail to parse as invalid blobs may
# be passed around - for example, the deposit contract doesn't verify
# signatures, so the loading happens lazily at verification time instead!
parsedKey.isSome() and
parsedKey.get.verify(message, blscurve.Signature(signature))
proc blsVerify*(
pubkey: ValidatorPubKey, message: openArray[byte],
signature: ValidatorSig): bool =
let parsedSig = signature.load()
if parsedSig.isNone():
false
else:
let
parsedKey = pubkey.loadWithCache()
# It may happen that signatures or keys fail to parse as invalid blobs may
# be passed around - for example, the deposit contract doesn't verify
# signatures, so the loading happens lazily at verification time instead!
parsedKey.isSome() and
parsedKey.get.verify(message, parsedSig.get())
blsVerify(pubkey, message, parsedSig.get())
proc blsVerify*(sigSet: SignatureSet): bool =
## Unbatched verification
@ -179,15 +172,14 @@ proc blsVerify*(sigSet: SignatureSet): bool =
sigSet.signature
)
func blsSign*(privkey: ValidatorPrivKey, message: openArray[byte]): ValidatorSig =
func blsSign*(privkey: ValidatorPrivKey, message: openArray[byte]): CookedSig =
## Computes a signature from a secret key and a message
let sig = SecretKey(privkey).sign(message)
ValidatorSig(blob: sig.exportRaw())
CookedSig(SecretKey(privkey).sign(message))
proc blsFastAggregateVerify*(
publicKeys: openArray[ValidatorPubKey],
message: openArray[byte],
signature: ValidatorSig
signature: CookedSig
): bool =
## Verify the aggregate of multiple signatures on the same message
## This function is faster than AggregateVerify
@ -207,9 +199,6 @@ proc blsFastAggregateVerify*(
# in blscurve which already exists internally
# - or at network/databases/serialization boundaries we do not
# allow invalid BLS objects to pollute consensus routines
let parsedSig = signature.load()
if not parsedSig.isSome():
return false
var unwrapped: seq[PublicKey]
for pubkey in publicKeys:
let realkey = pubkey.loadWithCache()
@ -217,7 +206,18 @@ proc blsFastAggregateVerify*(
return false
unwrapped.add realkey.get
fastAggregateVerify(unwrapped, message, parsedSig.get())
fastAggregateVerify(unwrapped, message, blscurve.Signature(signature))
proc blsFastAggregateVerify*(
publicKeys: openArray[ValidatorPubKey],
message: openArray[byte],
signature: ValidatorSig
): bool =
let parsedSig = signature.load()
if not parsedSig.isSome():
false
else:
blsFastAggregateVerify(publicKeys, message, parsedSig.get())
proc toGaugeValue*(hash: Eth2Digest): int64 =
# Only the last 8 bytes are taken into consideration in accordance
@ -252,7 +252,7 @@ template toRaw*(x: TrustedSig): auto =
func toHex*(x: BlsCurveType): string =
toHex(toRaw(x))
func exportRaw*(x: CookedSig): ValidatorSig =
func toValidatorSig*(x: CookedSig): ValidatorSig =
ValidatorSig(blob: blscurve.Signature(x).exportRaw())
func fromRaw*(T: type ValidatorPrivKey, bytes: openArray[byte]): BlsResult[T] =

View File

@ -761,5 +761,5 @@ proc prepareDeposit*(preset: RuntimePreset,
pubkey: signingPubKey,
withdrawal_credentials: makeWithdrawalCredentials(withdrawalPubKey))
res.signature = preset.get_deposit_signature(res, signingKey)
res.signature = preset.get_deposit_signature(res, signingKey).toValidatorSig()
return res

View File

@ -35,7 +35,7 @@ func compute_slot_root*(
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#aggregation-selection
func get_slot_signature*(
fork: Fork, genesis_validators_root: Eth2Digest, slot: Slot,
privkey: ValidatorPrivKey): ValidatorSig =
privkey: ValidatorPrivKey): CookedSig =
blsSign(privKey, compute_slot_root(fork, genesis_validators_root, slot).data)
proc verify_slot_signature*(
@ -60,7 +60,7 @@ func compute_epoch_root*(
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#randao-reveal
func get_epoch_signature*(
fork: Fork, genesis_validators_root: Eth2Digest, epoch: Epoch,
privkey: ValidatorPrivKey): ValidatorSig =
privkey: ValidatorPrivKey): CookedSig =
blsSign(privKey, compute_epoch_root(fork, genesis_validators_root, epoch).data)
proc verify_epoch_signature*(
@ -85,7 +85,7 @@ func compute_block_root*(
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#signature
func get_block_signature*(
fork: Fork, genesis_validators_root: Eth2Digest, slot: Slot,
root: Eth2Digest, privkey: ValidatorPrivKey): ValidatorSig =
root: Eth2Digest, privkey: ValidatorPrivKey): CookedSig =
blsSign(privKey, compute_block_root(fork, genesis_validators_root, slot, root).data)
proc verify_block_signature*(
@ -114,7 +114,7 @@ func compute_aggregate_and_proof_root*(fork: Fork, genesis_validators_root: Eth2
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#broadcast-aggregate
func get_aggregate_and_proof_signature*(fork: Fork, genesis_validators_root: Eth2Digest,
aggregate_and_proof: AggregateAndProof,
privKey: ValidatorPrivKey): ValidatorSig =
privKey: ValidatorPrivKey): CookedSig =
blsSign(privKey, compute_aggregate_and_proof_root(fork, genesis_validators_root,
aggregate_and_proof).data)
@ -144,7 +144,7 @@ func compute_attestation_root*(
func get_attestation_signature*(
fork: Fork, genesis_validators_root: Eth2Digest,
attestation_data: AttestationData,
privkey: ValidatorPrivKey): ValidatorSig =
privkey: ValidatorPrivKey): CookedSig =
blsSign(privKey, compute_attestation_root(fork, genesis_validators_root,
attestation_data).data)
@ -165,7 +165,7 @@ proc verify_attestation_signature*(
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#deposits
func get_deposit_signature*(preset: RuntimePreset,
deposit: DepositData,
privkey: ValidatorPrivKey): ValidatorSig =
privkey: ValidatorPrivKey): CookedSig =
let
deposit_message = deposit.getDepositMessage()
# Fork-agnostic domain since deposits are valid across forks
@ -188,7 +188,7 @@ func get_voluntary_exit_signature*(
fork: Fork,
genesis_validators_root: Eth2Digest,
voluntary_exit: VoluntaryExit,
privkey: ValidatorPrivKey): ValidatorSig =
privkey: ValidatorPrivKey): CookedSig =
let
domain = get_domain(
fork, DOMAIN_VOLUNTARY_EXIT, voluntary_exit.epoch, genesis_validators_root)

View File

@ -30,7 +30,7 @@ func `$`*(s: SignatureSet): string =
# unlike when Nimbus did eager loading which ensured they were correct beforehand
template loadOrExit(signature: ValidatorSig, failReturn: auto):
blscurve.Signature =
CookedSig =
## Load a BLS signature from a raw signature
## Exits the **caller** with false if the signature is invalid
let sig = signature.load()
@ -51,11 +51,11 @@ func addSignatureSet[T](
sigs: var seq[SignatureSet],
pubkey: blscurve.PublicKey,
sszObj: T,
signature: ValidatorSig | blscurve.Signature,
signature: CookedSig,
genesis_validators_root: Eth2Digest,
fork: Fork,
epoch: Epoch,
domain: DomainType): bool {.raises: [Defect].}=
domain: DomainType) =
## Add a new signature set triplet (pubkey, message, signature)
## to a collection of signature sets for batch verification.
## Can return false if `signature` wasn't deserialized to a valid BLS signature.
@ -68,20 +68,11 @@ func addSignatureSet[T](
)
).data
when signature is ValidatorSig:
sigs.add((
pubkey,
signing_root,
signature.loadOrExit(false)
))
else:
sigs.add((
pubkey,
signing_root,
signature
))
return true
sigs.add((
pubkey,
signing_root,
blscurve.Signature(signature)
))
proc aggregateAttesters(
aggPK: var blscurve.PublicKey,
@ -138,15 +129,14 @@ proc addIndexedAttestation(
if not aggPK.aggregateAttesters(attestation, state):
return false
if not sigs.addSignatureSet(
sigs.addSignatureSet(
aggPK,
attestation.data,
attestation.signature,
attestation.signature.loadOrExit(false),
state.genesis_validators_root,
state.fork,
attestation.data.target.epoch,
DOMAIN_BEACON_ATTESTER):
return false
DOMAIN_BEACON_ATTESTER)
return true
proc addAttestation(
@ -155,39 +145,38 @@ proc addAttestation(
state: BeaconState,
cache: var StateCache
): bool =
result = false
var inited = false
var attestersAgg{.noInit.}: AggregatePublicKey
for valIndex in state.get_attesting_indices(
attestation.data,
attestation.aggregation_bits,
cache
):
if not result: # first iteration
if not inited: # first iteration
attestersAgg.init(state.validators[valIndex]
.pubkey.loadWithCacheOrExit(false))
result = true
inited = true
else:
attestersAgg.aggregate(state.validators[valIndex]
.pubkey.loadWithCacheOrExit(false))
if not result:
if not inited:
# There were no attesters
return false
var attesters{.noinit.}: blscurve.PublicKey
attesters.finish(attestersAgg)
if not sigs.addSignatureSet(
sigs.addSignatureSet(
attesters,
attestation.data,
attestation.signature,
attestation.signature.loadOrExit(false),
state.genesis_validators_root,
state.fork,
attestation.data.target.epoch,
DOMAIN_BEACON_ATTESTER):
return false
return true
DOMAIN_BEACON_ATTESTER)
true
# Public API
# ------------------------------------------------------
@ -197,7 +186,7 @@ proc addAttestation*(
fork: Fork, genesis_validators_root: Eth2Digest,
epochRef: auto,
attestation: Attestation
): tuple[valid: bool, sig: CookedSig] =
): Option[CookedSig] =
## Add an attestation for batched BLS verification
## purposes
## This only verifies cryptography
@ -207,75 +196,38 @@ proc addAttestation*(
## In that case the seq[SignatureSet] is unmodified
mixin get_attesting_indices, validator_keys, pubkey
let defaultFail = (false, default(CookedSig))
result = defaultFail
var inited = false
var attestersAgg{.noInit.}: AggregatePublicKey
for valIndex in epochRef.get_attesting_indices(
attestation.data,
attestation.aggregation_bits):
if not result.valid: # first iteration
if not inited: # first iteration
attestersAgg.init(epochRef.validator_keys[valIndex]
.loadWithCacheOrExit(defaultFail))
result.valid = true
.loadWithCacheOrExit(none(CookedSig)))
inited = true
else:
attestersAgg.aggregate(epochRef.validator_keys[valIndex]
.loadWithCacheOrExit(defaultFail))
.loadWithCacheOrExit(none(CookedSig)))
if not result.valid:
if not inited:
# There were no attesters
return defaultFail
return none(CookedSig)
var attesters{.noinit.}: blscurve.PublicKey
attesters.finish(attestersAgg)
let cookedSig = attestation.signature.loadOrExit(defaultFail)
let cookedSig = attestation.signature.loadOrExit(none(CookedSig))
return (
sigs.addSignatureSet(
sigs.addSignatureSet(
attesters,
attestation.data,
cookedSig,
genesis_validators_root,
fork,
attestation.data.target.epoch,
DOMAIN_BEACON_ATTESTER),
CookedSig(cookedSig))
DOMAIN_BEACON_ATTESTER)
proc addIndexedAttestation*(
sigs: var seq[SignatureSet],
fork: Fork, genesis_validators_root: Eth2Digest,
epochRef: auto,
attestation: IndexedAttestation,
): bool =
## Add an indexed attestation for batched BLS verification
## purposes
## This only verifies cryptography, checking that
## the indices are sorted and unique is not checked for example.
##
## Returns true if the indexed attestations was added to the batching buffer
## Returns false if saniy checks failed (non-empty, keys are valid)
## In that case the seq[SignatureSet] is unmodified
if attestation.attesting_indices.len == 0:
# Aggregation spec requires non-empty collection
# - https://tools.ietf.org/html/draft-irtf-cfrg-bls-signature-04
# Eth2 spec requires at least one attesting indice in slashing
# - https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#is_valid_indexed_attestation
return false
var aggPK {.noInit.}: blscurve.PublicKey
if not aggPK.aggregateAttesters(attestation, epochRef):
return false
return sigs.addSignatureSet(
aggPK,
attestation.data,
attestation.signature,
genesis_validators_root,
fork,
attestation.data.target.epoch,
DOMAIN_BEACON_ATTESTER)
some(CookedSig(cookedSig))
proc addSlotSignature*(
sigs: var seq[SignatureSet],
@ -283,18 +235,19 @@ proc addSlotSignature*(
slot: Slot,
pubkey: ValidatorPubKey,
signature: ValidatorSig): bool =
let epoch = compute_epoch_at_slot(slot)
return sigs.addSignatureSet(
sigs.addSignatureSet(
pubkey.loadWithCacheOrExit(false),
sszObj = slot,
signature,
signature.loadOrExit(false),
genesis_validators_root,
fork,
epoch,
DOMAIN_SELECTION_PROOF
)
true
proc addAggregateAndProofSignature*(
sigs: var seq[SignatureSet],
fork: Fork, genesis_validators_root: Eth2Digest,
@ -304,16 +257,18 @@ proc addAggregateAndProofSignature*(
): bool =
let epoch = compute_epoch_at_slot(aggregate_and_proof.aggregate.data.slot)
return sigs.addSignatureSet(
sigs.addSignatureSet(
pubkey.loadWithCacheOrExit(false),
sszObj = aggregate_and_proof,
signature,
signature.loadOrExit(false),
genesis_validators_root,
fork,
epoch,
DOMAIN_AGGREGATE_AND_PROOF
)
true
proc collectSignatureSets*(
sigs: var seq[SignatureSet],
signed_block: SignedBeaconBlock,
@ -345,27 +300,25 @@ proc collectSignatureSets*(
# 1. Block proposer
# ----------------------------------------------------
if not sigs.addSignatureSet(
sigs.addSignatureSet(
pubkey,
signed_block.message,
signed_block.signature,
signed_block.signature.loadOrExit(false),
state.genesis_validators_root,
state.fork,
epoch,
DOMAIN_BEACON_PROPOSER):
return false
DOMAIN_BEACON_PROPOSER)
# 2. Randao Reveal
# ----------------------------------------------------
if not sigs.addSignatureSet(
sigs.addSignatureSet(
pubkey,
epoch,
signed_block.message.body.randao_reveal,
signed_block.message.body.randao_reveal.loadOrExit(false),
state.genesis_validators_root,
state.fork,
epoch,
DOMAIN_RANDAO):
return false
DOMAIN_RANDAO)
# 3. Proposer slashings
# ----------------------------------------------------
@ -386,32 +339,30 @@ proc collectSignatureSets*(
let header_1 = slashing.signed_header_1
let proposer1 = state.validators[header_1.message.proposer_index]
let epoch1 = header_1.message.slot.compute_epoch_at_slot()
if not sigs.addSignatureSet(
sigs.addSignatureSet(
proposer1.pubkey.loadWithCacheOrExit(false),
header_1.message,
header_1.signature,
header_1.signature.loadOrExit(false),
state.genesis_validators_root,
state.fork,
epoch1,
DOMAIN_BEACON_PROPOSER
):
return false
)
# Conflicting block 2
block:
let header_2 = slashing.signed_header_2
let proposer2 = state.validators[header_2.message.proposer_index]
let epoch2 = header_2.message.slot.compute_epoch_at_slot()
if not sigs.addSignatureSet(
sigs.addSignatureSet(
proposer2.pubkey.loadWithCacheOrExit(false),
header_2.message,
header_2.signature,
header_2.signature.loadOrExit(false),
state.genesis_validators_root,
state.fork,
epoch2,
DOMAIN_BEACON_PROPOSER
):
return false
)
# 4. Attester slashings
# ----------------------------------------------------
@ -466,15 +417,14 @@ proc collectSignatureSets*(
# fixed in 1.4.2
template volex: untyped = signed_block.message.body.voluntary_exits[i]
if not sigs.addSignatureSet(
sigs.addSignatureSet(
state.validators[volex.message.validator_index]
.pubkey.loadWithCacheOrExit(false),
volex.message,
volex.signature,
volex.signature.loadOrExit(false),
state.genesis_validators_root,
state.fork,
volex.message.epoch,
DOMAIN_VOLUNTARY_EXIT):
return false
DOMAIN_VOLUNTARY_EXIT)
return true

View File

@ -25,8 +25,9 @@ func init*(T: type ValidatorPool,
## `genesis_validators_root` is used as an unique ID for the
## blockchain
## `backend` is the KeyValue Store backend
result.validators = initTable[ValidatorPubKey, AttachedValidator]()
result.slashingProtection = slashingProtectionDB
T(
slashingProtection: slashingProtectionDB
)
template count*(pool: ValidatorPool): int =
pool.validators.len
@ -72,23 +73,23 @@ proc signWithRemoteValidator(v: AttachedValidator, data: Eth2Digest):
proc signBlockProposal*(v: AttachedValidator, fork: Fork,
genesis_validators_root: Eth2Digest, slot: Slot,
blockRoot: Eth2Digest): Future[ValidatorSig] {.async.} =
if v.kind == inProcess:
result = get_block_signature(
fork, genesis_validators_root, slot, blockRoot, v.privKey)
return if v.kind == inProcess:
get_block_signature(
fork, genesis_validators_root, slot, blockRoot, v.privKey).toValidatorSig()
else:
let root = compute_block_root(fork, genesis_validators_root, slot, blockRoot)
result = await signWithRemoteValidator(v, root)
await signWithRemoteValidator(v, root)
proc signAttestation*(v: AttachedValidator,
attestation: AttestationData,
fork: Fork, genesis_validators_root: Eth2Digest):
Future[ValidatorSig] {.async.} =
if v.kind == inProcess:
result = get_attestation_signature(
fork, genesis_validators_root, attestation, v.privKey)
return if v.kind == inProcess:
get_attestation_signature(
fork, genesis_validators_root, attestation, v.privKey).toValidatorSig()
else:
let root = compute_attestation_root(fork, genesis_validators_root, attestation)
result = await signWithRemoteValidator(v, root)
await signWithRemoteValidator(v, root)
proc produceAndSignAttestation*(validator: AttachedValidator,
attestationData: AttestationData,
@ -107,36 +108,36 @@ proc signAggregateAndProof*(v: AttachedValidator,
aggregate_and_proof: AggregateAndProof,
fork: Fork, genesis_validators_root: Eth2Digest):
Future[ValidatorSig] {.async.} =
if v.kind == inProcess:
result = get_aggregate_and_proof_signature(
fork, genesis_validators_root, aggregate_and_proof, v.privKey)
return if v.kind == inProcess:
get_aggregate_and_proof_signature(
fork, genesis_validators_root, aggregate_and_proof, v.privKey).toValidatorSig()
else:
let root = compute_aggregate_and_proof_root(
fork, genesis_validators_root, aggregate_and_proof)
result = await signWithRemoteValidator(v, root)
await signWithRemoteValidator(v, root)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#randao-reveal
func genRandaoReveal*(k: ValidatorPrivKey, fork: Fork,
genesis_validators_root: Eth2Digest, slot: Slot): ValidatorSig =
genesis_validators_root: Eth2Digest, slot: Slot): CookedSig =
get_epoch_signature(
fork, genesis_validators_root, slot.compute_epoch_at_slot, k)
proc genRandaoReveal*(v: AttachedValidator, fork: Fork,
genesis_validators_root: Eth2Digest, slot: Slot):
Future[ValidatorSig] {.async.} =
if v.kind == inProcess:
return genRandaoReveal(v.privKey, fork, genesis_validators_root, slot)
return if v.kind == inProcess:
genRandaoReveal(v.privKey, fork, genesis_validators_root, slot).toValidatorSig()
else:
let root = compute_epoch_root(
fork, genesis_validators_root, slot.compute_epoch_at_slot)
result = await signWithRemoteValidator(v, root)
await signWithRemoteValidator(v, root)
proc getSlotSig*(v: AttachedValidator, fork: Fork,
genesis_validators_root: Eth2Digest, slot: Slot
): Future[ValidatorSig] {.async.} =
if v.kind == inProcess:
result = get_slot_signature(
fork, genesis_validators_root, slot, v.privKey)
return if v.kind == inProcess:
get_slot_signature(
fork, genesis_validators_root, slot, v.privKey).toValidatorSig()
else:
let root = compute_slot_root(fork, genesis_validators_root, slot)
result = await signWithRemoteValidator(v, root)
await signWithRemoteValidator(v, root)

View File

@ -117,8 +117,8 @@ cli do(slots = SLOTS_PER_EPOCH * 5,
Attestation(
data: data,
aggregation_bits: aggregation_bits,
signature: sig
), @[validatorIdx], sig.load.get().CookedSig, data.slot)
signature: sig.toValidatorSig()
), [validatorIdx], sig, data.slot)
proc proposeBlock(slot: Slot) =
if rand(r, 1.0) > blockRatio:
@ -141,7 +141,8 @@ cli do(slots = SLOTS_PER_EPOCH * 5,
hashedState,
proposerIdx,
head.root,
privKey.genRandaoReveal(state.fork, state.genesis_validators_root, slot),
privKey.genRandaoReveal(state.fork, state.genesis_validators_root,
slot).toValidatorSig(),
eth1ProposalData.vote,
default(GraffitiBytes),
attPool.getAttestationsForBlock(state, cache),
@ -164,7 +165,7 @@ cli do(slots = SLOTS_PER_EPOCH * 5,
newBlock.signature = withTimerRet(timers[tSignBlock]):
get_block_signature(
state.fork, state.genesis_validators_root, newBlock.message.slot,
blockRoot, privKey)
blockRoot, privKey).toValidatorSig()
let added = chainDag.addRawBlock(quarantine, newBlock) do (
blckRef: BlockRef, signedBlock: TrustedSignedBeaconBlock,

View File

@ -131,7 +131,7 @@ cli do(slots = SLOTS_PER_EPOCH * 5,
attestation =
makeAttestation(state[].data, latest_block_root, scas, target_slot,
i.CommitteeIndex, v, cache, flags)
agg.init(attestation.signature)
agg.init(attestation.signature.load.get())
first = false
else:
let att2 =
@ -140,8 +140,8 @@ cli do(slots = SLOTS_PER_EPOCH * 5,
if not att2.aggregation_bits.overlaps(attestation.aggregation_bits):
attestation.aggregation_bits.incl(att2.aggregation_bits)
if skipBlsValidation notin flags:
agg.aggregate(att2.signature)
attestation.signature = agg.finish().exportRaw()
agg.aggregate(att2.signature.load.get())
attestation.signature = agg.finish().toValidatorSig()
if not first:
# add the attestation if any of the validators attested, as given

View File

@ -21,6 +21,7 @@ import # Unit test
./test_discovery,
./test_eth1_monitor,
./test_exit_pool,
./test_gossip_validation,
./test_helpers,
./test_honest_validator,
./test_interop,

View File

@ -78,7 +78,7 @@ proc signMockAttestation*(state: BeaconState, attestation: var Attestation) =
agg.aggregate(sig)
if first_iter != true:
attestation.signature = agg.finish().exportRaw()
attestation.signature = agg.finish().toValidatorSig()
# Otherwise no participants so zero sig
proc mockAttestationImpl(

View File

@ -8,7 +8,7 @@
import
options,
# Specs
../../beacon_chain/spec/[datatypes, helpers, signatures, validator],
../../beacon_chain/spec/[crypto, datatypes, helpers, signatures, validator],
# Internals
../../beacon_chain/ssz,
# Mock helpers
@ -28,11 +28,11 @@ proc signMockBlockImpl(
signedBlock.message.body.randao_reveal = get_epoch_signature(
state.fork, state.genesis_validators_root, block_slot.compute_epoch_at_slot,
privkey)
privkey).toValidatorSig()
signedBlock.root = hash_tree_root(signedBlock.message)
signedBlock.signature = get_block_signature(
state.fork, state.genesis_validators_root, block_slot,
signedBlock.root, privkey)
signedBlock.root, privkey).toValidatorSig()
proc signMockBlock*(state: BeaconState, signedBlock: var SignedBeaconBlock) =
signMockBlockImpl(state, signedBlock)

View File

@ -44,7 +44,7 @@ func mockDepositData(
): DepositData =
var ret = mockDepositData(pubkey, amount)
if skipBlsValidation notin flags:
ret.signature = defaultRuntimePreset.get_deposit_signature(ret, privkey)
ret.signature = defaultRuntimePreset.get_deposit_signature(ret, privkey).toValidatorSig()
ret
template mockGenesisDepositsImpl(

View File

@ -15,14 +15,14 @@ import
stew/byteutils,
eth/keys,
# Internal
../beacon_chain/[beacon_node_types, extras, beacon_clock],
../beacon_chain/gossip_processing/[gossip_validation, batch_validation],
../beacon_chain/[beacon_node_types, extras],
../beacon_chain/gossip_processing/[gossip_validation],
../beacon_chain/fork_choice/[fork_choice_types, fork_choice],
../beacon_chain/consensus_object_pools/[
block_quarantine, blockchain_dag, block_clearance, attestation_pool],
../beacon_chain/ssz/merkleization,
../beacon_chain/spec/[crypto, datatypes, digest, validator, state_transition,
helpers, beaconstate, presets, network],
helpers, beaconstate, presets],
# Test utilities
./testutil, ./testblockutil
@ -41,12 +41,12 @@ func combine(tgt: var Attestation, src: Attestation) =
tgt.aggregation_bits.incl(src.aggregation_bits)
var agg {.noInit.}: AggregateSignature
agg.init(tgt.signature)
agg.aggregate(src.signature)
tgt.signature = agg.finish().exportRaw()
agg.init(tgt.signature.load().get())
agg.aggregate(src.signature.load.get())
tgt.signature = agg.finish().toValidatorSig()
func loadSig(a: Attestation): CookedSig =
a.signature.load.get().CookedSig
a.signature.load.get()
proc pruneAtFinalization(dag: ChainDAGRef, attPool: AttestationPool) =
if dag.needStateCachesAndForkChoicePruning():
@ -560,80 +560,3 @@ suiteReport "Attestation pool processing" & preset():
pool[].addForkChoice(epochRef, blckRef, signedBlock.message, blckRef.slot)
doAssert: b10Add_clone.error == (ValidationResult.Ignore, Duplicate)
suiteReport "Attestation validation " & preset():
setup:
# Genesis state that results in 3 members per committee
var
chainDag = init(ChainDAGRef, defaultRuntimePreset, makeTestDB(SLOTS_PER_EPOCH * 3))
quarantine = QuarantineRef.init(keys.newRng())
pool = newClone(AttestationPool.init(chainDag, quarantine))
state = newClone(chainDag.headState)
cache = StateCache()
batchCrypto = BatchCrypto.new(keys.newRng())
# Slot 0 is a finalized slot - won't be making attestations for it..
check:
process_slots(state.data, getStateField(state, slot) + 1, cache)
timedTest "Validation sanity":
# TODO: refactor tests to avoid skipping BLS validation
chainDag.updateFlags.incl {skipBLSValidation}
var
cache: StateCache
for blck in makeTestBlocks(
chainDag.headState.data, chainDag.head.root, cache,
int(SLOTS_PER_EPOCH * 5), false):
let added = chainDag.addRawBlock(quarantine, blck) do (
blckRef: BlockRef, signedBlock: TrustedSignedBeaconBlock,
epochRef: EpochRef, state: HashedBeaconState):
# Callback add to fork choice if valid
pool[].addForkChoice(epochRef, blckRef, signedBlock.message, blckRef.slot)
check: added.isOk()
chainDag.updateHead(added[], quarantine)
pruneAtFinalization(chainDag, pool[])
var
# Create an attestation for slot 1!
beacon_committee = get_beacon_committee(
chainDag.headState.data.data, chainDag.head.slot, 0.CommitteeIndex, cache)
attestation = makeAttestation(
chainDag.headState.data.data, chainDag.head.root, beacon_committee[0], cache)
committees_per_slot =
get_committee_count_per_slot(chainDag.headState.data.data,
attestation.data.slot.epoch, cache)
subnet = compute_subnet_for_attestation(
committees_per_slot,
attestation.data.slot, attestation.data.index.CommitteeIndex)
beaconTime = attestation.data.slot.toBeaconTime()
check:
validateAttestation(pool, batchCrypto, attestation, beaconTime, subnet, true).waitFor().isOk
# Same validator again
validateAttestation(pool, batchCrypto, attestation, beaconTime, subnet, true).waitFor().error()[0] ==
ValidationResult.Ignore
pool[].nextAttestationEpoch.setLen(0) # reset for test
check:
# Wrong subnet
validateAttestation(pool, batchCrypto, attestation, beaconTime, subnet + 1, true).waitFor().isErr
pool[].nextAttestationEpoch.setLen(0) # reset for test
check:
# Too far in the future
validateAttestation(
pool, batchCrypto, attestation, beaconTime - 1.seconds, subnet + 1, true).waitFor().isErr
pool[].nextAttestationEpoch.setLen(0) # reset for test
check:
# Too far in the past
validateAttestation(
pool, batchCrypto, attestation,
beaconTime - (SECONDS_PER_SLOT * SLOTS_PER_EPOCH - 1).int.seconds,
subnet + 1, true).waitFor().isErr

View File

@ -0,0 +1,149 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
# Status lib
unittest2,
chronicles, chronos,
eth/keys,
# Internal
../beacon_chain/[beacon_node_types, extras, beacon_clock],
../beacon_chain/gossip_processing/[gossip_validation, batch_validation],
../beacon_chain/fork_choice/[fork_choice_types, fork_choice],
../beacon_chain/consensus_object_pools/[
block_quarantine, blockchain_dag, block_clearance, attestation_pool],
../beacon_chain/ssz/merkleization,
../beacon_chain/spec/[crypto, datatypes, digest, validator, state_transition,
helpers, presets, network],
# Test utilities
./testutil, ./testblockutil
proc pruneAtFinalization(dag: ChainDAGRef, attPool: AttestationPool) =
if dag.needStateCachesAndForkChoicePruning():
dag.pruneStateCachesDAG()
# pool[].prune() # We test logic without att_1_0 pool / fork choice pruning
suiteReport "Gossip validation " & preset():
setup:
# Genesis state that results in 3 members per committee
var
chainDag = init(ChainDAGRef, defaultRuntimePreset, makeTestDB(SLOTS_PER_EPOCH * 3))
quarantine = QuarantineRef.init(keys.newRng())
pool = newClone(AttestationPool.init(chainDag, quarantine))
state = newClone(chainDag.headState)
cache = StateCache()
batchCrypto = BatchCrypto.new(keys.newRng(), eager = proc(): bool = false)
# Slot 0 is a finalized slot - won't be making attestations for it..
check:
process_slots(state.data, getStateField(state, slot) + 1, cache)
timedTest "Validation sanity":
# TODO: refactor tests to avoid skipping BLS validation
chainDag.updateFlags.incl {skipBLSValidation}
var
cache: StateCache
for blck in makeTestBlocks(
chainDag.headState.data, chainDag.head.root, cache,
int(SLOTS_PER_EPOCH * 5), false):
let added = chainDag.addRawBlock(quarantine, blck) do (
blckRef: BlockRef, signedBlock: TrustedSignedBeaconBlock,
epochRef: EpochRef, state: HashedBeaconState):
# Callback add to fork choice if valid
pool[].addForkChoice(epochRef, blckRef, signedBlock.message, blckRef.slot)
check: added.isOk()
chainDag.updateHead(added[], quarantine)
pruneAtFinalization(chainDag, pool[])
var
# Create attestations for slot 1
beacon_committee = get_beacon_committee(
chainDag.headState.data.data, chainDag.head.slot, 0.CommitteeIndex, cache)
att_1_0 = makeAttestation(
chainDag.headState.data.data, chainDag.head.root, beacon_committee[0], cache)
att_1_1 = makeAttestation(
chainDag.headState.data.data, chainDag.head.root, beacon_committee[1], cache)
committees_per_slot =
get_committee_count_per_slot(chainDag.headState.data.data,
att_1_0.data.slot.epoch, cache)
subnet = compute_subnet_for_attestation(
committees_per_slot,
att_1_0.data.slot, att_1_0.data.index.CommitteeIndex)
beaconTime = att_1_0.data.slot.toBeaconTime()
check:
validateAttestation(pool, batchCrypto, att_1_0, beaconTime, subnet, true).waitFor().isOk
# Same validator again
validateAttestation(pool, batchCrypto, att_1_0, beaconTime, subnet, true).waitFor().error()[0] ==
ValidationResult.Ignore
pool[].nextAttestationEpoch.setLen(0) # reset for test
check:
# Wrong subnet
validateAttestation(pool, batchCrypto, att_1_0, beaconTime, subnet + 1, true).waitFor().isErr
pool[].nextAttestationEpoch.setLen(0) # reset for test
check:
# Too far in the future
validateAttestation(
pool, batchCrypto, att_1_0, beaconTime - 1.seconds, subnet + 1, true).waitFor().isErr
pool[].nextAttestationEpoch.setLen(0) # reset for test
check:
# Too far in the past
validateAttestation(
pool, batchCrypto, att_1_0,
beaconTime - (SECONDS_PER_SLOT * SLOTS_PER_EPOCH - 1).int.seconds,
subnet + 1, true).waitFor().isErr
block:
var broken = att_1_0
broken.signature.blob[0] += 1
pool[].nextAttestationEpoch.setLen(0) # reset for test
check:
# Invalid signature
validateAttestation(
pool, batchCrypto, broken, beaconTime, subnet, true).waitFor().
error()[0] == ValidationResult.Reject
block:
var broken = att_1_0
broken.signature.blob[5] += 1
pool[].nextAttestationEpoch.setLen(0) # reset for test
# One invalid, one valid (batched)
let
fut_1_0 = validateAttestation(
pool, batchCrypto, broken, beaconTime, subnet, true)
fut_1_1 = validateAttestation(
pool, batchCrypto, att_1_1, beaconTime, subnet, true)
check:
fut_1_0.waitFor().error()[0] == ValidationResult.Reject
fut_1_1.waitFor().isOk()
block:
var broken = att_1_0
# This shouldn't deserialize, which is a different way to break it
broken.signature.blob = default(type broken.signature.blob)
pool[].nextAttestationEpoch.setLen(0) # reset for test
# One invalid, one valid (batched)
let
fut_1_0 = validateAttestation(
pool, batchCrypto, broken, beaconTime, subnet, true)
fut_1_1 = validateAttestation(
pool, batchCrypto, att_1_1, beaconTime, subnet, true)
check:
fut_1_0.waitFor().error()[0] == ValidationResult.Reject
fut_1_1.waitFor().isOk()

View File

@ -136,7 +136,7 @@ suiteReport "Interop":
check:
# TODO re-enable
true or dep.sig == computed_sig
true or dep.sig == computed_sig.toValidatorSig()
timedTest "Interop genesis":
# Check against https://github.com/protolambda/zcli:

View File

@ -51,7 +51,7 @@ func makeDeposit*(i: int, flags: UpdateFlags = {}): DepositData =
if skipBLSValidation notin flags:
result.signature = get_deposit_signature(
defaultRuntimePreset, result, privkey)
defaultRuntimePreset, result, privkey).toValidatorSig()
proc makeInitialDeposits*(
n = SLOTS_PER_EPOCH, flags: UpdateFlags = {}): seq[DepositData] =
@ -68,7 +68,7 @@ func signBlock(
signature:
if skipBlsValidation notin flags:
get_block_signature(
fork, genesis_validators_root, blck.slot, root, privKey)
fork, genesis_validators_root, blck.slot, root, privKey).toValidatorSig()
else:
ValidatorSig()
)
@ -93,7 +93,8 @@ proc addTestBlock*(
randao_reveal =
if skipBlsValidation notin flags:
privKey.genRandaoReveal(
state.data.fork, state.data.genesis_validators_root, state.data.slot)
state.data.fork, state.data.genesis_validators_root, state.data.slot).
toValidatorSig()
else:
ValidatorSig()
@ -167,7 +168,7 @@ proc makeAttestation*(
sig =
if skipBLSValidation notin flags:
get_attestation_signature(state.fork, state.genesis_validators_root,
data, hackPrivKey(validator))
data, hackPrivKey(validator)).toValidatorSig()
else:
ValidatorSig()
@ -219,13 +220,12 @@ proc makeFullAttestations*(
# Initial attestation
var attestation = Attestation(
aggregation_bits: CommitteeValidatorsBits.init(committee.len),
data: data,
signature: get_attestation_signature(
state.fork, state.genesis_validators_root, data,
hackPrivKey(state.validators[committee[0]]))
)
data: data)
var agg {.noInit.}: AggregateSignature
agg.init(attestation.signature)
agg.init(get_attestation_signature(
state.fork, state.genesis_validators_root, data,
hackPrivKey(state.validators[committee[0]])))
# Aggregate the remainder
attestation.aggregation_bits.setBit 0
@ -237,7 +237,7 @@ proc makeFullAttestations*(
hackPrivKey(state.validators[committee[j]])
))
attestation.signature = agg.finish().exportRaw()
attestation.signature = agg.finish().toValidatorSig()
result.add attestation
iterator makeTestBlocks*(