Parallel attestation verification (#2718)
* Add parallel attestation verification
* Update tests; batchVerify doesn't use the threadpool with only a single core (nim-blscurve update)
* Bump nim-blscurve
* Debug info for failing eth2 test vectors
* Remove submodule eth2-testnets
* Verbose debugging of make failure on Windows (libbacktrace?)
* Remove CI debug mode
* Initialization convention
* Fix new Altair tests
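The core of the change: instead of the serial batchVerifySerial path, BLS signature batches are now dispatched through a shared nim-taskpools threadpool via the taskpool-aware batchVerify from the bumped nim-blscurve. As orientation, here is a minimal sketch of the nim-taskpools API the node adopts — Taskpool.new, spawn, sync and shutdown follow the library's documented surface; the fib workload is a hypothetical stand-in:

import taskpools

proc fib(n: int): int =
  # Deliberately naive recursion, used only as demo work for the pool.
  if n < 2: n
  else: fib(n - 1) + fib(n - 2)

when isMainModule:
  # An explicit size; Taskpool.new() without arguments would use all
  # logical cores, mirroring the --num-threads=0 handling further below.
  let tp = Taskpool.new(numThreads = 4)
  let fut = tp.spawn fib(25)  # schedule on a worker; returns a Flowvar handle
  echo sync(fut)              # block until the spawned task completes
  tp.shutdown()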
This commit is contained in:
parent 7bc1737653
commit d1cb5b7220
@@ -203,7 +203,9 @@
 	url = https://github.com/status-im/nim-presto.git
 	ignore = untracked
 	branch = master
-
 [submodule "vendor/eth2-networks"]
 	path = vendor/eth2-networks
 	url = https://github.com/eth2-clients/eth2-networks.git
+[submodule "vendor/nim-taskpools"]
+	path = vendor/nim-taskpools
+	url = https://github.com/status-im/nim-taskpools
@@ -8,7 +8,7 @@
 {.push raises: [Defect].}
 
 import
-  stew/[assign2, objects, results],
+  stew/[assign2, results],
   serialization,
   eth/db/kvstore,
   ./spec/datatypes/[base, altair],
@@ -12,6 +12,7 @@ import
 
   # Nimble packages
   chronos, json_rpc/servers/httpserver, presto,
+  taskpools,
 
   # Local modules
   ./conf, ./beacon_clock, ./beacon_chain_db,
@@ -31,6 +32,7 @@ export
 
 type
   RpcServer* = RpcHttpServer
+  TaskPoolPtr* = TaskPool
 
   GossipState* = enum
     Disconnected
@@ -65,6 +67,7 @@ type
     attachedValidatorBalanceTotal*: uint64
     gossipState*: GossipState
     beaconClock*: BeaconClock
+    taskpool*: TaskPoolPtr
 
 const
   MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT
@@ -156,6 +156,11 @@ type
       desc: "The slashing DB flavour to use"
      name: "slashing-db-kind" }: SlashingDbKind
 
+    numThreads* {.
+      defaultValue: 1,
+      desc: "Number of threads used (0 to use all logical threads)"
+      name: "num-threads" }: int
+
     case cmd* {.
       command
       defaultValue: noCommand }: BNStartUpCmd
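The new flag is declared with the same confutils pragma pattern as the neighbouring options. A self-contained sketch of how such a declaration is consumed — DemoConf and the echo are hypothetical; the pragma body mirrors the hunk above:

import confutils

type
  DemoConf = object
    numThreads {.
      defaultValue: 1,
      desc: "Number of threads used (0 to use all logical threads)"
      name: "num-threads" }: int

when isMainModule:
  # confutils parses e.g. `--num-threads=0` into the field.
  let conf = DemoConf.load()
  echo conf.numThreads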
@@ -61,12 +61,13 @@ template asTrusted(x: altair.SignedBeaconBlock or altair.SigVerifiedBeaconBlock)
   ## This assumes that their bytes representation is the same.
   isomorphicCast[altair.TrustedSignedBeaconBlock](x)
 
-func batchVerify(quarantine: QuarantineRef, sigs: openArray[SignatureSet]): bool =
+proc batchVerify(quarantine: QuarantineRef, sigs: openArray[SignatureSet]): bool =
   var secureRandomBytes: array[32, byte]
   quarantine.rng[].brHmacDrbgGenerate(secureRandomBytes)
-
-  # TODO: For now only enable serial batch verification
-  return batchVerifySerial(quarantine.sigVerifCache, sigs, secureRandomBytes)
+  try:
+    return quarantine.taskpool.batchVerify(quarantine.sigVerifCache, sigs, secureRandomBytes)
+  except Exception as exc:
+    raise newException(Defect, "Unexpected exception in batchVerify.")
 
 proc addRawBlock*(
     dag: ChainDAGRef, quarantine: QuarantineRef,
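The try/except fence exists because this module is compiled under {.push raises: [Defect].} while the taskpool dispatch path carries no exception-effect annotations: anything unexpected is converted into a Defect instead of leaking past the declared raises list. A minimal sketch of that pattern, with a hypothetical callee:

{.push raises: [Defect].}

proc mayRaiseAnything() {.raises: [Exception].} =
  # Hypothetical callee whose exception effects the compiler cannot narrow.
  discard

proc safeCall() =
  try:
    mayRaiseAnything()
  except Exception as exc:
    # Re-raise as Defect so the module-wide raises: [Defect] contract holds.
    raise newException(Defect, "Unexpected exception: " & exc.msg)

{.pop.}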
@@ -12,7 +12,7 @@ import
   std/[sets, tables, hashes],
   # Status libraries
   stew/endians2, chronicles,
-  eth/keys,
+  eth/keys, taskpools,
   # Internals
   ../spec/[signatures_batch, forks],
   ../spec/datatypes/[phase0, altair],
@@ -78,6 +78,10 @@ type
 
     inAdd*: bool
 
+    taskpool*: TaskPoolPtr
+
+  TaskPoolPtr* = TaskPool
+
   MissingBlock* = object
     tries*: int
@@ -20,9 +20,8 @@ export options, block_pools_types
 logScope:
   topics = "quarant"
 
-func init*(T: type QuarantineRef, rng: ref BrHmacDrbgContext): T =
-  result = T()
-  result.rng = rng
+func init*(T: type QuarantineRef, rng: ref BrHmacDrbgContext, taskpool: TaskpoolPtr): T =
+  T(rng: rng, taskpool: taskpool)
 
 func checkMissing*(quarantine: QuarantineRef): seq[FetchRecord] =
   ## Return a list of blocks that we should try to resolve from other client -
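The rewritten init also illustrates the "initialization convention" bullet from the commit message: build the object in a single construction expression instead of allocating and then mutating result field by field. A hypothetical minimal example of the two styles:

type
  Widget = ref object
    a, b: int

# Old convention: allocate, then assign through `result`.
func initOld(T: type Widget, a, b: int): T =
  result = T()
  result.a = a
  result.b = b

# New convention: one expression; unmentioned fields stay zero-initialized.
func initNew(T: type Widget, a, b: int): T =
  T(a: a, b: b)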
@@ -11,7 +11,7 @@ import
   # Status
   chronicles, chronos,
   stew/results,
-  eth/keys,
+  eth/keys, taskpools,
   # Internals
   ../spec/[helpers, signatures_batch],
   ../spec/datatypes/base,
@@ -67,6 +67,10 @@ type
     rng: ref BrHmacDrbgContext ##\
     ## A reference to the Nimbus application-wide RNG
     pruneTime: Moment ## Last time we had to prune something
+    ## A pointer to the Nimbus application-wide threadpool
+    taskpool: TaskPoolPtr
+
+  TaskPoolPtr* = TaskPool
 
 const
   # We cap waiting for an idle slot in case there's a lot of network traffic
@@ -84,8 +88,9 @@ const
   BatchedCryptoSize = 16
 
 proc new*(
-    T: type BatchCrypto, rng: ref BrHmacDrbgContext, eager: Eager): ref BatchCrypto =
-  (ref BatchCrypto)(rng: rng, eager: eager, pruneTime: Moment.now())
+    T: type BatchCrypto, rng: ref BrHmacDrbgContext,
+    eager: Eager, taskpool: TaskPoolPtr): ref BatchCrypto =
+  (ref BatchCrypto)(rng: rng, eager: eager, pruneTime: Moment.now(), taskpool: taskpool)
 
 func len(batch: Batch): int =
   doAssert batch.resultsBuffer.len() == batch.pendingBuffer.len()
@@ -146,11 +151,13 @@ proc processBatch(batchCrypto: ref BatchCrypto) =
   var secureRandomBytes: array[32, byte]
   batchCrypto[].rng[].brHmacDrbgGenerate(secureRandomBytes)
 
-  # TODO: For now only enable serial batch verification
-  let ok = batchVerifySerial(
-    batchCrypto.sigVerifCache,
-    batch.pendingBuffer,
-    secureRandomBytes)
+  let ok = try:
+    batchCrypto.taskpool.batchVerify(
+      batchCrypto.sigVerifCache,
+      batch.pendingBuffer,
+      secureRandomBytes)
+  except Exception as exc:
+    raise newException(Defect, "Unexpected exception in batchVerify.")
 
   trace "batch crypto - finished",
     batchSize,
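Note that try is used as an expression here: the value of whichever branch completes becomes ok, so no mutable temporary is needed. A self-contained illustration with a hypothetical parse:

import std/strutils

# The executed branch's value is the value of the whole construct.
let ok =
  try:
    parseInt("123") > 0
  except ValueError:
    false

echo ok  # prints: true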
@@ -102,7 +102,9 @@ proc new*(T: type Eth2Processor,
           syncCommitteeMsgPool: SyncCommitteeMsgPoolRef,
           quarantine: QuarantineRef,
           rng: ref BrHmacDrbgContext,
-          getBeaconTime: GetBeaconTimeFn): ref Eth2Processor =
+          getBeaconTime: GetBeaconTimeFn,
+          taskpool: batch_validation.TaskPoolPtr
+          ): ref Eth2Processor =
   (ref Eth2Processor)(
     doppelGangerDetectionEnabled: doppelGangerDetectionEnabled,
     doppelgangerDetection: DoppelgangerProtection(
@@ -119,7 +121,8 @@ proc new*(T: type Eth2Processor,
       rng = rng,
       # Only run eager attestation signature verification if we're not
       # processing blocks in order to give priority to block processing
-      eager = proc(): bool = not blockProcessor[].hasBlocks())
+      eager = proc(): bool = not blockProcessor[].hasBlocks(),
+      taskpool)
   )
 
 # Gossip Management
@@ -16,6 +16,7 @@ import
   chronos, confutils, metrics, metrics/chronos_httpserver,
   chronicles, bearssl, blscurve, presto,
   json_serialization/std/[options, sets, net], serialization/errors,
+  taskpools,
 
   eth/[keys, async_utils], eth/net/nat,
   eth/db/[kvstore, kvstore_sqlite3],
@@ -109,6 +110,22 @@ proc init*(T: type BeaconNode,
            genesisStateContents: string,
            genesisDepositsSnapshotContents: string): BeaconNode {.
     raises: [Defect, CatchableError].} =
+
+  var taskpool: TaskpoolPtr
+
+  try:
+    if config.numThreads < 0:
+      fatal "The number of threads --numThreads cannot be negative."
+      quit 1
+    elif config.numThreads == 0:
+      taskpool = TaskpoolPtr.new()
+    else:
+      taskpool = TaskpoolPtr.new(numThreads = config.numThreads)
+
+    info "Threadpool started", numThreads = taskpool.numThreads
+  except Exception as exc:
+    raise newException(Defect, "Failure in taskpool initialization.")
+
   let
     db = BeaconChainDB.new(config.databaseDir, inMemory = false)
 
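With --num-threads=0 the sizing decision is delegated to the pool itself. A hedged sketch of just that branch — requested stands in for config.numThreads, and per nim-taskpools a bare Taskpool.new() sizes the pool to the number of logical cores:

import taskpools

let requested = 0  # stand-in for config.numThreads

let tp =
  if requested == 0:
    Taskpool.new()  # pool picks the logical-core count itself
  else:
    Taskpool.new(numThreads = requested)

echo "threads: ", tp.numThreads  # same accessor the info log above uses
tp.shutdown()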
@@ -239,7 +256,7 @@ proc init*(T: type BeaconNode,
     chainDagFlags = if config.verifyFinalization: {verifyFinalization}
                     else: {}
     dag = ChainDAGRef.init(cfg, db, chainDagFlags)
-    quarantine = QuarantineRef.init(rng)
+    quarantine = QuarantineRef.init(rng, taskpool)
     databaseGenesisValidatorsRoot =
       getStateField(dag.headState.data, genesis_validators_root)
 
@@ -345,7 +362,7 @@ proc init*(T: type BeaconNode,
     processor = Eth2Processor.new(
       config.doppelgangerDetection,
       blockProcessor, dag, attestationPool, exitPool, validatorPool,
-      syncCommitteeMsgPool, quarantine, rng, getBeaconTime)
+      syncCommitteeMsgPool, quarantine, rng, getBeaconTime, taskpool)
 
   var node = BeaconNode(
     nickname: nickname,
@@ -369,7 +386,8 @@ proc init*(T: type BeaconNode,
     blockProcessor: blockProcessor,
     consensusManager: consensusManager,
     requestManager: RequestManager.init(network, blockProcessor),
-    beaconClock: beaconClock
+    beaconClock: beaconClock,
+    taskpool: taskpool
   )
 
   # set topic validation routine
@@ -18,7 +18,7 @@ import
   math, stats, times, strformat,
   tables, options, random, tables, os,
   confutils, chronicles, eth/db/kvstore_sqlite3,
-  chronos/timer, eth/keys,
+  chronos/timer, eth/keys, taskpools,
   ../tests/testblockutil,
   ../beacon_chain/spec/[
     beaconstate, forks, helpers, signatures, state_transition],
@@ -89,7 +89,8 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
     dag = ChainDAGRef.init(cfg, db, {})
     eth1Chain = Eth1Chain.init(cfg, db)
     merkleizer = depositContractSnapshot.createMerkleizer
-    quarantine = QuarantineRef.init(keys.newRng())
+    taskpool = Taskpool.new()
+    quarantine = QuarantineRef.init(keys.newRng(), taskpool)
     attPool = AttestationPool.init(dag, quarantine)
     syncCommitteePool = newClone SyncCommitteeMsgPool.init()
     timers: array[Timers, RunningStat]
@@ -13,7 +13,7 @@ import
   unittest2,
   chronicles, chronos,
   stew/byteutils,
-  eth/keys,
+  eth/keys, taskpools,
   # Internal
   ../beacon_chain/[beacon_node_types],
   ../beacon_chain/gossip_processing/[gossip_validation],
@@ -60,7 +60,8 @@ suite "Attestation pool processing" & preset():
     # Genesis state that results in 6 members per committee
     var
       dag = init(ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 6), {})
-      quarantine = QuarantineRef.init(keys.newRng())
+      taskpool = Taskpool.new()
+      quarantine = QuarantineRef.init(keys.newRng(), taskpool)
       pool = newClone(AttestationPool.init(dag, quarantine))
       state = newClone(dag.headState)
       cache = StateCache()
@@ -12,7 +12,7 @@ import
   std/[options, sequtils],
   unittest2,
   stew/assign2,
-  eth/keys,
+  eth/keys, taskpools,
   ../beacon_chain/spec/datatypes/base,
   ../beacon_chain/spec/[beaconstate, forks, helpers, state_transition],
   ../beacon_chain/beacon_node_types,
@@ -120,7 +120,8 @@ suite "Block pool processing" & preset():
   var
     db = makeTestDB(SLOTS_PER_EPOCH)
     dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
-    quarantine = QuarantineRef.init(keys.newRng())
+    taskpool = Taskpool.new()
+    quarantine = QuarantineRef.init(keys.newRng(), taskpool)
     nilPhase0Callback: OnPhase0BlockAdded
     state = newClone(dag.headState.data)
     cache = StateCache()
@@ -347,7 +348,9 @@ suite "chain DAG finalization tests" & preset():
   var
     db = makeTestDB(SLOTS_PER_EPOCH)
     dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
-    quarantine = QuarantineRef.init(keys.newRng())
+    taskpool = Taskpool.new()
+    quarantine = QuarantineRef.init(keys.newRng(), taskpool)
     nilPhase0Callback: OnPhase0BlockAdded
     cache = StateCache()
     rewards = RewardInfo()
 
@@ -539,7 +542,8 @@ suite "Old database versions" & preset():
       makeInitialDeposits(SLOTS_PER_EPOCH.uint64, flags = {skipBlsValidation}),
       {skipBlsValidation})
     genBlock = get_initial_beacon_block(genState[])
-    quarantine = QuarantineRef.init(keys.newRng())
+    taskpool = Taskpool.new()
+    quarantine = QuarantineRef.init(keys.newRng(), taskpool)
 
   test "pre-1.1.0":
     # only kvstore, no immutable validator keys
|
@ -7,10 +7,10 @@
|
|||
|
||||
{.used.}
|
||||
|
||||
import chronicles
|
||||
import eth/keys
|
||||
import ../beacon_chain/spec/datatypes/base
|
||||
import ../beacon_chain/consensus_object_pools/[blockchain_dag, exit_pool]
|
||||
import chronicles, chronos
|
||||
import eth/keys, taskpools
|
||||
import ../beacon_chain/spec/[datatypes/base, presets]
|
||||
import ../beacon_chain/consensus_object_pools/[block_quarantine, blockchain_dag, exit_pool]
|
||||
import "."/[testutil, testdbutil]
|
||||
|
||||
proc getExitPool(): auto =
|
||||
|
|
|
@@ -11,7 +11,7 @@ import
   # Status lib
   unittest2,
   chronicles, chronos,
-  eth/keys,
+  eth/keys, taskpools,
   # Internal
   ../beacon_chain/[beacon_node_types, beacon_clock],
   ../beacon_chain/gossip_processing/[gossip_validation, batch_validation],
@@ -33,12 +33,13 @@ suite "Gossip validation " & preset():
   # Genesis state that results in 3 members per committee
   var
     dag = init(ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3), {})
-    quarantine = QuarantineRef.init(keys.newRng())
+    taskpool = Taskpool.new()
+    quarantine = QuarantineRef.init(keys.newRng(), taskpool)
     pool = newClone(AttestationPool.init(dag, quarantine))
     state = newClone(dag.headState)
     cache = StateCache()
     rewards = RewardInfo()
-    batchCrypto = BatchCrypto.new(keys.newRng(), eager = proc(): bool = false)
+    batchCrypto = BatchCrypto.new(keys.newRng(), eager = proc(): bool = false, taskpool)
   # Slot 0 is a finalized slot - won't be making attestations for it..
   check:
     process_slots(
@@ -1 +1 @@
-Subproject commit e9b75abcefc2c96a3ae2d5cd623779b9a6ee5496
+Subproject commit 8cbc4ae8709b94c026a015dd03933e5bbe8a0040
@@ -0,0 +1 @@
+Subproject commit 39b90fa4a31edeafa035cedfe938767b9ff23859