diff --git a/.gitmodules b/.gitmodules index 46c271b75..29bb1fc78 100644 --- a/.gitmodules +++ b/.gitmodules @@ -203,7 +203,9 @@ url = https://github.com/status-im/nim-presto.git ignore = untracked branch = master - [submodule "vendor/eth2-networks"] path = vendor/eth2-networks url = https://github.com/eth2-clients/eth2-networks.git +[submodule "vendor/nim-taskpools"] + path = vendor/nim-taskpools + url = https://github.com/status-im/nim-taskpools diff --git a/beacon_chain/beacon_chain_db_immutable.nim b/beacon_chain/beacon_chain_db_immutable.nim index 27b3e4667..89322327e 100644 --- a/beacon_chain/beacon_chain_db_immutable.nim +++ b/beacon_chain/beacon_chain_db_immutable.nim @@ -8,7 +8,7 @@ {.push raises: [Defect].} import - stew/[assign2, objects, results], + stew/[assign2, results], serialization, eth/db/kvstore, ./spec/datatypes/[base, altair], diff --git a/beacon_chain/beacon_node_common.nim b/beacon_chain/beacon_node_common.nim index 01e7ba46a..a01a1685e 100644 --- a/beacon_chain/beacon_node_common.nim +++ b/beacon_chain/beacon_node_common.nim @@ -12,6 +12,7 @@ import # Nimble packages chronos, json_rpc/servers/httpserver, presto, + taskpools, # Local modules ./conf, ./beacon_clock, ./beacon_chain_db, @@ -31,6 +32,7 @@ export type RpcServer* = RpcHttpServer + TaskPoolPtr* = TaskPool GossipState* = enum Disconnected @@ -65,6 +67,7 @@ type attachedValidatorBalanceTotal*: uint64 gossipState*: GossipState beaconClock*: BeaconClock + taskpool*: TaskPoolPtr const MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index a40c5914d..0014ada76 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -156,6 +156,11 @@ type desc: "The slashing DB flavour to use" name: "slashing-db-kind" }: SlashingDbKind + numThreads* {. + defaultValue: 1, + desc: "Number of threads used (0 to use all logical threads)" + name: "num-threads" }: int + case cmd* {. command defaultValue: noCommand }: BNStartUpCmd diff --git a/beacon_chain/consensus_object_pools/block_clearance.nim b/beacon_chain/consensus_object_pools/block_clearance.nim index 5f48e7c4a..9da057b4b 100644 --- a/beacon_chain/consensus_object_pools/block_clearance.nim +++ b/beacon_chain/consensus_object_pools/block_clearance.nim @@ -61,12 +61,13 @@ template asTrusted(x: altair.SignedBeaconBlock or altair.SigVerifiedBeaconBlock) ## This assumes that their bytes representation is the same. isomorphicCast[altair.TrustedSignedBeaconBlock](x) -func batchVerify(quarantine: QuarantineRef, sigs: openArray[SignatureSet]): bool = +proc batchVerify(quarantine: QuarantineRef, sigs: openArray[SignatureSet]): bool = var secureRandomBytes: array[32, byte] quarantine.rng[].brHmacDrbgGenerate(secureRandomBytes) - - # TODO: For now only enable serial batch verification - return batchVerifySerial(quarantine.sigVerifCache, sigs, secureRandomBytes) + try: + return quarantine.taskpool.batchVerify(quarantine.sigVerifCache, sigs, secureRandomBytes) + except Exception as exc: + raise newException(Defect, "Unexpected exception in batchVerify.") proc addRawBlock*( dag: ChainDAGRef, quarantine: QuarantineRef, diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index a8c6813ee..43b61ea0c 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -12,7 +12,7 @@ import std/[sets, tables, hashes], # Status libraries stew/endians2, chronicles, - eth/keys, + eth/keys, taskpools, # Internals ../spec/[signatures_batch, forks], ../spec/datatypes/[phase0, altair], @@ -78,6 +78,10 @@ type inAdd*: bool + taskpool*: TaskPoolPtr + + TaskPoolPtr* = TaskPool + MissingBlock* = object tries*: int diff --git a/beacon_chain/consensus_object_pools/block_quarantine.nim b/beacon_chain/consensus_object_pools/block_quarantine.nim index 2628c1e0f..e21ea024a 100644 --- a/beacon_chain/consensus_object_pools/block_quarantine.nim +++ b/beacon_chain/consensus_object_pools/block_quarantine.nim @@ -20,9 +20,8 @@ export options, block_pools_types logScope: topics = "quarant" -func init*(T: type QuarantineRef, rng: ref BrHmacDrbgContext): T = - result = T() - result.rng = rng +func init*(T: type QuarantineRef, rng: ref BrHmacDrbgContext, taskpool: TaskpoolPtr): T = + T(rng: rng, taskpool: taskpool) func checkMissing*(quarantine: QuarantineRef): seq[FetchRecord] = ## Return a list of blocks that we should try to resolve from other client - diff --git a/beacon_chain/gossip_processing/batch_validation.nim b/beacon_chain/gossip_processing/batch_validation.nim index b5082beaa..41af690b6 100644 --- a/beacon_chain/gossip_processing/batch_validation.nim +++ b/beacon_chain/gossip_processing/batch_validation.nim @@ -11,7 +11,7 @@ import # Status chronicles, chronos, stew/results, - eth/keys, + eth/keys, taskpools, # Internals ../spec/[helpers, signatures_batch], ../spec/datatypes/base, @@ -67,6 +67,10 @@ type rng: ref BrHmacDrbgContext ##\ ## A reference to the Nimbus application-wide RNG pruneTime: Moment ## :ast time we had to prune something + ## A pointer to the Nimbus application-wide threadpool + taskpool: TaskPoolPtr + + TaskPoolPtr* = TaskPool const # We cap waiting for an idle slot in case there's a lot of network traffic @@ -84,8 +88,9 @@ const BatchedCryptoSize = 16 proc new*( - T: type BatchCrypto, rng: ref BrHmacDrbgContext, eager: Eager): ref BatchCrypto = - (ref BatchCrypto)(rng: rng, eager: eager, pruneTime: Moment.now()) + T: type BatchCrypto, rng: ref BrHmacDrbgContext, + eager: Eager, taskpool: TaskPoolPtr): ref BatchCrypto = + (ref BatchCrypto)(rng: rng, eager: eager, pruneTime: Moment.now(), taskpool: taskpool) func len(batch: Batch): int = doAssert batch.resultsBuffer.len() == batch.pendingBuffer.len() @@ -146,11 +151,13 @@ proc processBatch(batchCrypto: ref BatchCrypto) = var secureRandomBytes: array[32, byte] batchCrypto[].rng[].brHmacDrbgGenerate(secureRandomBytes) - # TODO: For now only enable serial batch verification - let ok = batchVerifySerial( - batchCrypto.sigVerifCache, - batch.pendingBuffer, - secureRandomBytes) + let ok = try: + batchCrypto.taskpool.batchVerify( + batchCrypto.sigVerifCache, + batch.pendingBuffer, + secureRandomBytes) + except Exception as exc: + raise newException(Defect, "Unexpected exception in batchVerify.") trace "batch crypto - finished", batchSize, diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index a5cfb77d0..a46af2cc3 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -102,7 +102,9 @@ proc new*(T: type Eth2Processor, syncCommitteeMsgPool: SyncCommitteeMsgPoolRef, quarantine: QuarantineRef, rng: ref BrHmacDrbgContext, - getBeaconTime: GetBeaconTimeFn): ref Eth2Processor = + getBeaconTime: GetBeaconTimeFn, + taskpool: batch_validation.TaskPoolPtr + ): ref Eth2Processor = (ref Eth2Processor)( doppelGangerDetectionEnabled: doppelGangerDetectionEnabled, doppelgangerDetection: DoppelgangerProtection( @@ -119,7 +121,8 @@ proc new*(T: type Eth2Processor, rng = rng, # Only run eager attestation signature verification if we're not # processing blocks in order to give priority to block processing - eager = proc(): bool = not blockProcessor[].hasBlocks()) + eager = proc(): bool = not blockProcessor[].hasBlocks(), + taskpool) ) # Gossip Management diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 35200ebcb..405c3dbe1 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -16,6 +16,7 @@ import chronos, confutils, metrics, metrics/chronos_httpserver, chronicles, bearssl, blscurve, presto, json_serialization/std/[options, sets, net], serialization/errors, + taskpools, eth/[keys, async_utils], eth/net/nat, eth/db/[kvstore, kvstore_sqlite3], @@ -109,6 +110,22 @@ proc init*(T: type BeaconNode, genesisStateContents: string, genesisDepositsSnapshotContents: string): BeaconNode {. raises: [Defect, CatchableError].} = + + var taskpool: TaskpoolPtr + + try: + if config.numThreads < 0: + fatal "The number of threads --numThreads cannot be negative." + quit 1 + elif config.numThreads == 0: + taskpool = TaskpoolPtr.new() + else: + taskpool = TaskpoolPtr.new(numThreads = config.numThreads) + + info "Threadpool started", numThreads = taskpool.numThreads + except Exception as exc: + raise newException(Defect, "Failure in taskpool initialization.") + let db = BeaconChainDB.new(config.databaseDir, inMemory = false) @@ -239,7 +256,7 @@ proc init*(T: type BeaconNode, chainDagFlags = if config.verifyFinalization: {verifyFinalization} else: {} dag = ChainDAGRef.init(cfg, db, chainDagFlags) - quarantine = QuarantineRef.init(rng) + quarantine = QuarantineRef.init(rng, taskpool) databaseGenesisValidatorsRoot = getStateField(dag.headState.data, genesis_validators_root) @@ -345,7 +362,7 @@ proc init*(T: type BeaconNode, processor = Eth2Processor.new( config.doppelgangerDetection, blockProcessor, dag, attestationPool, exitPool, validatorPool, - syncCommitteeMsgPool, quarantine, rng, getBeaconTime) + syncCommitteeMsgPool, quarantine, rng, getBeaconTime, taskpool) var node = BeaconNode( nickname: nickname, @@ -369,7 +386,8 @@ proc init*(T: type BeaconNode, blockProcessor: blockProcessor, consensusManager: consensusManager, requestManager: RequestManager.init(network, blockProcessor), - beaconClock: beaconClock + beaconClock: beaconClock, + taskpool: taskpool ) # set topic validation routine diff --git a/research/block_sim.nim b/research/block_sim.nim index 553abc51b..9ec2e0975 100644 --- a/research/block_sim.nim +++ b/research/block_sim.nim @@ -18,7 +18,7 @@ import math, stats, times, strformat, tables, options, random, tables, os, confutils, chronicles, eth/db/kvstore_sqlite3, - chronos/timer, eth/keys, + chronos/timer, eth/keys, taskpools, ../tests/testblockutil, ../beacon_chain/spec/[ beaconstate, forks, helpers, signatures, state_transition], @@ -89,7 +89,8 @@ cli do(slots = SLOTS_PER_EPOCH * 6, dag = ChainDAGRef.init(cfg, db, {}) eth1Chain = Eth1Chain.init(cfg, db) merkleizer = depositContractSnapshot.createMerkleizer - quarantine = QuarantineRef.init(keys.newRng()) + taskpool = Taskpool.new() + quarantine = QuarantineRef.init(keys.newRng(), taskpool) attPool = AttestationPool.init(dag, quarantine) syncCommitteePool = newClone SyncCommitteeMsgPool.init() timers: array[Timers, RunningStat] diff --git a/tests/test_attestation_pool.nim b/tests/test_attestation_pool.nim index 274d54fae..1c14d70ab 100644 --- a/tests/test_attestation_pool.nim +++ b/tests/test_attestation_pool.nim @@ -13,7 +13,7 @@ import unittest2, chronicles, chronos, stew/byteutils, - eth/keys, + eth/keys, taskpools, # Internal ../beacon_chain/[beacon_node_types], ../beacon_chain/gossip_processing/[gossip_validation], @@ -60,7 +60,8 @@ suite "Attestation pool processing" & preset(): # Genesis state that results in 6 members per committee var dag = init(ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 6), {}) - quarantine = QuarantineRef.init(keys.newRng()) + taskpool = Taskpool.new() + quarantine = QuarantineRef.init(keys.newRng(), taskpool) pool = newClone(AttestationPool.init(dag, quarantine)) state = newClone(dag.headState) cache = StateCache() diff --git a/tests/test_block_pool.nim b/tests/test_block_pool.nim index a7ac7e67d..0cf30dc95 100644 --- a/tests/test_block_pool.nim +++ b/tests/test_block_pool.nim @@ -12,7 +12,7 @@ import std/[options, sequtils], unittest2, stew/assign2, - eth/keys, + eth/keys, taskpools, ../beacon_chain/spec/datatypes/base, ../beacon_chain/spec/[beaconstate, forks, helpers, state_transition], ../beacon_chain/beacon_node_types, @@ -120,7 +120,8 @@ suite "Block pool processing" & preset(): var db = makeTestDB(SLOTS_PER_EPOCH) dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) - quarantine = QuarantineRef.init(keys.newRng()) + taskpool = Taskpool.new() + quarantine = QuarantineRef.init(keys.newRng(), taskpool) nilPhase0Callback: OnPhase0BlockAdded state = newClone(dag.headState.data) cache = StateCache() @@ -347,7 +348,9 @@ suite "chain DAG finalization tests" & preset(): var db = makeTestDB(SLOTS_PER_EPOCH) dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) - quarantine = QuarantineRef.init(keys.newRng()) + taskpool = Taskpool.new() + quarantine = QuarantineRef.init(keys.newRng(), taskpool) + nilPhase0Callback: OnPhase0BlockAdded cache = StateCache() rewards = RewardInfo() @@ -539,7 +542,8 @@ suite "Old database versions" & preset(): makeInitialDeposits(SLOTS_PER_EPOCH.uint64, flags = {skipBlsValidation}), {skipBlsValidation}) genBlock = get_initial_beacon_block(genState[]) - quarantine = QuarantineRef.init(keys.newRng()) + taskpool = Taskpool.new() + quarantine = QuarantineRef.init(keys.newRng(), taskpool) test "pre-1.1.0": # only kvstore, no immutable validator keys diff --git a/tests/test_exit_pool.nim b/tests/test_exit_pool.nim index 45630ca2e..483f3375f 100644 --- a/tests/test_exit_pool.nim +++ b/tests/test_exit_pool.nim @@ -7,10 +7,10 @@ {.used.} -import chronicles -import eth/keys -import ../beacon_chain/spec/datatypes/base -import ../beacon_chain/consensus_object_pools/[blockchain_dag, exit_pool] +import chronicles, chronos +import eth/keys, taskpools +import ../beacon_chain/spec/[datatypes/base, presets] +import ../beacon_chain/consensus_object_pools/[block_quarantine, blockchain_dag, exit_pool] import "."/[testutil, testdbutil] proc getExitPool(): auto = diff --git a/tests/test_gossip_validation.nim b/tests/test_gossip_validation.nim index f343412a6..99258cf2b 100644 --- a/tests/test_gossip_validation.nim +++ b/tests/test_gossip_validation.nim @@ -11,7 +11,7 @@ import # Status lib unittest2, chronicles, chronos, - eth/keys, + eth/keys, taskpools, # Internal ../beacon_chain/[beacon_node_types, beacon_clock], ../beacon_chain/gossip_processing/[gossip_validation, batch_validation], @@ -33,12 +33,13 @@ suite "Gossip validation " & preset(): # Genesis state that results in 3 members per committee var dag = init(ChainDAGRef, defaultRuntimeConfig, makeTestDB(SLOTS_PER_EPOCH * 3), {}) - quarantine = QuarantineRef.init(keys.newRng()) + taskpool = Taskpool.new() + quarantine = QuarantineRef.init(keys.newRng(), taskpool) pool = newClone(AttestationPool.init(dag, quarantine)) state = newClone(dag.headState) cache = StateCache() rewards = RewardInfo() - batchCrypto = BatchCrypto.new(keys.newRng(), eager = proc(): bool = false) + batchCrypto = BatchCrypto.new(keys.newRng(), eager = proc(): bool = false, taskpool) # Slot 0 is a finalized slot - won't be making attestations for it.. check: process_slots( diff --git a/vendor/nim-blscurve b/vendor/nim-blscurve index e9b75abce..8cbc4ae87 160000 --- a/vendor/nim-blscurve +++ b/vendor/nim-blscurve @@ -1 +1 @@ -Subproject commit e9b75abcefc2c96a3ae2d5cd623779b9a6ee5496 +Subproject commit 8cbc4ae8709b94c026a015dd03933e5bbe8a0040 diff --git a/vendor/nim-taskpools b/vendor/nim-taskpools new file mode 160000 index 000000000..39b90fa4a --- /dev/null +++ b/vendor/nim-taskpools @@ -0,0 +1 @@ +Subproject commit 39b90fa4a31edeafa035cedfe938767b9ff23859