async batch verification (+40% sig verification throughput) (#5176)
* async batch verification When batch verification is done, the main thread is blocked reducing concurrency. With this PR, the new thread signalling primitive in chronos is used to offload the full batch verification process to a separate thread allowing the main threads to continue async operations while the other threads verify signatures. Similar to previous behavior, the number of ongoing batch verifications is capped to prevent runaway resource usage. In addition to the asynchronous processing, 3 addition changes help drive throughput: * A loop is used for batch accumulation: this prevents a stampede of small batches in eager mode where both the eager and the scheduled batch runner would pick batches off the queue, prematurely picking "fresh" batches off the queue * An additional small wait is introduced for small batches - this helps create slightly larger batches which make better used of the increased concurrency * Up to 2 batches are scheduled to the threadpool during high pressure, reducing startup latency for the threads Together, these changes increase attestation verification throughput under load up to 30%. * fixup * Update submodules * fix blst build issues (and a PIC warning) * bump --------- Co-authored-by: Zahary Karadjov <zahary@gmail.com>
This commit is contained in:
parent
0a4036baee
commit
b8a32419b8
|
@ -180,7 +180,7 @@ proc verify*(f: EraFile, cfg: RuntimeConfig): Result[Eth2Digest, string] =
|
||||||
|
|
||||||
rng = HmacDrbgContext.new()
|
rng = HmacDrbgContext.new()
|
||||||
taskpool = Taskpool.new()
|
taskpool = Taskpool.new()
|
||||||
var verifier = BatchVerifier(rng: rng, taskpool: taskpool)
|
var verifier = BatchVerifier.init(rng, taskpool)
|
||||||
|
|
||||||
var tmp: seq[byte]
|
var tmp: seq[byte]
|
||||||
? f.getStateSSZ(startSlot, tmp)
|
? f.getStateSSZ(startSlot, tmp)
|
||||||
|
|
|
@ -8,17 +8,18 @@
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[deques, sequtils],
|
std/[atomics, deques, sequtils],
|
||||||
|
stew/ptrops,
|
||||||
metrics,
|
metrics,
|
||||||
# Status
|
# Status
|
||||||
chronicles, chronos,
|
chronicles, chronos, chronos/threadsync,
|
||||||
../spec/signatures_batch,
|
../spec/signatures_batch,
|
||||||
../consensus_object_pools/[blockchain_dag, spec_cache]
|
../consensus_object_pools/[blockchain_dag, spec_cache]
|
||||||
|
|
||||||
export signatures_batch, blockchain_dag
|
export signatures_batch, blockchain_dag
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "gossip_checks"
|
topics = "batch_validation"
|
||||||
|
|
||||||
declareCounter batch_verification_batches,
|
declareCounter batch_verification_batches,
|
||||||
"Total number of batches processed"
|
"Total number of batches processed"
|
||||||
|
@ -26,6 +27,8 @@ declareCounter batch_verification_signatures,
|
||||||
"Total number of verified signatures before aggregation"
|
"Total number of verified signatures before aggregation"
|
||||||
declareCounter batch_verification_aggregates,
|
declareCounter batch_verification_aggregates,
|
||||||
"Total number of verified signatures after aggregation"
|
"Total number of verified signatures after aggregation"
|
||||||
|
declareCounter batch_verification_batches_skipped,
|
||||||
|
"Total number of batches skipped"
|
||||||
|
|
||||||
# Batched gossip validation
|
# Batched gossip validation
|
||||||
# ----------------------------------------------------------------
|
# ----------------------------------------------------------------
|
||||||
|
@ -52,93 +55,119 @@ declareCounter batch_verification_aggregates,
|
||||||
# different signatures, as most validators have the same view of the network -
|
# different signatures, as most validators have the same view of the network -
|
||||||
# at least 2/3 or we're in deep trouble :)
|
# at least 2/3 or we're in deep trouble :)
|
||||||
|
|
||||||
|
const
|
||||||
|
BatchAttAccumTime = 10.milliseconds
|
||||||
|
## Amount of time spent accumulating signatures from the network before
|
||||||
|
## performing verification
|
||||||
|
|
||||||
|
BatchedCryptoSize = 72
|
||||||
|
## Threshold for immediate trigger of batch verification.
|
||||||
|
## A balance between throughput and worst case latency.
|
||||||
|
## At least 6 so that the constant factors
|
||||||
|
## (RNG for blinding and Final Exponentiation)
|
||||||
|
## are amortized, but not too big as we need to redo checks one-by-one if
|
||||||
|
## one failed.
|
||||||
|
## The current value is based on experiments, where 72 gives an average
|
||||||
|
## batch size of ~30 signatures per batch, or 2.5 signatures per aggregate
|
||||||
|
## (meaning an average of 12 verifications per batch which on a raspberry
|
||||||
|
## should be doable in less than 30ms). In the same experiment, a value of
|
||||||
|
## 36 resulted in 17-18 signatures per batch and 1.7-1.9 signatures per
|
||||||
|
## aggregate - this node was running on mainnet with
|
||||||
|
## `--subscribe-all-subnets` turned on - typical nodes will see smaller
|
||||||
|
## batches.
|
||||||
|
|
||||||
|
InflightVerifications = 2
|
||||||
|
## Maximum number of concurrent in-flight verifications
|
||||||
|
|
||||||
type
|
type
|
||||||
BatchResult* {.pure.} = enum
|
BatchResult* {.pure.} = enum
|
||||||
Invalid # Invalid by default
|
Invalid # Invalid by default
|
||||||
Valid
|
Valid
|
||||||
Timeout
|
Timeout
|
||||||
|
|
||||||
Eager = proc(): bool {.gcsafe, raises: [Defect].} ##\
|
Eager = proc(): bool {.gcsafe, raises: [].}
|
||||||
## Callback that returns true if eager processing should be done to lower
|
## Callback that returns true if eager processing should be done to lower
|
||||||
## latency at the expense of spending more cycles validating things, creating
|
## latency at the expense of spending more cycles validating things,
|
||||||
## a crude timesharing priority mechanism.
|
## creating a crude timesharing priority mechanism.
|
||||||
|
|
||||||
BatchItem* = object
|
BatchItem* = object
|
||||||
sigset: SignatureSet
|
sigset: SignatureSet
|
||||||
fut: Future[BatchResult]
|
fut: Future[BatchResult]
|
||||||
|
|
||||||
Batch* = object
|
Batch* = object
|
||||||
|
## A batch represents up to BatchedCryptoSize non-aggregated signatures
|
||||||
created: Moment
|
created: Moment
|
||||||
sigsets: seq[SignatureSet]
|
sigsets: seq[SignatureSet]
|
||||||
items: seq[BatchItem]
|
items: seq[BatchItem]
|
||||||
|
|
||||||
|
VerifierItem = object
|
||||||
|
verifier: ref BatchVerifier
|
||||||
|
signal: ThreadSignalPtr
|
||||||
|
inflight: Future[void]
|
||||||
|
|
||||||
BatchCrypto* = object
|
BatchCrypto* = object
|
||||||
# Each batch is bounded by BatchedCryptoSize which was chosen:
|
|
||||||
# - based on "nimble bench" in nim-blscurve
|
|
||||||
# so that low power devices like Raspberry Pi 4 can process
|
|
||||||
# that many batched verifications within ~30ms on average
|
|
||||||
# - based on the accumulation rate of attestations and aggregates
|
|
||||||
# in large instances which were 12000 per slot (12s)
|
|
||||||
# hence 1 per ms (but the pattern is bursty around the 4s mark)
|
|
||||||
# The number of batches is bounded by time - batch validation is skipped if
|
|
||||||
# we can't process them in the time that one slot takes, and we return
|
|
||||||
# timeout instead which prevents the gossip layer from forwarding the
|
|
||||||
# batch.
|
|
||||||
batches: Deque[ref Batch]
|
batches: Deque[ref Batch]
|
||||||
eager: Eager ##\
|
eager: Eager
|
||||||
## Eager is used to enable eager processing of attestations when it's
|
## Eager is used to enable eager processing of attestations when it's
|
||||||
## prudent to do so (instead of leaving the CPU for other, presumably more
|
## prudent to do so (instead of leaving the CPU for other, presumably more
|
||||||
## important work like block processing)
|
## important work like block processing)
|
||||||
##
|
|
||||||
verifier: BatchVerifier
|
taskpool: Taskpool
|
||||||
|
rng: ref HmacDrbgContext
|
||||||
|
|
||||||
|
verifiers: array[InflightVerifications, VerifierItem]
|
||||||
|
## Each batch verification reqires a separate verifier
|
||||||
|
verifier: int
|
||||||
|
|
||||||
pruneTime: Moment ## last time we had to prune something
|
pruneTime: Moment ## last time we had to prune something
|
||||||
|
|
||||||
# `nim-metrics` library is a bit too slow to update on every batch, so
|
|
||||||
# we accumulate here instead
|
|
||||||
counts: tuple[signatures, batches, aggregates: int64]
|
counts: tuple[signatures, batches, aggregates: int64]
|
||||||
|
# `nim-metrics` library is a bit too slow to update on every batch, so
|
||||||
|
# we accumulate here instead
|
||||||
|
|
||||||
# Most scheduled checks require this immutable value, so don't require it
|
|
||||||
# to be provided separately each time
|
|
||||||
genesis_validators_root: Eth2Digest
|
genesis_validators_root: Eth2Digest
|
||||||
|
# Most scheduled checks require this immutable value, so don't require it
|
||||||
|
# to be provided separately each time
|
||||||
|
|
||||||
const
|
processor: Future[void]
|
||||||
# We cap waiting for an idle slot in case there's a lot of network traffic
|
|
||||||
# taking up all CPU - we don't want to _completely_ stop processing
|
|
||||||
# attestations - doing so also allows us to benefit from more batching /
|
|
||||||
# larger network reads when under load.
|
|
||||||
BatchAttAccumTime = 10.milliseconds
|
|
||||||
|
|
||||||
# Threshold for immediate trigger of batch verification.
|
BatchTask = object
|
||||||
# A balance between throughput and worst case latency.
|
ok: Atomic[bool]
|
||||||
# At least 6 so that the constant factors
|
setsPtr: ptr UncheckedArray[SignatureSet]
|
||||||
# (RNG for blinding and Final Exponentiation)
|
numSets: int
|
||||||
# are amortized, but not too big as we need to redo checks one-by-one if one
|
secureRandomBytes: array[32, byte]
|
||||||
# failed.
|
taskpool: Taskpool
|
||||||
# The current value is based on experiments, where 72 gives an average batch
|
cache: ptr BatchedBLSVerifierCache
|
||||||
# size of ~30 signatures per batch, or 2.5 signatures per aggregate (meaning
|
signal: ThreadSignalPtr
|
||||||
# an average of 12 verifications per batch which on a raspberry should be
|
|
||||||
# doable in less than 30ms). In the same experiment, a value of 36 resulted
|
|
||||||
# in 17-18 signatures per batch and 1.7-1.9 signatures per aggregate - this
|
|
||||||
# node was running on mainnet with `--subscribe-all-subnets` turned on -
|
|
||||||
# typical nodes will see smaller batches.
|
|
||||||
BatchedCryptoSize = 72
|
|
||||||
|
|
||||||
proc new*(
|
proc new*(
|
||||||
T: type BatchCrypto, rng: ref HmacDrbgContext,
|
T: type BatchCrypto, rng: ref HmacDrbgContext,
|
||||||
eager: Eager, genesis_validators_root: Eth2Digest, taskpool: TaskPoolPtr):
|
eager: Eager, genesis_validators_root: Eth2Digest, taskpool: TaskPoolPtr):
|
||||||
ref BatchCrypto =
|
Result[ref BatchCrypto, string] =
|
||||||
(ref BatchCrypto)(
|
let res = (ref BatchCrypto)(
|
||||||
verifier: BatchVerifier(rng: rng, taskpool: taskpool),
|
rng: rng, taskpool: taskpool,
|
||||||
eager: eager,
|
eager: eager,
|
||||||
genesis_validators_root: genesis_validators_root,
|
genesis_validators_root: genesis_validators_root,
|
||||||
pruneTime: Moment.now())
|
pruneTime: Moment.now())
|
||||||
|
|
||||||
func len(batch: Batch): int =
|
for i in 0..<res.verifiers.len:
|
||||||
batch.items.len()
|
res.verifiers[i] = VerifierItem(
|
||||||
|
verifier: BatchVerifier.new(rng, taskpool),
|
||||||
|
signal: block:
|
||||||
|
let sig = ThreadSignalPtr.new()
|
||||||
|
sig.valueOr:
|
||||||
|
for j in 0..<i:
|
||||||
|
discard res.verifiers[j].signal.close()
|
||||||
|
return err(sig.error())
|
||||||
|
)
|
||||||
|
|
||||||
|
ok res
|
||||||
|
|
||||||
func full(batch: Batch): bool =
|
func full(batch: Batch): bool =
|
||||||
batch.len() >= BatchedCryptoSize
|
batch.items.len() >= BatchedCryptoSize
|
||||||
|
|
||||||
|
func half(batch: Batch): bool =
|
||||||
|
batch.items.len() >= (BatchedCryptoSize div 2)
|
||||||
|
|
||||||
proc complete(batchItem: var BatchItem, v: BatchResult) =
|
proc complete(batchItem: var BatchItem, v: BatchResult) =
|
||||||
batchItem.fut.complete(v)
|
batchItem.fut.complete(v)
|
||||||
|
@ -146,26 +175,36 @@ proc complete(batchItem: var BatchItem, v: BatchResult) =
|
||||||
|
|
||||||
proc complete(batchItem: var BatchItem, ok: bool) =
|
proc complete(batchItem: var BatchItem, ok: bool) =
|
||||||
batchItem.fut.complete(if ok: BatchResult.Valid else: BatchResult.Invalid)
|
batchItem.fut.complete(if ok: BatchResult.Valid else: BatchResult.Invalid)
|
||||||
batchItem.fut = nil
|
|
||||||
|
|
||||||
proc skip(batch: var Batch) =
|
proc skip(batch: var Batch) =
|
||||||
for res in batch.items.mitems():
|
for res in batch.items.mitems():
|
||||||
res.complete(BatchResult.Timeout)
|
res.complete(BatchResult.Timeout)
|
||||||
|
|
||||||
proc pruneBatchQueue(batchCrypto: ref BatchCrypto) =
|
proc complete(batchCrypto: var BatchCrypto, batch: var Batch, ok: bool) =
|
||||||
let
|
if ok:
|
||||||
now = Moment.now()
|
for res in batch.items.mitems():
|
||||||
|
res.complete(BatchResult.Valid)
|
||||||
|
else:
|
||||||
|
# Batched verification failed meaning that some of the signature checks
|
||||||
|
# failed, but we don't know which ones - check each signature separately
|
||||||
|
# instead
|
||||||
|
debug "batch crypto - failure, falling back",
|
||||||
|
items = batch.items.len()
|
||||||
|
|
||||||
# If batches haven't been processed for more than 12 seconds
|
for item in batch.items.mitems():
|
||||||
while batchCrypto.batches.len() > 0:
|
item.complete(blsVerify item.sigset)
|
||||||
if batchCrypto.batches[0][].created + SECONDS_PER_SLOT.int64.seconds > now:
|
|
||||||
break
|
|
||||||
if batchCrypto.pruneTime + SECONDS_PER_SLOT.int64.seconds > now:
|
|
||||||
notice "Batch queue pruned, skipping attestation validation",
|
|
||||||
batches = batchCrypto.batches.len()
|
|
||||||
batchCrypto.pruneTime = Moment.now()
|
|
||||||
|
|
||||||
batchCrypto.batches.popFirst()[].skip()
|
batchCrypto.counts.batches += 1
|
||||||
|
batchCrypto.counts.signatures += batch.items.len()
|
||||||
|
batchCrypto.counts.aggregates += batch.sigsets.len()
|
||||||
|
|
||||||
|
if batchCrypto.counts.batches >= 256:
|
||||||
|
# Not too often, so as not to overwhelm our metrics
|
||||||
|
batch_verification_batches.inc(batchCrypto.counts.batches)
|
||||||
|
batch_verification_signatures.inc(batchCrypto.counts.signatures)
|
||||||
|
batch_verification_aggregates.inc(batchCrypto.counts.aggregates)
|
||||||
|
|
||||||
|
reset(batchCrypto.counts)
|
||||||
|
|
||||||
func combine(a: var Signature, b: Signature) =
|
func combine(a: var Signature, b: Signature) =
|
||||||
var tmp = AggregateSignature.init(CookedSig(a))
|
var tmp = AggregateSignature.init(CookedSig(a))
|
||||||
|
@ -177,135 +216,158 @@ func combine(a: var PublicKey, b: PublicKey) =
|
||||||
tmp.aggregate(b)
|
tmp.aggregate(b)
|
||||||
a = PublicKey(tmp.finish())
|
a = PublicKey(tmp.finish())
|
||||||
|
|
||||||
proc processBatch(batchCrypto: ref BatchCrypto) =
|
proc batchVerifyTask(task: ptr BatchTask) {.nimcall.} =
|
||||||
## Process one batch, if there is any
|
# Task suitable for running in taskpools - look, no GC!
|
||||||
|
let
|
||||||
|
tp = task[].taskpool
|
||||||
|
ok = tp.spawn batchVerify(
|
||||||
|
tp, task[].cache, task[].setsPtr, task[].numSets,
|
||||||
|
addr task[].secureRandomBytes)
|
||||||
|
|
||||||
# Pruning the queue here makes sure we catch up with processing if need be
|
task[].ok.store(sync ok)
|
||||||
batchCrypto.pruneBatchQueue() # Skip old batches
|
|
||||||
|
|
||||||
if batchCrypto[].batches.len() == 0:
|
discard task[].signal.fireSync()
|
||||||
# No more batches left, they might have been eagerly processed or pruned
|
|
||||||
|
proc batchVerifyAsync*(
|
||||||
|
verifier: ref BatchVerifier, signal: ThreadSignalPtr,
|
||||||
|
batch: ref Batch): Future[bool] {.async.} =
|
||||||
|
var task = BatchTask(
|
||||||
|
setsPtr: makeUncheckedArray(baseAddr batch[].sigsets),
|
||||||
|
numSets: batch[].sigsets.len,
|
||||||
|
taskpool: verifier[].taskpool,
|
||||||
|
cache: addr verifier[].sigVerifCache,
|
||||||
|
signal: signal,
|
||||||
|
)
|
||||||
|
verifier[].rng[].generate(task.secureRandomBytes)
|
||||||
|
|
||||||
|
# task will stay allocated in the async environment at least until the signal
|
||||||
|
# has fired at which point it's safe to release it
|
||||||
|
let taskPtr = addr task
|
||||||
|
verifier[].taskpool.spawn batchVerifyTask(taskPtr)
|
||||||
|
await signal.wait()
|
||||||
|
task.ok.load()
|
||||||
|
|
||||||
|
proc processBatch(
|
||||||
|
batchCrypto: ref BatchCrypto, batch: ref Batch,
|
||||||
|
verifier: ref BatchVerifier, signal: ThreadSignalPtr) {.async.} =
|
||||||
|
let
|
||||||
|
numSets = batch[].sigsets.len()
|
||||||
|
|
||||||
|
if numSets == 0:
|
||||||
|
# Nothing to do in this batch, can happen when a batch is created without
|
||||||
|
# there being any signatures successfully added to it
|
||||||
return
|
return
|
||||||
|
|
||||||
let
|
let
|
||||||
batch = batchCrypto[].batches.popFirst()
|
startTick = Moment.now()
|
||||||
batchSize = batch[].sigsets.len()
|
|
||||||
|
|
||||||
if batchSize == 0:
|
# If the hardware is too slow to keep up or an event caused a temporary
|
||||||
# Nothing to do in this batch, can happen when a batch is created without
|
# buildup of signature verification tasks, the batch will be dropped so as to
|
||||||
# there being any signatures successfully added to it
|
# recover and not cause even further buildup - this puts an (elastic) upper
|
||||||
discard
|
# bound on the amount of queued-up work
|
||||||
else:
|
if batch[].created + SECONDS_PER_SLOT.int64.seconds < startTick:
|
||||||
trace "batch crypto - starting",
|
if batchCrypto.pruneTime + SECONDS_PER_SLOT.int64.seconds < startTick:
|
||||||
batchSize
|
notice "Batch queue pruned, skipping attestation validation",
|
||||||
|
batches = batchCrypto.batches.len()
|
||||||
|
batchCrypto.pruneTime = startTick
|
||||||
|
|
||||||
let
|
batch[].skip()
|
||||||
startTick = Moment.now()
|
|
||||||
ok =
|
|
||||||
if batchSize == 1: blsVerify(batch[].sigsets[0])
|
|
||||||
else: batchCrypto.verifier.batchVerify(batch[].sigsets)
|
|
||||||
|
|
||||||
trace "batch crypto - finished",
|
batch_verification_batches_skipped.inc()
|
||||||
batchSize,
|
|
||||||
cryptoVerified = ok,
|
return
|
||||||
|
|
||||||
|
trace "batch crypto - starting", numSets, items = batch[].items.len
|
||||||
|
|
||||||
|
if numSets == 1:
|
||||||
|
# Avoid taskpools overhead when there's only a single signature to verify
|
||||||
|
trace "batch crypto - finished (1)",
|
||||||
|
numSets, items = batch[].items.len(),
|
||||||
batchDur = Moment.now() - startTick
|
batchDur = Moment.now() - startTick
|
||||||
|
|
||||||
if ok:
|
batchCrypto[].complete(batch[], blsVerify(batch[].sigsets[0]))
|
||||||
for res in batch.items.mitems():
|
return
|
||||||
res.complete(BatchResult.Valid)
|
|
||||||
else:
|
|
||||||
# Batched verification failed meaning that some of the signature checks
|
|
||||||
# failed, but we don't know which ones - check each signature separately
|
|
||||||
# instead
|
|
||||||
debug "batch crypto - failure, falling back",
|
|
||||||
items = batch[].items.len()
|
|
||||||
|
|
||||||
for item in batch[].items.mitems():
|
let ok = await batchVerifyAsync(verifier, signal, batch)
|
||||||
item.complete(blsVerify item.sigset)
|
|
||||||
|
|
||||||
batchCrypto[].counts.batches += 1
|
trace "batch crypto - finished",
|
||||||
batchCrypto[].counts.signatures += batch[].items.len()
|
numSets, items = batch[].items.len(), ok,
|
||||||
batchCrypto[].counts.aggregates += batch[].sigsets.len()
|
batchDur = Moment.now() - startTick
|
||||||
|
|
||||||
if batchCrypto[].counts.batches >= 256:
|
batchCrypto[].complete(batch[], ok)
|
||||||
# Not too often, so as not to overwhelm our metrics
|
|
||||||
batch_verification_batches.inc(batchCrypto[].counts.batches)
|
|
||||||
batch_verification_signatures.inc(batchCrypto[].counts.signatures)
|
|
||||||
batch_verification_aggregates.inc(batchCrypto[].counts.aggregates)
|
|
||||||
|
|
||||||
reset(batchCrypto[].counts)
|
proc processLoop(batchCrypto: ref BatchCrypto) {.async.} =
|
||||||
|
|
||||||
proc deferCryptoProcessing(batchCrypto: ref BatchCrypto) {.async.} =
|
|
||||||
## Process pending crypto check after some time has passed - the time is
|
## Process pending crypto check after some time has passed - the time is
|
||||||
## chosen such that there's time to fill the batch but not so long that
|
## chosen such that there's time to fill the batch but not so long that
|
||||||
## latency across the network is negatively affected
|
## latency across the network is negatively affected
|
||||||
await sleepAsync(BatchAttAccumTime)
|
while batchCrypto[].batches.len() > 0:
|
||||||
|
# When eager processing is enabled, we can start processing the next batch
|
||||||
|
# as soon as it's full - otherwise, wait for more signatures to accumulate
|
||||||
|
if not batchCrypto[].batches.peekFirst()[].full() or
|
||||||
|
not batchCrypto[].eager():
|
||||||
|
|
||||||
# Take the first batch in the queue and process it - if eager processing has
|
await sleepAsync(BatchAttAccumTime)
|
||||||
# stolen it already, that's fine
|
|
||||||
batchCrypto.processBatch()
|
|
||||||
|
|
||||||
proc getBatch(batchCrypto: ref BatchCrypto): (ref Batch, bool) =
|
# We still haven't filled even half the batch - wait a bit more (and give
|
||||||
# Get a batch suitable for attestation processing - in particular, attestation
|
# chonos time to work its task queue)
|
||||||
# batches might be skipped
|
if not batchCrypto[].batches.peekFirst()[].half():
|
||||||
batchCrypto.pruneBatchQueue()
|
await sleepAsync(BatchAttAccumTime div 2)
|
||||||
|
|
||||||
|
# Pick the "next" verifier
|
||||||
|
let verifier = (batchCrypto[].verifier + 1) mod batchCrypto.verifiers.len
|
||||||
|
batchCrypto[].verifier = verifier
|
||||||
|
|
||||||
|
# BatchVerifier:s may not be shared, so make sure the previous round
|
||||||
|
# using this verifier is finished
|
||||||
|
if batchCrypto[].verifiers[verifier].inflight != nil and
|
||||||
|
not batchCrypto[].verifiers[verifier].inflight.finished():
|
||||||
|
await batchCrypto[].verifiers[verifier].inflight
|
||||||
|
|
||||||
|
batchCrypto[].verifiers[verifier].inflight = batchCrypto.processBatch(
|
||||||
|
batchCrypto[].batches.popFirst(),
|
||||||
|
batchCrypto[].verifiers[verifier].verifier,
|
||||||
|
batchCrypto[].verifiers[verifier].signal)
|
||||||
|
|
||||||
|
proc getBatch(batchCrypto: var BatchCrypto): ref Batch =
|
||||||
if batchCrypto.batches.len() == 0 or
|
if batchCrypto.batches.len() == 0 or
|
||||||
batchCrypto.batches.peekLast[].full():
|
batchCrypto.batches.peekLast[].full():
|
||||||
# There are no batches in progress - start a new batch and schedule a
|
|
||||||
# deferred task to eventually handle it
|
|
||||||
let batch = (ref Batch)(created: Moment.now())
|
let batch = (ref Batch)(created: Moment.now())
|
||||||
batchCrypto[].batches.addLast(batch)
|
batchCrypto.batches.addLast(batch)
|
||||||
(batch, true)
|
batch
|
||||||
else:
|
else:
|
||||||
let batch = batchCrypto[].batches.peekLast()
|
batchCrypto.batches.peekLast()
|
||||||
# len will be 0 when the batch was created but nothing added to it
|
|
||||||
# because of early failures
|
|
||||||
(batch, batch[].len() == 0)
|
|
||||||
|
|
||||||
proc scheduleBatch(batchCrypto: ref BatchCrypto, fresh: bool) =
|
proc scheduleProcessor(batchCrypto: ref BatchCrypto) =
|
||||||
if fresh:
|
if batchCrypto.processor == nil or batchCrypto.processor.finished():
|
||||||
# Every time we start a new round of batching, we need to launch a deferred
|
batchCrypto.processor = batchCrypto.processLoop()
|
||||||
# task that will compute the result of the batch eventually in case the
|
|
||||||
# batch is never filled or eager processing is blocked
|
|
||||||
asyncSpawn batchCrypto.deferCryptoProcessing()
|
|
||||||
|
|
||||||
if batchCrypto.batches.len() > 0 and
|
proc verifySoon(
|
||||||
batchCrypto.batches.peekFirst()[].full() and
|
batchCrypto: ref BatchCrypto, name: static string,
|
||||||
batchCrypto.eager():
|
sigset: SignatureSet): Future[BatchResult] =
|
||||||
# If there's a full batch, process it eagerly assuming the callback allows
|
let
|
||||||
batchCrypto.processBatch()
|
batch = batchCrypto[].getBatch()
|
||||||
|
fut = newFuture[BatchResult](name)
|
||||||
|
|
||||||
template withBatch(
|
var found = false
|
||||||
batchCrypto: ref BatchCrypto, name: cstring,
|
# Find existing signature sets with the same message - if we can verify an
|
||||||
body: untyped): Future[BatchResult] =
|
# aggregate instead of several signatures, that is _much_ faster
|
||||||
block:
|
for item in batch[].sigsets.mitems():
|
||||||
let
|
if item.message == sigset.message:
|
||||||
(batch, fresh) = batchCrypto.getBatch()
|
item.signature.combine(sigset.signature)
|
||||||
|
item.pubkey.combine(sigset.pubkey)
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
|
||||||
let
|
if not found:
|
||||||
fut = newFuture[BatchResult](name)
|
batch[].sigsets.add sigset
|
||||||
sigset = body
|
|
||||||
|
|
||||||
var found = false
|
# We need to keep the "original" sigset to allow verifying each signature
|
||||||
# Find existing signature sets with the same message - if we can verify an
|
# one by one in the case the combined operation fails
|
||||||
# aggregate instead of several signatures, that is _much_ faster
|
batch[].items.add(BatchItem(sigset: sigset, fut: fut))
|
||||||
for item in batch[].sigsets.mitems():
|
|
||||||
if item.message == sigset.message:
|
|
||||||
item.signature.combine(sigset.signature)
|
|
||||||
item.pubkey.combine(sigset.pubkey)
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
|
|
||||||
if not found:
|
batchCrypto.scheduleProcessor()
|
||||||
batch[].sigsets.add sigset
|
|
||||||
|
|
||||||
# We need to keep the "original" sigset to allow verifying each signature
|
fut
|
||||||
# one by one in the case the combined operation fails
|
|
||||||
batch[].items.add(BatchItem(sigset: sigset, fut: fut))
|
|
||||||
|
|
||||||
batchCrypto.scheduleBatch(fresh)
|
|
||||||
fut
|
|
||||||
|
|
||||||
# See also verify_attestation_signature
|
# See also verify_attestation_signature
|
||||||
proc scheduleAttestationCheck*(
|
proc scheduleAttestationCheck*(
|
||||||
|
@ -325,7 +387,7 @@ proc scheduleAttestationCheck*(
|
||||||
let
|
let
|
||||||
sig = signature.load().valueOr:
|
sig = signature.load().valueOr:
|
||||||
return err("attestation: cannot load signature")
|
return err("attestation: cannot load signature")
|
||||||
fut = batchCrypto.withBatch("batch_validation.scheduleAttestationCheck"):
|
fut = batchCrypto.verifySoon("batch_validation.scheduleAttestationCheck"):
|
||||||
attestation_signature_set(
|
attestation_signature_set(
|
||||||
fork, batchCrypto[].genesis_validators_root, attestationData, pubkey,
|
fork, batchCrypto[].genesis_validators_root, attestationData, pubkey,
|
||||||
sig)
|
sig)
|
||||||
|
@ -370,15 +432,15 @@ proc scheduleAggregateChecks*(
|
||||||
return err("aggregateAndProof: invalid aggregate signature")
|
return err("aggregateAndProof: invalid aggregate signature")
|
||||||
|
|
||||||
let
|
let
|
||||||
aggregatorFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregator"):
|
aggregatorFut = batchCrypto.verifySoon("scheduleAggregateChecks.aggregator"):
|
||||||
aggregate_and_proof_signature_set(
|
aggregate_and_proof_signature_set(
|
||||||
fork, batchCrypto[].genesis_validators_root, aggregate_and_proof,
|
fork, batchCrypto[].genesis_validators_root, aggregate_and_proof,
|
||||||
aggregatorKey, aggregatorSig)
|
aggregatorKey, aggregatorSig)
|
||||||
slotFut = batchCrypto.withBatch("scheduleAggregateChecks.selection_proof"):
|
slotFut = batchCrypto.verifySoon("scheduleAggregateChecks.selection_proof"):
|
||||||
slot_signature_set(
|
slot_signature_set(
|
||||||
fork, batchCrypto[].genesis_validators_root, aggregate.data.slot,
|
fork, batchCrypto[].genesis_validators_root, aggregate.data.slot,
|
||||||
aggregatorKey, slotSig)
|
aggregatorKey, slotSig)
|
||||||
aggregateFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregate"):
|
aggregateFut = batchCrypto.verifySoon("scheduleAggregateChecks.aggregate"):
|
||||||
attestation_signature_set(
|
attestation_signature_set(
|
||||||
fork, batchCrypto[].genesis_validators_root, aggregate.data,
|
fork, batchCrypto[].genesis_validators_root, aggregate.data,
|
||||||
aggregateKey, aggregateSig)
|
aggregateKey, aggregateSig)
|
||||||
|
@ -402,7 +464,7 @@ proc scheduleSyncCommitteeMessageCheck*(
|
||||||
let
|
let
|
||||||
sig = signature.load().valueOr:
|
sig = signature.load().valueOr:
|
||||||
return err("SyncCommitteMessage: cannot load signature")
|
return err("SyncCommitteMessage: cannot load signature")
|
||||||
fut = batchCrypto.withBatch("scheduleSyncCommitteeMessageCheck"):
|
fut = batchCrypto.verifySoon("scheduleSyncCommitteeMessageCheck"):
|
||||||
sync_committee_message_signature_set(
|
sync_committee_message_signature_set(
|
||||||
fork, batchCrypto[].genesis_validators_root, slot, beacon_block_root,
|
fork, batchCrypto[].genesis_validators_root, slot, beacon_block_root,
|
||||||
pubkey, sig)
|
pubkey, sig)
|
||||||
|
@ -444,15 +506,15 @@ proc scheduleContributionChecks*(
|
||||||
dag, dag.syncCommitteeParticipants(contribution.slot + 1, subcommitteeIdx),
|
dag, dag.syncCommitteeParticipants(contribution.slot + 1, subcommitteeIdx),
|
||||||
contribution.aggregation_bits)
|
contribution.aggregation_bits)
|
||||||
let
|
let
|
||||||
aggregatorFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.aggregator"):
|
aggregatorFut = batchCrypto.verifySoon("scheduleContributionAndProofChecks.aggregator"):
|
||||||
contribution_and_proof_signature_set(
|
contribution_and_proof_signature_set(
|
||||||
fork, batchCrypto[].genesis_validators_root, contribution_and_proof,
|
fork, batchCrypto[].genesis_validators_root, contribution_and_proof,
|
||||||
aggregatorKey, aggregatorSig)
|
aggregatorKey, aggregatorSig)
|
||||||
proofFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.selection_proof"):
|
proofFut = batchCrypto.verifySoon("scheduleContributionAndProofChecks.selection_proof"):
|
||||||
sync_committee_selection_proof_set(
|
sync_committee_selection_proof_set(
|
||||||
fork, batchCrypto[].genesis_validators_root, contribution.slot,
|
fork, batchCrypto[].genesis_validators_root, contribution.slot,
|
||||||
subcommitteeIdx, aggregatorKey, proofSig)
|
subcommitteeIdx, aggregatorKey, proofSig)
|
||||||
contributionFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"):
|
contributionFut = batchCrypto.verifySoon("scheduleContributionAndProofChecks.contribution"):
|
||||||
sync_committee_message_signature_set(
|
sync_committee_message_signature_set(
|
||||||
fork, batchCrypto[].genesis_validators_root, contribution.slot,
|
fork, batchCrypto[].genesis_validators_root, contribution.slot,
|
||||||
contribution.beacon_block_root, contributionKey, contributionSig)
|
contribution.beacon_block_root, contributionKey, contributionSig)
|
||||||
|
@ -460,11 +522,10 @@ proc scheduleContributionChecks*(
|
||||||
ok((aggregatorFut, proofFut, contributionFut, contributionSig))
|
ok((aggregatorFut, proofFut, contributionFut, contributionSig))
|
||||||
|
|
||||||
proc scheduleBlsToExecutionChangeCheck*(
|
proc scheduleBlsToExecutionChangeCheck*(
|
||||||
batchCrypto: ref BatchCrypto,
|
batchCrypto: ref BatchCrypto,
|
||||||
genesisFork: Fork,
|
genesis_fork: Fork, signedBLSToExecutionChange: SignedBLSToExecutionChange,
|
||||||
signedBLSToExecutionChange: SignedBLSToExecutionChange): Result[tuple[
|
dag: ChainDAGRef):
|
||||||
blsToExecutionFut: Future[BatchResult],
|
Result[tuple[fut: Future[BatchResult], sig: CookedSig], cstring] =
|
||||||
sig: CookedSig], cstring] =
|
|
||||||
## Schedule crypto verification of all signatures in a
|
## Schedule crypto verification of all signatures in a
|
||||||
## SignedBLSToExecutionChange message
|
## SignedBLSToExecutionChange message
|
||||||
##
|
##
|
||||||
|
@ -481,16 +542,15 @@ proc scheduleBlsToExecutionChangeCheck*(
|
||||||
let
|
let
|
||||||
# Only called when matching already-known withdrawal credentials, so it's
|
# Only called when matching already-known withdrawal credentials, so it's
|
||||||
# resistant to allowing loadWithCache DoSing
|
# resistant to allowing loadWithCache DoSing
|
||||||
validatorChangePubkey =
|
pubkey = dag.validatorKey(
|
||||||
signedBLSToExecutionChange.message.from_bls_pubkey.loadWithCache.valueOr:
|
signedBLSToExecutionChange.message.validator_index).valueOr:
|
||||||
return err("scheduleBlsToExecutionChangeCheck: cannot load BLS to withdrawals pubkey")
|
return err("SignedAggregateAndProof: invalid validator index")
|
||||||
|
sig = signedBLSToExecutionChange.signature.load().valueOr:
|
||||||
validatorChangeSig = signedBLSToExecutionChange.signature.load().valueOr:
|
|
||||||
return err("scheduleBlsToExecutionChangeCheck: invalid validator change signature")
|
return err("scheduleBlsToExecutionChangeCheck: invalid validator change signature")
|
||||||
validatorChangeFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"):
|
fut = batchCrypto.verifySoon("scheduleContributionAndProofChecks.contribution"):
|
||||||
bls_to_execution_change_signature_set(
|
bls_to_execution_change_signature_set(
|
||||||
genesis_fork, batchCrypto[].genesis_validators_root,
|
genesis_fork, batchCrypto[].genesis_validators_root,
|
||||||
signedBLSToExecutionChange.message,
|
signedBLSToExecutionChange.message,
|
||||||
validatorChangePubkey, validatorChangeSig)
|
pubkey, sig)
|
||||||
|
|
||||||
ok((validatorChangeFut, validatorChangeSig))
|
ok((fut, sig))
|
||||||
|
|
|
@ -144,7 +144,7 @@ proc new*(T: type BlockProcessor,
|
||||||
validatorMonitor: validatorMonitor,
|
validatorMonitor: validatorMonitor,
|
||||||
blobQuarantine: blobQuarantine,
|
blobQuarantine: blobQuarantine,
|
||||||
getBeaconTime: getBeaconTime,
|
getBeaconTime: getBeaconTime,
|
||||||
verifier: BatchVerifier(rng: rng, taskpool: taskpool),
|
verifier: BatchVerifier.init(rng, taskpool)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Sync callbacks
|
# Sync callbacks
|
||||||
|
|
|
@ -194,7 +194,8 @@ proc new*(T: type Eth2Processor,
|
||||||
# Only run eager attestation signature verification if we're not
|
# Only run eager attestation signature verification if we're not
|
||||||
# processing blocks in order to give priority to block processing
|
# processing blocks in order to give priority to block processing
|
||||||
eager = proc(): bool = not blockProcessor[].hasBlocks(),
|
eager = proc(): bool = not blockProcessor[].hasBlocks(),
|
||||||
genesis_validators_root = dag.genesis_validators_root, taskpool)
|
genesis_validators_root = dag.genesis_validators_root, taskpool).expect(
|
||||||
|
"working batcher")
|
||||||
)
|
)
|
||||||
|
|
||||||
# Each validator logs, validates then passes valid data to its destination
|
# Each validator logs, validates then passes valid data to its destination
|
||||||
|
|
|
@ -974,7 +974,7 @@ proc validateBlsToExecutionChange*(
|
||||||
|
|
||||||
# BLS to execution change signatures are batch-verified
|
# BLS to execution change signatures are batch-verified
|
||||||
let deferredCrypto = batchCrypto.scheduleBlsToExecutionChangeCheck(
|
let deferredCrypto = batchCrypto.scheduleBlsToExecutionChangeCheck(
|
||||||
pool.dag.cfg.genesisFork, signed_address_change)
|
pool.dag.cfg.genesisFork, signed_address_change, pool.dag)
|
||||||
if deferredCrypto.isErr():
|
if deferredCrypto.isErr():
|
||||||
return pool.checkedReject(deferredCrypto.error)
|
return pool.checkedReject(deferredCrypto.error)
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,7 @@ gnosis_chiado_genesis_end:
|
||||||
cdecl(gnosis_chiado_genesis_size):
|
cdecl(gnosis_chiado_genesis_size):
|
||||||
.quad gnosis_chiado_genesis_end - gnosis_chiado_genesis_data
|
.quad gnosis_chiado_genesis_end - gnosis_chiado_genesis_data
|
||||||
|
|
||||||
#if defined(__linux__) && defined(__pie__)
|
#if defined(__linux__) && (defined(__pie__) || defined(__pic__))
|
||||||
.section .data.rel.ro,"aw",@progbits
|
.section .data.rel.ro,"aw",@progbits
|
||||||
#elif defined(__APPLE__)
|
#elif defined(__APPLE__)
|
||||||
.section __DATA,__const
|
.section __DATA,__const
|
||||||
|
|
|
@ -48,7 +48,7 @@ eth2_sepolia_genesis_end:
|
||||||
cdecl(eth2_sepolia_genesis_size):
|
cdecl(eth2_sepolia_genesis_size):
|
||||||
.quad eth2_sepolia_genesis_end - eth2_sepolia_genesis_data
|
.quad eth2_sepolia_genesis_end - eth2_sepolia_genesis_data
|
||||||
|
|
||||||
#if defined(__linux__) && defined(__pie__)
|
#if defined(__linux__) && (defined(__pie__) || defined(__pic__))
|
||||||
.section .data.rel.ro,"aw",@progbits
|
.section .data.rel.ro,"aw",@progbits
|
||||||
#elif defined(__APPLE__)
|
#elif defined(__APPLE__)
|
||||||
.section __DATA,__const
|
.section __DATA,__const
|
||||||
|
|
|
@ -28,13 +28,30 @@ type
|
||||||
TaskPoolPtr* = Taskpool
|
TaskPoolPtr* = Taskpool
|
||||||
|
|
||||||
BatchVerifier* = object
|
BatchVerifier* = object
|
||||||
sigVerifCache*: BatchedBLSVerifierCache ##\
|
sigVerifCache*: BatchedBLSVerifierCache
|
||||||
## A cache for batch BLS signature verification contexts
|
## A cache for batch BLS signature verification contexts
|
||||||
rng*: ref HmacDrbgContext ##\
|
rng*: ref HmacDrbgContext
|
||||||
## A reference to the Nimbus application-wide RNG
|
## A reference to the Nimbus application-wide RNG
|
||||||
|
|
||||||
taskpool*: TaskPoolPtr
|
taskpool*: TaskPoolPtr
|
||||||
|
|
||||||
|
proc init*(
|
||||||
|
T: type BatchVerifier, rng: ref HmacDrbgContext,
|
||||||
|
taskpool: TaskPoolPtr): BatchVerifier =
|
||||||
|
BatchVerifier(
|
||||||
|
sigVerifCache: BatchedBLSVerifierCache.init(taskpool),
|
||||||
|
rng: rng,
|
||||||
|
taskpool: taskpool,
|
||||||
|
)
|
||||||
|
|
||||||
|
proc new*(
|
||||||
|
T: type BatchVerifier, rng: ref HmacDrbgContext,
|
||||||
|
taskpool: TaskPoolPtr): ref BatchVerifier =
|
||||||
|
(ref BatchVerifier)(
|
||||||
|
sigVerifCache: BatchedBLSVerifierCache.init(taskpool),
|
||||||
|
rng: rng,
|
||||||
|
taskpool: taskpool,
|
||||||
|
)
|
||||||
|
|
||||||
func `$`*(s: SignatureSet): string =
|
func `$`*(s: SignatureSet): string =
|
||||||
"(pubkey: 0x" & s.pubkey.toHex() &
|
"(pubkey: 0x" & s.pubkey.toHex() &
|
||||||
", signing_root: 0x" & s.message.toHex() &
|
", signing_root: 0x" & s.message.toHex() &
|
||||||
|
@ -433,7 +450,5 @@ proc collectSignatureSets*(
|
||||||
|
|
||||||
proc batchVerify*(verifier: var BatchVerifier, sigs: openArray[SignatureSet]): bool =
|
proc batchVerify*(verifier: var BatchVerifier, sigs: openArray[SignatureSet]): bool =
|
||||||
let bytes = verifier.rng[].generate(array[32, byte])
|
let bytes = verifier.rng[].generate(array[32, byte])
|
||||||
try:
|
|
||||||
verifier.taskpool.batchVerify(verifier.sigVerifCache, sigs, bytes)
|
verifier.taskpool.batchVerify(verifier.sigVerifCache, sigs, bytes)
|
||||||
except Exception as exc:
|
|
||||||
raiseAssert exc.msg # Shouldn't happen
|
|
||||||
|
|
|
@ -49,6 +49,12 @@ if defined(release) and not defined(disableLTO):
|
||||||
# used for static libraries.
|
# used for static libraries.
|
||||||
discard
|
discard
|
||||||
|
|
||||||
|
# Hidden visibility allows for better position-independent codegen - it also
|
||||||
|
# resolves a build issue in BLST where otherwise private symbols would require
|
||||||
|
# an unsupported relocation on PIE-enabled distros such as ubuntu - BLST itself
|
||||||
|
# solves this via a linker script which is messy
|
||||||
|
switch("passC", "-fvisibility=hidden")
|
||||||
|
|
||||||
# show C compiler warnings
|
# show C compiler warnings
|
||||||
if defined(cwarnings):
|
if defined(cwarnings):
|
||||||
let common_gcc_options = "-Wno-discarded-qualifiers -Wno-incompatible-pointer-types"
|
let common_gcc_options = "-Wno-discarded-qualifiers -Wno-incompatible-pointer-types"
|
||||||
|
|
|
@ -320,12 +320,13 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
|
||||||
eth1Chain = Eth1Chain.init(cfg, db, 0, default Eth2Digest)
|
eth1Chain = Eth1Chain.init(cfg, db, 0, default Eth2Digest)
|
||||||
merkleizer = DepositsMerkleizer.init(depositTreeSnapshot.depositContractState)
|
merkleizer = DepositsMerkleizer.init(depositTreeSnapshot.depositContractState)
|
||||||
taskpool = Taskpool.new()
|
taskpool = Taskpool.new()
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: taskpool)
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
quarantine = newClone(Quarantine.init())
|
quarantine = newClone(Quarantine.init())
|
||||||
attPool = AttestationPool.init(dag, quarantine)
|
attPool = AttestationPool.init(dag, quarantine)
|
||||||
batchCrypto = BatchCrypto.new(
|
batchCrypto = BatchCrypto.new(
|
||||||
rng, eager = func(): bool = true,
|
rng, eager = func(): bool = true,
|
||||||
genesis_validators_root = dag.genesis_validators_root, taskpool)
|
genesis_validators_root = dag.genesis_validators_root,
|
||||||
|
taskpool).expect("working batcher")
|
||||||
syncCommitteePool = newClone SyncCommitteeMsgPool.init(rng, cfg)
|
syncCommitteePool = newClone SyncCommitteeMsgPool.init(rng, cfg)
|
||||||
timers: array[Timers, RunningStat]
|
timers: array[Timers, RunningStat]
|
||||||
attesters: RunningStat
|
attesters: RunningStat
|
||||||
|
|
|
@ -351,7 +351,7 @@ proc doRunTest(path: string, fork: ConsensusFork) =
|
||||||
let
|
let
|
||||||
rng = HmacDrbgContext.new()
|
rng = HmacDrbgContext.new()
|
||||||
taskpool = Taskpool.new()
|
taskpool = Taskpool.new()
|
||||||
var verifier = BatchVerifier(rng: rng, taskpool: taskpool)
|
var verifier = BatchVerifier.init(rng, taskpool)
|
||||||
|
|
||||||
let steps = loadOps(path, fork)
|
let steps = loadOps(path, fork)
|
||||||
var time = stores.fkChoice.checkpoints.time
|
var time = stores.fkChoice.checkpoints.time
|
||||||
|
|
|
@ -65,7 +65,7 @@ suite "Attestation pool processing" & preset():
|
||||||
ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 6),
|
ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 6),
|
||||||
validatorMonitor, {})
|
validatorMonitor, {})
|
||||||
taskpool = Taskpool.new()
|
taskpool = Taskpool.new()
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: taskpool)
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
quarantine = newClone(Quarantine.init())
|
quarantine = newClone(Quarantine.init())
|
||||||
pool = newClone(AttestationPool.init(dag, quarantine))
|
pool = newClone(AttestationPool.init(dag, quarantine))
|
||||||
state = newClone(dag.headState)
|
state = newClone(dag.headState)
|
||||||
|
|
|
@ -40,7 +40,7 @@ suite "Block processor" & preset():
|
||||||
validatorMonitor = newClone(ValidatorMonitor.init())
|
validatorMonitor = newClone(ValidatorMonitor.init())
|
||||||
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
|
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
|
||||||
taskpool = Taskpool.new()
|
taskpool = Taskpool.new()
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: taskpool)
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
quarantine = newClone(Quarantine.init())
|
quarantine = newClone(Quarantine.init())
|
||||||
blobQuarantine = newClone(BlobQuarantine())
|
blobQuarantine = newClone(BlobQuarantine())
|
||||||
attestationPool = newClone(AttestationPool.init(dag, quarantine))
|
attestationPool = newClone(AttestationPool.init(dag, quarantine))
|
||||||
|
|
|
@ -44,7 +44,8 @@ suite "Block pool processing" & preset():
|
||||||
db = makeTestDB(SLOTS_PER_EPOCH)
|
db = makeTestDB(SLOTS_PER_EPOCH)
|
||||||
validatorMonitor = newClone(ValidatorMonitor.init())
|
validatorMonitor = newClone(ValidatorMonitor.init())
|
||||||
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
|
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new())
|
taskpool = Taskpool.new()
|
||||||
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
quarantine = Quarantine.init()
|
quarantine = Quarantine.init()
|
||||||
state = newClone(dag.headState)
|
state = newClone(dag.headState)
|
||||||
cache = StateCache()
|
cache = StateCache()
|
||||||
|
@ -293,7 +294,8 @@ suite "Block pool altair processing" & preset():
|
||||||
db = makeTestDB(SLOTS_PER_EPOCH)
|
db = makeTestDB(SLOTS_PER_EPOCH)
|
||||||
validatorMonitor = newClone(ValidatorMonitor.init())
|
validatorMonitor = newClone(ValidatorMonitor.init())
|
||||||
dag = init(ChainDAGRef, cfg, db, validatorMonitor, {})
|
dag = init(ChainDAGRef, cfg, db, validatorMonitor, {})
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new())
|
taskpool = Taskpool.new()
|
||||||
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
quarantine = Quarantine.init()
|
quarantine = Quarantine.init()
|
||||||
state = newClone(dag.headState)
|
state = newClone(dag.headState)
|
||||||
cache = StateCache()
|
cache = StateCache()
|
||||||
|
@ -369,7 +371,8 @@ suite "chain DAG finalization tests" & preset():
|
||||||
db = makeTestDB(SLOTS_PER_EPOCH)
|
db = makeTestDB(SLOTS_PER_EPOCH)
|
||||||
validatorMonitor = newClone(ValidatorMonitor.init())
|
validatorMonitor = newClone(ValidatorMonitor.init())
|
||||||
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
|
dag = init(ChainDAGRef, defaultRuntimeConfig, db, validatorMonitor, {})
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new())
|
taskpool = Taskpool.new()
|
||||||
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
quarantine = Quarantine.init()
|
quarantine = Quarantine.init()
|
||||||
cache = StateCache()
|
cache = StateCache()
|
||||||
info = ForkedEpochInfo()
|
info = ForkedEpochInfo()
|
||||||
|
@ -639,7 +642,8 @@ suite "Old database versions" & preset():
|
||||||
{skipBlsValidation}))
|
{skipBlsValidation}))
|
||||||
genBlock = get_initial_beacon_block(genState[])
|
genBlock = get_initial_beacon_block(genState[])
|
||||||
var
|
var
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new())
|
taskpool = Taskpool.new()
|
||||||
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
quarantine = Quarantine.init()
|
quarantine = Quarantine.init()
|
||||||
|
|
||||||
test "pre-1.1.0":
|
test "pre-1.1.0":
|
||||||
|
@ -687,7 +691,8 @@ suite "Diverging hardforks":
|
||||||
db = makeTestDB(SLOTS_PER_EPOCH)
|
db = makeTestDB(SLOTS_PER_EPOCH)
|
||||||
validatorMonitor = newClone(ValidatorMonitor.init())
|
validatorMonitor = newClone(ValidatorMonitor.init())
|
||||||
dag = init(ChainDAGRef, phase0RuntimeConfig, db, validatorMonitor, {})
|
dag = init(ChainDAGRef, phase0RuntimeConfig, db, validatorMonitor, {})
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new())
|
taskpool = Taskpool.new()
|
||||||
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
quarantine = newClone(Quarantine.init())
|
quarantine = newClone(Quarantine.init())
|
||||||
cache = StateCache()
|
cache = StateCache()
|
||||||
info = ForkedEpochInfo()
|
info = ForkedEpochInfo()
|
||||||
|
@ -929,7 +934,7 @@ suite "Backfill":
|
||||||
taskpool = Taskpool.new()
|
taskpool = Taskpool.new()
|
||||||
var
|
var
|
||||||
cache: StateCache
|
cache: StateCache
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: taskpool)
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
quarantine = newClone(Quarantine.init())
|
quarantine = newClone(Quarantine.init())
|
||||||
|
|
||||||
let
|
let
|
||||||
|
@ -1070,7 +1075,8 @@ suite "Latest valid hash" & preset():
|
||||||
db = makeTestDB(SLOTS_PER_EPOCH)
|
db = makeTestDB(SLOTS_PER_EPOCH)
|
||||||
validatorMonitor = newClone(ValidatorMonitor.init())
|
validatorMonitor = newClone(ValidatorMonitor.init())
|
||||||
dag = init(ChainDAGRef, runtimeConfig, db, validatorMonitor, {})
|
dag = init(ChainDAGRef, runtimeConfig, db, validatorMonitor, {})
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new())
|
taskpool = Taskpool.new()
|
||||||
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
quarantine = newClone(Quarantine.init())
|
quarantine = newClone(Quarantine.init())
|
||||||
cache = StateCache()
|
cache = StateCache()
|
||||||
info = ForkedEpochInfo()
|
info = ForkedEpochInfo()
|
||||||
|
@ -1139,7 +1145,8 @@ suite "Pruning":
|
||||||
tmpState = assignClone(dag.headState)
|
tmpState = assignClone(dag.headState)
|
||||||
|
|
||||||
var
|
var
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new())
|
taskpool = Taskpool.new()
|
||||||
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
quarantine = Quarantine.init()
|
quarantine = Quarantine.init()
|
||||||
cache = StateCache()
|
cache = StateCache()
|
||||||
blocks = @[dag.head]
|
blocks = @[dag.head]
|
||||||
|
@ -1196,7 +1203,7 @@ suite "Ancestry":
|
||||||
type Node = tuple[blck: BlockRef, state: ref phase0.HashedBeaconState]
|
type Node = tuple[blck: BlockRef, state: ref phase0.HashedBeaconState]
|
||||||
template bid(n: Node): BlockId = n.blck.bid
|
template bid(n: Node): BlockId = n.blck.bid
|
||||||
|
|
||||||
var verifier = BatchVerifier(rng: rng, taskpool: taskpool)
|
var verifier = BatchVerifier.init(rng, taskpool)
|
||||||
proc addBlock(parent: Node, slot: Slot): Node =
|
proc addBlock(parent: Node, slot: Slot): Node =
|
||||||
dag.updateHead(parent.blck, quarantine[], [])
|
dag.updateHead(parent.blck, quarantine[], [])
|
||||||
|
|
||||||
|
@ -1491,7 +1498,7 @@ template runShufflingTests(cfg: RuntimeConfig, numRandomTests: int) =
|
||||||
taskpool = Taskpool.new()
|
taskpool = Taskpool.new()
|
||||||
|
|
||||||
var
|
var
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: taskpool)
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
graffiti: GraffitiBytes
|
graffiti: GraffitiBytes
|
||||||
proc addBlocks(blocks: uint64, attested: bool, cache: var StateCache) =
|
proc addBlocks(blocks: uint64, attested: bool, cache: var StateCache) =
|
||||||
inc distinctBase(graffiti)[0] # Avoid duplicate blocks across branches
|
inc distinctBase(graffiti)[0] # Avoid duplicate blocks across branches
|
||||||
|
|
|
@ -43,7 +43,7 @@ suite "Gossip validation " & preset():
|
||||||
ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3),
|
ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3),
|
||||||
validatorMonitor, {})
|
validatorMonitor, {})
|
||||||
taskpool = Taskpool.new()
|
taskpool = Taskpool.new()
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: taskpool)
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
quarantine = newClone(Quarantine.init())
|
quarantine = newClone(Quarantine.init())
|
||||||
pool = newClone(AttestationPool.init(dag, quarantine))
|
pool = newClone(AttestationPool.init(dag, quarantine))
|
||||||
state = newClone(dag.headState)
|
state = newClone(dag.headState)
|
||||||
|
@ -51,7 +51,8 @@ suite "Gossip validation " & preset():
|
||||||
info = ForkedEpochInfo()
|
info = ForkedEpochInfo()
|
||||||
batchCrypto = BatchCrypto.new(
|
batchCrypto = BatchCrypto.new(
|
||||||
rng, eager = proc(): bool = false,
|
rng, eager = proc(): bool = false,
|
||||||
genesis_validators_root = dag.genesis_validators_root, taskpool)
|
genesis_validators_root = dag.genesis_validators_root, taskpool).expect(
|
||||||
|
"working batcher")
|
||||||
# Slot 0 is a finalized slot - won't be making attestations for it..
|
# Slot 0 is a finalized slot - won't be making attestations for it..
|
||||||
check:
|
check:
|
||||||
process_slots(
|
process_slots(
|
||||||
|
@ -190,7 +191,7 @@ suite "Gossip validation - Extra": # Not based on preset config
|
||||||
quarantine = newClone(Quarantine.init())
|
quarantine = newClone(Quarantine.init())
|
||||||
rng = HmacDrbgContext.new()
|
rng = HmacDrbgContext.new()
|
||||||
var
|
var
|
||||||
verifier = BatchVerifier(rng: rng, taskpool: Taskpool.new())
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
dag = block:
|
dag = block:
|
||||||
let
|
let
|
||||||
validatorMonitor = newClone(ValidatorMonitor.init())
|
validatorMonitor = newClone(ValidatorMonitor.init())
|
||||||
|
@ -223,7 +224,8 @@ suite "Gossip validation - Extra": # Not based on preset config
|
||||||
|
|
||||||
let batchCrypto = BatchCrypto.new(
|
let batchCrypto = BatchCrypto.new(
|
||||||
rng, eager = proc(): bool = false,
|
rng, eager = proc(): bool = false,
|
||||||
genesis_validators_root = dag.genesis_validators_root, taskpool)
|
genesis_validators_root = dag.genesis_validators_root, taskpool).expect(
|
||||||
|
"working batcher")
|
||||||
|
|
||||||
var
|
var
|
||||||
state = assignClone(dag.headState.altairData)
|
state = assignClone(dag.headState.altairData)
|
||||||
|
|
|
@ -96,7 +96,8 @@ suite "Light client" & preset():
|
||||||
quarantine = newClone(Quarantine.init())
|
quarantine = newClone(Quarantine.init())
|
||||||
rng = HmacDrbgContext.new()
|
rng = HmacDrbgContext.new()
|
||||||
taskpool = Taskpool.new()
|
taskpool = Taskpool.new()
|
||||||
var verifier = BatchVerifier(rng: rng, taskpool: taskpool)
|
var
|
||||||
|
verifier = BatchVerifier.init(rng, taskpool)
|
||||||
|
|
||||||
test "Pre-Altair":
|
test "Pre-Altair":
|
||||||
# Genesis
|
# Genesis
|
||||||
|
|
|
@ -44,7 +44,7 @@ suite "Light client processor" & preset():
|
||||||
quarantine = newClone(Quarantine.init())
|
quarantine = newClone(Quarantine.init())
|
||||||
rng = HmacDrbgContext.new()
|
rng = HmacDrbgContext.new()
|
||||||
taskpool = Taskpool.new()
|
taskpool = Taskpool.new()
|
||||||
var verifier = BatchVerifier(rng: rng, taskpool: taskpool)
|
var verifier =BatchVerifier.init(rng, taskpool)
|
||||||
|
|
||||||
var cache: StateCache
|
var cache: StateCache
|
||||||
proc addBlocks(blocks: uint64, syncCommitteeRatio: float) =
|
proc addBlocks(blocks: uint64, syncCommitteeRatio: float) =
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit b71a16312699f6d26dee6a710b4aafea6ee5680d
|
Subproject commit 5937eb9a56d8398208efdc0e60bc42dd53313a81
|
|
@ -1 +1 @@
|
||||||
Subproject commit e04c042e8acfe0025c780de8a025aa4c4e042130
|
Subproject commit 6b4f5a1d23b1583b2b0ccee409e2e7c6dc6fff93
|
|
@ -1 +1 @@
|
||||||
Subproject commit 89d693d3ffc9e53aa470a9a05166e4f2b58d282a
|
Subproject commit ffba69121689e14c0aa286c885d00b90889571e6
|
Loading…
Reference in New Issue