Batch attestations (#2439)

* batch attestations

* Fixes (but still need to investigate the chronos 0 .. 4095 crash, similar to https://github.com/status-im/nimbus-eth2/issues/1518)

* Try to remove the processing loop to no avail :/

* batch aggregates

* use resultsBuffer size for triggering deadline schedule

* pass attestation pool tests

* Introduce async gossip validators. May fix the 4096 bug (reentrancy issue?) (similar to sync unknown blocks #1518)

* Put logging at debug level, add speed info

* remove unnecessary batch info when the batch is known to contain a single item

* downgrade some logs to trace level

* better comments [skip ci]

* Address most review comments

* only use ref for async proc

* fix exceptions in eth2_network

* update async exceptions in gossip_validation

* eth2_network 2nd pass

* change to sleepAsync

* Update beacon_chain/gossip_processing/batch_validation.nim

Co-authored-by: Jacek Sieka <jacek@status.im>

Author: Mamy Ratsimbazafy, 2021-04-02 16:36:43 +02:00 (committed by GitHub)
parent f821bc878e
commit 6b13cdce36
10 changed files with 639 additions and 81 deletions


@ -0,0 +1,282 @@
# beacon_chain
# Copyright (c) 2019-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
# Status
chronicles, chronos,
stew/results,
eth/keys,
# Internals
../spec/[
datatypes, crypto, digest, helpers, signatures_batch],
../consensus_object_pools/[
blockchain_dag, block_quarantine,
attestation_pool, exit_pool,
block_pools_types, spec_cache
],
".."/[beacon_node_types, ssz, beacon_clock]
export BrHmacDrbgContext
logScope:
topics = "gossip_checks"
# Batched gossip validation
# ----------------------------------------------------------------
{.push raises: [Defect].}
type
BatchCrypto* = object
# The buffers are bounded by BatchedCryptoSize (16) which was chosen:
# - based on "nimble bench" in nim-blscurve
# so that low power devices like Raspberry Pi 4 can process
# that many batched verifications within 20ms
# - based on the accumulation rate of attestations and aggregates
# in large instances, which was about 12000 per slot (12s),
# hence roughly 1 per ms (but the pattern is bursty around the 4s mark)
pendingBuffer: seq[SignatureSet]
resultsBuffer: seq[Future[Result[void, cstring]]]
sigVerifCache: BatchedBLSVerifierCache ##\
## A cache for batch BLS signature verification contexts
rng: ref BrHmacDrbgContext ##\
## A reference to the Nimbus application-wide RNG
const
# We cap waiting for an idle slot in case there's a lot of network traffic
# taking up all CPU - we don't want to _completely_ stop processing blocks
# in this case (attestations will get dropped) - doing so also allows us
# to benefit from more batching / larger network reads when under load.
BatchAttAccumTime = 10.milliseconds
# Threshold for immediate trigger of batch verification.
# A balance between throughput and worst case latency.
# At least 6 so that the constant factors
# (RNG for blinding and Final Exponentiation)
# are amortized,
# but not too big, as we need to redo checks one-by-one if one of them fails.
BatchedCryptoSize = 16
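# Taken together, batching adds at most roughly BatchAttAccumTime (10ms) of
# accumulation delay plus the verification time itself (about 20ms for a full
# batch on a Raspberry Pi 4, per the sizing note above) before a gossip
# verdict is returned.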
proc new*(T: type BatchCrypto, rng: ref BrHmacDrbgContext): ref BatchCrypto =
(ref BatchCrypto)(rng: rng)
func clear(batchCrypto: var BatchCrypto) =
## Empty the crypto-pending attestations & aggregate queues
batchCrypto.pendingBuffer.setLen(0)
batchCrypto.resultsBuffer.setLen(0)
proc done(batchCrypto: var BatchCrypto, idx: int) =
## Send signal to [Attestation/Aggregate]Validator
## that the attestation was crypto-verified (and so gossip validated)
## with success
batchCrypto.resultsBuffer[idx].complete(Result[void, cstring].ok())
proc fail(batchCrypto: var BatchCrypto, idx: int, error: cstring) =
## Send signal to [Attestation/Aggregate]Validator
## that the attestation was NOT crypto-verified (and so NOT gossip validated)
batchCrypto.resultsBuffer[idx].complete(Result[void, cstring].err(error))
proc complete(batchCrypto: var BatchCrypto, idx: int, res: Result[void, cstring]) =
## Send signal to [Attestation/Aggregate]Validator
batchCrypto.resultsBuffer[idx].complete(res)
proc processBufferedCrypto(self: var BatchCrypto) =
## Drain all attestations waiting for crypto verifications
doAssert self.pendingBuffer.len ==
self.resultsBuffer.len
if self.pendingBuffer.len == 0:
return
trace "batch crypto - starting",
batchSize = self.pendingBuffer.len
let startTime = Moment.now()
var secureRandomBytes: array[32, byte]
self.rng[].brHmacDrbgGenerate(secureRandomBytes)
# TODO: For now only enable serial batch verification
let ok = batchVerifySerial(
self.sigVerifCache,
self.pendingBuffer,
secureRandomBytes)
let stopTime = Moment.now()
debug "batch crypto - finished",
batchSize = self.pendingBuffer.len,
cryptoVerified = ok,
dur = stopTime - startTime
if ok:
for i in 0 ..< self.resultsBuffer.len:
self.done(i)
else:
debug "batch crypto - failure, falling back",
batchSize = self.pendingBuffer.len
for i in 0 ..< self.pendingBuffer.len:
let ok = blsVerify self.pendingBuffer[i]
if ok:
self.done(i)
else:
self.fail(i, "batch crypto verification: invalid signature")
self.clear()
proc deferCryptoProcessing(self: ref BatchCrypto, idleTimeout: Duration) {.async.} =
## Process pending crypto check:
## - if time threshold is reached
## - or if networking is idle
# TODO: how do we cancel the scheduled `deferCryptoProcessing(BatchAttAccumTime)`
# when the buffer size threshold is reached?
# In practice this only happens when we receive a burst of attestations/aggregates.
# Though it's possible to reach the threshold 9ms in,
# and have only 1ms left for further accumulation.
await sleepAsync(idleTimeout)
self[].processBufferedCrypto()
proc schedule(batchCrypto: ref BatchCrypto, fut: Future[Result[void, cstring]], checkThreshold = true) =
## Schedule a cryptocheck for processing
##
## The buffer is processed:
## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize)
## - when there are no network events (idleAsync)
## - otherwise after 10ms (BatchAttAccumTime)
# Note: use the resultsBuffer size to detect the first item,
# as pendingBuffer grows three entries at a time for aggregates
batchCrypto.resultsBuffer.add fut
if batchCrypto.resultsBuffer.len == 1:
# First attestation to be scheduled in the batch
# wait for an idle time or up to 10ms before processing
trace "batch crypto - scheduling next",
deadline = BatchAttAccumTime
asyncSpawn batchCrypto.deferCryptoProcessing(BatchAttAccumTime)
elif checkThreshold and
batchCrypto.resultsBuffer.len >= BatchedCryptoSize:
# Reached the max buffer size, process immediately
# TODO: how to cancel the scheduled `deferCryptoProcessing(BatchAttAccumTime)` ?
batchCrypto[].processBufferedCrypto()
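# Example timeline: the first attestation scheduled into an empty batch arms
# a deadline BatchAttAccumTime (10ms) away. If BatchedCryptoSize (16) entries
# accumulate earlier, say at the 6ms mark, the batch is verified immediately;
# otherwise it is verified when the deadline fires. The earlier deadline is
# not cancelled (see the TODOs above), so it may still fire into the next batch.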
proc scheduleAttestationCheck*(
batchCrypto: ref BatchCrypto,
fork: Fork, genesis_validators_root: Eth2Digest,
epochRef: EpochRef,
attestation: Attestation
): Option[Future[Result[void, cstring]]] =
## Schedule crypto verification of an attestation
##
## The buffer is processed:
## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize)
## - when there are no network events (idleAsync)
## - otherwise after 10ms (BatchAttAccumTime)
##
## This returns None if crypto sanity checks failed
## and a future with the deferred attestation check otherwise.
doAssert batchCrypto.pendingBuffer.len < BatchedCryptoSize
let sanity = batchCrypto
.pendingBuffer
.addAttestation(
fork, genesis_validators_root, epochRef,
attestation
)
if not sanity:
return none(Future[Result[void, cstring]])
let fut = newFuture[Result[void, cstring]](
"batch_validation.scheduleAttestationCheck"
)
batchCrypto.schedule(fut)
return some(fut)
proc scheduleAggregateChecks*(
batchCrypto: ref BatchCrypto,
fork: Fork, genesis_validators_root: Eth2Digest,
epochRef: EpochRef,
signedAggregateAndProof: SignedAggregateAndProof
): Option[tuple[slotCheck, aggregatorCheck, aggregateCheck: Future[Result[void, cstring]]]] =
## Schedule crypto verification of an aggregate
##
## This involves 3 checks:
## - verify_slot_signature
## - verify_aggregate_and_proof_signature
## - is_valid_indexed_attestation
##
## The buffer is processed:
## - when 16 or more attestations/aggregates are buffered (BatchedCryptoSize)
## - when there are no network events (idleAsync)
## - otherwise after 10ms (BatchAttAccumTime)
##
## This returns None if crypto sanity checks failed
## and 3 futures with the deferred aggregate checks otherwise.
doAssert batchCrypto.pendingBuffer.len < BatchedCryptoSize
template aggregate_and_proof: untyped = signedAggregateAndProof.message
template aggregate: untyped = aggregate_and_proof.aggregate
type R = tuple[slotCheck, aggregatorCheck, aggregateCheck: Future[Result[void, cstring]]]
# Enqueue in the buffer
# ------------------------------------------------------
let aggregator = epochRef.validator_keys[aggregate_and_proof.aggregator_index]
block:
let sanity = batchCrypto
.pendingBuffer
.addSlotSignature(
fork, genesis_validators_root,
aggregate.data.slot,
aggregator,
aggregate_and_proof.selection_proof
)
if not sanity:
return none(R)
block:
let sanity = batchCrypto
.pendingBuffer
.addAggregateAndProofSignature(
fork, genesis_validators_root,
aggregate_and_proof,
aggregator,
signed_aggregate_and_proof.signature
)
if not sanity:
return none(R)
block:
let sanity = batchCrypto
.pendingBuffer
.addAttestation(
fork, genesis_validators_root, epochRef,
aggregate
)
if not sanity:
return none(R)
let futSlot = newFuture[Result[void, cstring]](
"batch_validation.scheduleAggregateChecks.slotCheck"
)
let futAggregator = newFuture[Result[void, cstring]](
"batch_validation.scheduleAggregateChecks.aggregatorCheck"
)
let futAggregate = newFuture[Result[void, cstring]](
"batch_validation.scheduleAggregateChecks.aggregateCheck"
)
batchCrypto.schedule(futSlot, checkThreshold = false)
batchCrypto.schedule(futAggregator, checkThreshold = false)
batchCrypto.schedule(futAggregate)
return some((futSlot, futAggregator, futAggregate))
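
For reference, the caller side of this API boils down to scheduling a check and then awaiting the batched verdict. The snippet below is condensed from the gossip_validation.nim changes further down and assumes the fork, genesis_validators_root, epochRef and attestation values those validators already have in scope:

let deferredCrypto = batchCrypto.scheduleAttestationCheck(
  fork, genesis_validators_root, epochRef, attestation)
if deferredCrypto.isNone():
  return err((ValidationResult.Reject,
    cstring("validateAttestation: crypto sanity checks failure")))
# Await the batched crypto check
let cryptoChecked = await deferredCrypto.get()
if cryptoChecked.isErr():
  return err((ValidationResult.Reject, cryptoChecked.error))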


@ -14,6 +14,7 @@ import
../spec/[crypto, datatypes, digest],
../consensus_object_pools/[block_clearance, blockchain_dag, exit_pool, attestation_pool],
./gossip_validation, ./gossip_to_consensus,
./batch_validation,
../validators/validator_pool,
../beacon_node_types,
../beacon_clock, ../conf, ../ssz/sszdump
@ -70,6 +71,10 @@ type
# ----------------------------------------------------------------
exitPool: ref ExitPool
# Almost validated, pending cryptographic signature check
# ----------------------------------------------------------------
batchCrypto: ref BatchCrypto
# Missing information
# ----------------------------------------------------------------
quarantine*: QuarantineRef
@ -85,6 +90,7 @@ proc new*(T: type Eth2Processor,
exitPool: ref ExitPool,
validatorPool: ref ValidatorPool,
quarantine: QuarantineRef,
rng: ref BrHmacDrbgContext,
getWallTime: GetWallTimeFn): ref Eth2Processor =
(ref Eth2Processor)(
config: config,
@ -94,7 +100,8 @@ proc new*(T: type Eth2Processor,
attestationPool: attestationPool,
exitPool: exitPool,
validatorPool: validatorPool,
quarantine: quarantine
quarantine: quarantine,
batchCrypto: BatchCrypto.new(rng = rng)
)
# Gossip Management
@ -179,11 +186,13 @@ proc checkForPotentialDoppelganger(
warn "We believe you are currently running another instance of the same validator. We've disconnected you from the network as this presents a significant slashing risk. Possible next steps are (a) making sure you've disconnected your validator from your old machine before restarting the client; and (b) running the client again with the gossip-slashing-protection option disabled, only if you are absolutely sure this is the only instance of your validator running, and reporting the issue at https://github.com/status-im/nimbus-eth2/issues."
quit QuitFailure
{.pop.} # async can raise anything
proc attestationValidator*(
self: var Eth2Processor,
self: ref Eth2Processor,
attestation: Attestation,
committeeIndex: uint64,
checksExpensive: bool = true): ValidationResult =
checksExpensive: bool = true): Future[ValidationResult] {.async.} =
logScope:
attestation = shortLog(attestation)
committeeIndex
@ -201,7 +210,10 @@ proc attestationValidator*(
# Potential under/overflows are fine; would just create odd metrics and logs
let delay = wallTime - attestation.data.slot.toBeaconTime
debug "Attestation received", delay
let v = self.attestationPool[].validateAttestation(
# Now proceed to validation
let v = await self.attestationPool.validateAttestation(
self.batchCrypto,
attestation, wallTime, committeeIndex, checksExpensive)
if v.isErr():
debug "Dropping attestation", err = v.error()
@ -210,16 +222,16 @@ proc attestationValidator*(
beacon_attestations_received.inc()
beacon_attestation_delay.observe(delay.toFloatSeconds())
self.checkForPotentialDoppelganger(attestation.data, v.value, wallSlot)
self[].checkForPotentialDoppelganger(attestation.data, v.value, wallSlot)
trace "Attestation validated"
self.verifQueues[].addAttestation(attestation, v.get())
ValidationResult.Accept
return ValidationResult.Accept
proc aggregateValidator*(
self: var Eth2Processor,
signedAggregateAndProof: SignedAggregateAndProof): ValidationResult =
self: ref Eth2Processor,
signedAggregateAndProof: SignedAggregateAndProof): Future[ValidationResult] {.async.} =
logScope:
aggregate = shortLog(signedAggregateAndProof.message.aggregate)
signature = shortLog(signedAggregateAndProof.signature)
@ -239,7 +251,8 @@ proc aggregateValidator*(
wallTime - signedAggregateAndProof.message.aggregate.data.slot.toBeaconTime
debug "Aggregate received", delay
let v = self.attestationPool[].validateAggregate(
let v = await self.attestationPool.validateAggregate(
self.batchCrypto,
signedAggregateAndProof, wallTime)
if v.isErr:
debug "Dropping aggregate",
@ -252,7 +265,7 @@ proc aggregateValidator*(
beacon_aggregates_received.inc()
beacon_aggregate_delay.observe(delay.toFloatSeconds())
self.checkForPotentialDoppelganger(
self[].checkForPotentialDoppelganger(
signedAggregateAndProof.message.aggregate.data, v.value, wallSlot)
trace "Aggregate validated",
@ -262,7 +275,9 @@ proc aggregateValidator*(
self.verifQueues[].addAggregate(signedAggregateAndProof, v.get())
ValidationResult.Accept
return ValidationResult.Accept
{.push raises: [Defect].}
proc attesterSlashingValidator*(
self: var Eth2Processor, attesterSlashing: AttesterSlashing):


@ -5,8 +5,6 @@
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
std/math,
stew/results,
@ -37,7 +35,7 @@ type
v: Attestation
attesting_indices: seq[ValidatorIndex]
AggregateEntry* = AttestationEntry
AggregateEntry = AttestationEntry
VerifQueueManager* = object
## This manages the queues of blocks and attestations.
@ -70,6 +68,9 @@ type
# Producers
# ----------------------------------------------------------------
blocksQueue*: AsyncQueue[BlockEntry] # Exported for "test_sync_manager"
# TODO:
# is there a point to separate
# attestations & aggregates here?
attestationsQueue: AsyncQueue[AttestationEntry]
aggregatesQueue: AsyncQueue[AggregateEntry]
@ -78,6 +79,8 @@ type
consensusManager: ref ConsensusManager
## Blockchain DAG, AttestationPool and Quarantine
{.push raises: [Defect].}
# Initialization
# ------------------------------------------------------------------------------
@ -331,6 +334,10 @@ proc runQueueProcessingLoop*(self: ref VerifQueueManager) {.async.} =
aggregateFut = self[].aggregatesQueue.popFirst()
attestationFut = self[].attestationsQueue.popFirst()
# TODO:
# revisit `idleTimeout`
# and especially `attestationBatch` in light of batch validation
# in particular we might want `attestationBatch` to drain both attestation & aggregates
while true:
# Cooperative concurrency: one idle calculation step per loop - because
# we run both networking and CPU-heavy things like block processing


@ -8,9 +8,12 @@
{.push raises: [Defect].}
import
# Standard library
std/[sequtils, intsets, deques],
chronicles,
# Status
chronicles, chronos,
stew/results,
# Internals
../spec/[
beaconstate, state_transition_block,
datatypes, crypto, digest, helpers, network, signatures],
@ -20,11 +23,15 @@ import
],
".."/[beacon_node_types, ssz, beacon_clock],
../validators/attestation_aggregation,
../extras
../extras,
./batch_validation
logScope:
topics = "gossip_checks"
# Internal checks
# ----------------------------------------------------------------
func check_attestation_block(
pool: AttestationPool, attestationSlot: Slot, blck: BlockRef):
Result[void, (ValidationResult, cstring)] =
@ -148,12 +155,18 @@ func check_attestation_subnet(
ok()
# Gossip Validation
# ----------------------------------------------------------------
{.pop.} # async can raise anything
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id
proc validateAttestation*(
pool: var AttestationPool,
attestation: Attestation, wallTime: BeaconTime,
pool: ref AttestationPool,
batchCrypto: ref BatchCrypto,
attestation: Attestation,
wallTime: BeaconTime,
topicCommitteeIndex: uint64, checksExpensive: bool):
Result[seq[ValidatorIndex], (ValidationResult, cstring)] =
Future[Result[seq[ValidatorIndex], (ValidationResult, cstring)]] {.async.} =
# Some of the checks below have been reordered compared to the spec, to
# perform the cheap checks first - in particular, we want to avoid loading
# an `EpochRef` and checking signatures. This reordering might lead to
@ -172,12 +185,18 @@ proc validateAttestation*(
# attestation.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot
# >= attestation.data.slot (a client MAY queue future attestations for
# processing at the appropriate slot).
? check_propagation_slot_range(attestation.data, wallTime) # [IGNORE]
block:
let v = check_propagation_slot_range(attestation.data, wallTime) # [IGNORE]
if v.isErr():
return err(v.error)
# The attestation is unaggregated -- that is, it has exactly one
# participating validator (len([bit for bit in attestation.aggregation_bits
# if bit == 0b1]) == 1).
? check_aggregation_count(attestation, singular = true) # [REJECT]
block:
let v = check_aggregation_count(attestation, singular = true) # [REJECT]
if v.isErr():
return err(v.error)
# The block being voted for (attestation.data.beacon_block_root) has been seen
# (via both gossip and non-gossip sources) (a client MAY queue attestations for
@ -185,7 +204,11 @@ proc validateAttestation*(
# The block being voted for (attestation.data.beacon_block_root) passes
# validation.
# [IGNORE] if block is unseen so far and enqueue it in missing blocks
let target = ? check_beacon_and_target_block(pool, attestation.data) # [IGNORE/REJECT]
let target = block:
let v = check_beacon_and_target_block(pool[], attestation.data) # [IGNORE/REJECT]
if v.isErr():
return err(v.error)
v.get()
# The following rule follows implicitly from that we clear out any
# unviable blocks from the chain dag:
@ -210,7 +233,10 @@ proc validateAttestation*(
# committees_per_slot = get_committee_count_per_slot(state,
# attestation.data.target.epoch), which may be pre-computed along with the
# committee information for the signature check.
? check_attestation_subnet(epochRef, attestation, topicCommitteeIndex)
block:
let v = check_attestation_subnet(epochRef, attestation, topicCommitteeIndex) # [REJECT]
if v.isErr():
return err(v.error)
# [REJECT] The number of aggregation bits matches the committee size -- i.e.
# len(attestation.aggregation_bits) == len(get_beacon_committee(state,
@ -252,12 +278,29 @@ proc validateAttestation*(
# The signature of attestation is valid.
block:
# First pass - without cryptography
let v = is_valid_indexed_attestation(
fork, genesis_validators_root, epochRef, attesting_indices,
attestation, {})
attestation,
{skipBLSValidation})
if v.isErr():
return err((ValidationResult.Reject, v.error))
# Buffer crypto checks
let deferredCrypto = batchCrypto
.scheduleAttestationCheck(
fork, genesis_validators_root, epochRef,
attestation
)
if deferredCrypto.isNone():
return err((ValidationResult.Reject,
cstring("validateAttestation: crypto sanity checks failure")))
# Await the crypto check
let cryptoChecked = await deferredCrypto.get()
if cryptoChecked.isErr():
return err((ValidationResult.Reject, cryptoChecked.error))
# Only valid attestations go in the list, which keeps validator_index
# in range
if not (pool.nextAttestationEpoch.lenu64 > validator_index.uint64):
@ -265,13 +308,15 @@ proc validateAttestation*(
pool.nextAttestationEpoch[validator_index].subnet =
attestation.data.target.epoch + 1
ok(attesting_indices)
return ok(attesting_indices)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_aggregate_and_proof
proc validateAggregate*(
pool: var AttestationPool,
signedAggregateAndProof: SignedAggregateAndProof, wallTime: BeaconTime):
Result[seq[ValidatorIndex], (ValidationResult, cstring)] =
pool: ref AttestationPool,
batchCrypto: ref BatchCrypto,
signedAggregateAndProof: SignedAggregateAndProof,
wallTime: BeaconTime):
Future[Result[seq[ValidatorIndex], (ValidationResult, cstring)]] {.async.} =
# Some of the checks below have been reordered compared to the spec, to
# perform the cheap checks first - in particular, we want to avoid loading
# an `EpochRef` and checking signatures. This reordering might lead to
@ -291,7 +336,10 @@ proc validateAggregate*(
# ATTESTATION_PROPAGATION_SLOT_RANGE slots (with a
# MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. aggregate.data.slot +
# ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= aggregate.data.slot
? check_propagation_slot_range(aggregate.data, wallTime) # [IGNORE]
block:
let v = check_propagation_slot_range(aggregate.data, wallTime) # [IGNORE]
if v.isErr():
return err(v.error)
# [IGNORE] The valid aggregate attestation defined by
# hash_tree_root(aggregate) has not already been seen (via aggregate gossip,
@ -327,12 +375,19 @@ proc validateAggregate*(
# members, i.e. they counts don't match.
# But (2) would reflect an invalid aggregation in other ways, so reject it
# either way.
? check_aggregation_count(aggregate, singular = false)
block:
let v = check_aggregation_count(aggregate, singular = false) # [REJECT]
if v.isErr():
return err(v.error)
# [REJECT] The block being voted for (aggregate.data.beacon_block_root)
# passes validation.
# [IGNORE] if block is unseen so far and enqueue it in missing blocks
let target = ? check_beacon_and_target_block(pool, aggregate.data)
let target = block:
let v = check_beacon_and_target_block(pool[], aggregate.data) # [IGNORE/REJECT]
if v.isErr():
return err(v.error)
v.get()
# [REJECT] aggregate_and_proof.selection_proof selects the validator as an
# aggregator for the slot -- i.e. is_aggregator(state, aggregate.data.slot,
@ -354,42 +409,47 @@ proc validateAggregate*(
return err((ValidationResult.Reject, cstring(
"Aggregator's validator index not in committee")))
# [REJECT] The aggregate_and_proof.selection_proof is a valid signature of the
# aggregate.data.slot by the validator with index
# aggregate_and_proof.aggregator_index.
# get_slot_signature(state, aggregate.data.slot, privkey)
if aggregate_and_proof.aggregator_index >= epochRef.validator_keys.lenu64:
return err((ValidationResult.Reject, cstring("Invalid aggregator_index")))
let
fork = pool.chainDag.headState.data.data.fork
genesis_validators_root =
pool.chainDag.headState.data.data.genesis_validators_root
if not verify_slot_signature(
fork, genesis_validators_root, aggregate.data.slot,
epochRef.validator_keys[aggregate_and_proof.aggregator_index],
aggregate_and_proof.selection_proof):
return err((ValidationResult.Reject, cstring(
"Selection_proof signature verification failed")))
# [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
if not verify_aggregate_and_proof_signature(
fork, genesis_validators_root, aggregate_and_proof,
epochRef.validator_keys[aggregate_and_proof.aggregator_index],
signed_aggregate_and_proof.signature):
return err((ValidationResult.Reject, cstring(
"signed_aggregate_and_proof signature verification failed")))
let attesting_indices = get_attesting_indices(
epochRef, aggregate.data, aggregate.aggregation_bits)
# [REJECT] The signature of aggregate is valid.
block:
let v = is_valid_indexed_attestation(
fork, genesis_validators_root, epochRef, attesting_indices,
aggregate, {})
if v.isErr():
return err((ValidationResult.Reject, v.error))
# 1. [REJECT] The aggregate_and_proof.selection_proof is a valid signature of the
# aggregate.data.slot by the validator with index
# aggregate_and_proof.aggregator_index.
# get_slot_signature(state, aggregate.data.slot, privkey)
# 2. [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
# 3. [REJECT] The signature of aggregate is valid.
if aggregate_and_proof.aggregator_index >= epochRef.validator_keys.lenu64:
return err((ValidationResult.Reject, cstring("Invalid aggregator_index")))
let
fork = pool.chainDag.headState.data.data.fork
genesis_validators_root =
pool.chainDag.headState.data.data.genesis_validators_root
let deferredCrypto = batchCrypto
.scheduleAggregateChecks(
fork, genesis_validators_root, epochRef,
signed_aggregate_and_proof
)
if deferredCrypto.isNone():
return err((ValidationResult.Reject,
cstring("validateAttestation: crypto sanity checks failure")))
# [REJECT] aggregate_and_proof.selection_proof
let slotChecked = await deferredCrypto.get().slotCheck
if slotChecked.isErr():
return err((ValidationResult.Reject, cstring(
"Selection_proof signature verification failed")))
# [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
let aggregatorChecked = await deferredCrypto.get().aggregatorCheck
if aggregatorChecked.isErr():
return err((ValidationResult.Reject, cstring(
"signed_aggregate_and_proof aggregator signature verification failed")))
# [REJECT] The signature of aggregate is valid.
let aggregateChecked = await deferredCrypto.get().aggregateCheck
if aggregateChecked.isErr():
return err((ValidationResult.Reject, cstring(
"signed_aggregate_and_proof aggregate attester signatures verification failed")))
# The following rule follows implicitly from that we clear out any
# unviable blocks from the chain dag:
@ -407,7 +467,12 @@ proc validateAggregate*(
pool.nextAttestationEpoch[aggregate_and_proof.aggregator_index].aggregate =
aggregate.data.target.epoch + 1
ok(attesting_indices)
let attesting_indices = get_attesting_indices(
epochRef, aggregate.data, aggregate.aggregation_bits)
return ok(attesting_indices)
{.push raises: [Defect].}
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_block
proc isValidBeaconBlock*(


@ -1704,6 +1704,40 @@ proc addValidator*[MsgType](node: Eth2Node,
node.pubsub.addValidator(topic & "_snappy", execValidator)
except Exception as exc: raiseAssert exc.msg # TODO fix libp2p
proc addAsyncValidator*[MsgType](node: Eth2Node,
topic: string,
msgValidator: proc(msg: MsgType):
Future[ValidationResult] {.gcsafe, raises: [Defect].} ) =
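## Register a gossip validator whose verdict is produced asynchronously:
## msgValidator returns a Future[ValidationResult], which lets attestation and
## aggregate validation await the batched signature check before the result
## is handed back to libp2p.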
proc execValidator(
topic: string, message: GossipMsg): Future[ValidationResult] {.raises: [Defect].} =
inc nbc_gossip_messages_received
trace "Validating incoming gossip message",
len = message.data.len, topic
let decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
if decompressed.len == 0:
debug "Empty gossip data after decompression",
topic, len = message.data.len
result.complete(ValidationResult.Ignore)
return
let decoded = try:
SSZ.decode(decompressed, MsgType)
except:
error "SSZ decoding failure",
topic, len = message.data.len
result.complete(ValidationResult.Ignore)
return
return msgValidator(decoded)
try:
node.pubsub.addValidator(topic & "_snappy", execValidator)
except Exception as exc: raiseAssert exc.msg # TODO fix libp2p
proc unsubscribe*(node: Eth2Node, topic: string) {.raises: [Defect, CatchableError].} =
try:
node.pubsub.unsubscribeAll(topic & "_snappy")


@ -339,6 +339,7 @@ proc init*(T: type BeaconNode,
verifQueues,
chainDag, attestationPool, exitPool, validatorPool,
quarantine,
rng,
proc(): BeaconTime = beaconClock.now())
var res = BeaconNode(
@ -1167,16 +1168,16 @@ proc installMessageValidators(node: BeaconNode) =
for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64:
closureScope:
let ci = it
node.network.addValidator(
node.network.addAsyncValidator(
getAttestationTopic(node.forkDigest, ci),
# This proc needs to be within closureScope; don't lift out of loop.
proc(attestation: Attestation): ValidationResult =
node.processor[].attestationValidator(attestation, ci))
proc(attestation: Attestation): Future[ValidationResult] =
node.processor.attestationValidator(attestation, ci))
node.network.addValidator(
node.network.addAsyncValidator(
getAggregateAndProofsTopic(node.forkDigest),
proc(signedAggregateAndProof: SignedAggregateAndProof): ValidationResult =
node.processor[].aggregateValidator(signedAggregateAndProof))
proc(signedAggregateAndProof: SignedAggregateAndProof): Future[ValidationResult] =
node.processor.aggregateValidator(signedAggregateAndProof))
node.network.addValidator(
node.topicBeaconBlocks,


@ -166,6 +166,16 @@ proc blsVerify*(
parsedKey.isSome() and
parsedKey.get.verify(message, parsedSig.get())
proc blsVerify*(sigSet: SignatureSet): bool =
## Unbatched verification of a single SignatureSet, i.e.
## tuple[pubkey: blscurve.PublicKey, message: array[32, byte], signature: blscurve.Signature]
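## Used as the fallback when a batched verification fails and each signature
## set must be re-checked individually (see processBufferedCrypto in
## batch_validation.nim).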
verify(
sigSet.pubkey,
sigSet.message,
sigSet.signature
)
func blsSign*(privkey: ValidatorPrivKey, message: openArray[byte]): ValidatorSig =
## Computes a signature from a secret key and a message
let sig = SecretKey(privkey).sign(message)


@ -89,11 +89,35 @@ proc aggregateAttesters(
aggPK.finish(attestersAgg)
return true
proc aggregateAttesters(
aggPK: var blscurve.PublicKey,
attestation: IndexedAttestation,
epochRef: auto
): bool =
mixin validator_keys
doAssert attestation.attesting_indices.len > 0
var attestersAgg{.noInit.}: AggregatePublicKey
attestersAgg.init(epochRef.validator_keys[attestation.attesting_indices[0]]
.pubkey.loadWithCacheOrExitFalse())
for i in 1 ..< attestation.attesting_indices.len:
attestersAgg.aggregate(epochRef.validator_keys[attestation.attesting_indices[i]]
.pubkey.loadWithCacheOrExitFalse())
aggPK.finish(attestersAgg)
return true
proc addIndexedAttestation(
sigs: var seq[SignatureSet],
attestation: IndexedAttestation,
state: BeaconState
): bool =
## Add an indexed attestation for batched BLS verification purposes.
## This only verifies the cryptography; it does not check, for example,
## that the indices are sorted and unique.
##
## Returns true if the indexed attestation was added to the batching buffer.
## Returns false if sanity checks failed (non-empty, keys are valid).
if attestation.attesting_indices.len == 0:
# Aggregation spec requires non-empty collection
# - https://tools.ietf.org/html/draft-irtf-cfrg-bls-signature-04
@ -156,6 +180,125 @@ proc addAttestation(
return false
return true
# Public API
# ------------------------------------------------------
proc addAttestation*(
sigs: var seq[SignatureSet],
fork: Fork, genesis_validators_root: Eth2Digest,
epochRef: auto,
attestation: Attestation
): bool =
## Add an attestation for batched BLS verification purposes.
## This only verifies the cryptography.
##
## Returns true if the attestation was added to the batching buffer.
## Returns false if sanity checks failed (non-empty, keys are valid);
## in that case the seq[SignatureSet] is unmodified.
mixin get_attesting_indices, validator_keys, pubkey
result = false
var attestersAgg{.noInit.}: AggregatePublicKey
for valIndex in epochRef.get_attesting_indices(
attestation.data,
attestation.aggregation_bits):
if not result: # first iteration
attestersAgg.init(epochRef.validator_keys[valIndex]
.loadWithCacheOrExitFalse())
result = true
else:
attestersAgg.aggregate(epochRef.validator_keys[valIndex]
.loadWithCacheOrExitFalse())
if not result:
# There were no attesters
return false
var attesters{.noinit.}: blscurve.PublicKey
attesters.finish(attestersAgg)
return sigs.addSignatureSet(
attesters,
attestation.data,
attestation.signature,
genesis_validators_root,
fork,
attestation.data.target.epoch,
DOMAIN_BEACON_ATTESTER)
proc addIndexedAttestation*(
sigs: var seq[SignatureSet],
fork: Fork, genesis_validators_root: Eth2Digest,
epochRef: auto,
attestation: IndexedAttestation,
): bool =
## Add an indexed attestation for batched BLS verification purposes.
## This only verifies the cryptography; it does not check, for example,
## that the indices are sorted and unique.
##
## Returns true if the indexed attestation was added to the batching buffer.
## Returns false if sanity checks failed (non-empty, keys are valid);
## in that case the seq[SignatureSet] is unmodified.
if attestation.attesting_indices.len == 0:
# Aggregation spec requires non-empty collection
# - https://tools.ietf.org/html/draft-irtf-cfrg-bls-signature-04
# Eth2 spec requires at least one attesting index in slashings
# - https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#is_valid_indexed_attestation
return false
var aggPK {.noInit.}: blscurve.PublicKey
if not aggPK.aggregateAttesters(attestation, epochRef):
return false
return sigs.addSignatureSet(
aggPK,
attestation.data,
attestation.signature,
genesis_validators_root,
fork,
attestation.data.target.epoch,
DOMAIN_BEACON_ATTESTER)
proc addSlotSignature*(
sigs: var seq[SignatureSet],
fork: Fork, genesis_validators_root: Eth2Digest,
slot: Slot,
pubkey: ValidatorPubKey,
signature: ValidatorSig): bool =
let epoch = compute_epoch_at_slot(slot)
return sigs.addSignatureSet(
pubkey.loadWithCacheOrExitFalse(),
sszObj = slot,
signature,
genesis_validators_root,
fork,
epoch,
DOMAIN_SELECTION_PROOF
)
proc addAggregateAndProofSignature*(
sigs: var seq[SignatureSet],
fork: Fork, genesis_validators_root: Eth2Digest,
aggregate_and_proof: AggregateAndProof,
pubkey: ValidatorPubKey,
signature: ValidatorSig
): bool =
let epoch = compute_epoch_at_slot(aggregate_and_proof.aggregate.data.slot)
return sigs.addSignatureSet(
pubkey.loadWithCacheOrExitFalse(),
sszObj = aggregate_and_proof,
signature,
genesis_validators_root,
fork,
epoch,
DOMAIN_AGGREGATE_AND_PROOF
)
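# Note: scheduleAggregateChecks in batch_validation.nim enqueues exactly one
# addSlotSignature, one addAggregateAndProofSignature and one addAttestation
# per aggregate, which is why its pendingBuffer grows three entries at a time.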
proc collectSignatureSets*(
sigs: var seq[SignatureSet],
signed_block: SignedBeaconBlock,


@ -167,7 +167,7 @@ proc sendAttestation*(
getAttestationTopic(node.forkDigest, subnet_index), attestation)
# Ensure node's own broadcast attestations end up in its attestation pool
discard node.processor[].attestationValidator(
discard node.processor.attestationValidator(
attestation, subnet_index, false)
beacon_attestations_sent.inc()


@ -18,7 +18,7 @@ import
../beacon_chain/spec/[crypto, datatypes, digest, validator, state_transition,
helpers, beaconstate, presets, network],
../beacon_chain/[beacon_node_types, extras, beacon_clock],
../beacon_chain/gossip_processing/gossip_validation,
../beacon_chain/gossip_processing/[gossip_validation, batch_validation],
../beacon_chain/fork_choice/[fork_choice_types, fork_choice],
../beacon_chain/consensus_object_pools/[block_quarantine, blockchain_dag, block_clearance, attestation_pool],
# Test utilities
@ -398,6 +398,7 @@ suiteReport "Attestation validation " & preset():
pool = newClone(AttestationPool.init(chainDag, quarantine))
state = newClone(chainDag.headState)
cache = StateCache()
batchCrypto = BatchCrypto.new(keys.newRng())
# Slot 0 is a finalized slot - won't be making attestations for it..
check:
process_slots(state.data, state.data.data.slot + 1, cache)
@ -439,27 +440,27 @@ suiteReport "Attestation validation " & preset():
beaconTime = attestation.data.slot.toBeaconTime()
check:
validateAttestation(pool[], attestation, beaconTime, subnet, true).isOk
validateAttestation(pool, batchCrypto, attestation, beaconTime, subnet, true).waitFor().isOk
# Same validator again
validateAttestation(pool[], attestation, beaconTime, subnet, true).error()[0] ==
validateAttestation(pool, batchCrypto, attestation, beaconTime, subnet, true).waitFor().error()[0] ==
ValidationResult.Ignore
pool[].nextAttestationEpoch.setLen(0) # reset for test
check:
# Wrong subnet
validateAttestation(pool[], attestation, beaconTime, subnet + 1, true).isErr
validateAttestation(pool, batchCrypto, attestation, beaconTime, subnet + 1, true).waitFor().isErr
pool[].nextAttestationEpoch.setLen(0) # reset for test
check:
# Too far in the future
validateAttestation(
pool[], attestation, beaconTime - 1.seconds, subnet + 1, true).isErr
pool, batchCrypto, attestation, beaconTime - 1.seconds, subnet + 1, true).waitFor().isErr
pool[].nextAttestationEpoch.setLen(0) # reset for test
check:
# Too far in the past
validateAttestation(
pool[], attestation,
pool, batchCrypto, attestation,
beaconTime - (SECONDS_PER_SLOT * SLOTS_PER_EPOCH - 1).int.seconds,
subnet + 1, true).isErr
subnet + 1, true).waitFor().isErr