setup nimbus tx pool for app wide usage

currently the txpool is used at:
- sealer engine
- graphql ethapi
- json rpc ethapi
- ws rpc ethapi
This commit is contained in:
jangko 2022-01-23 18:39:43 +07:00
parent 137eb97766
commit 01980041d5
No known key found for this signature in database
GPG Key ID: 31702AE10541E6B9
12 changed files with 54 additions and 25 deletions

View File

@ -13,6 +13,7 @@ import
../../../nimbus/sync/protocol_eth65, ../../../nimbus/sync/protocol_eth65,
../../../nimbus/[genesis, config, conf_utils, context], ../../../nimbus/[genesis, config, conf_utils, context],
../../../nimbus/graphql/ethapi, ../../../tests/test_helpers, ../../../nimbus/graphql/ethapi, ../../../tests/test_helpers,
../../../nimbus/utils/tx_pool,
graphql, ../sim_utils graphql, ../sim_utils
const const
@ -75,10 +76,11 @@ proc main() =
conf.networkId, conf.networkId,
conf.networkParams conf.networkParams
) )
txPool = TxPoolRef.new(chainDB, conf.engineSigner)
initializeEmptyDb(chainDB) initializeEmptyDb(chainDB)
discard importRlpBlock(blocksFile, chainDB) discard importRlpBlock(blocksFile, chainDB)
let ctx = setupGraphqlContext(chainDB, ethNode) let ctx = setupGraphqlContext(chainDB, ethNode, txPool)
runTest("GraphQL", caseFolder): runTest("GraphQL", caseFolder):
let node = parseFile(fileName) let node = parseFile(fileName)

View File

@ -17,7 +17,8 @@ import
graphql/instruments/query_complexity, graphql/instruments/query_complexity,
../db/[db_chain, state_db], ../rpc/rpc_utils, ../db/[db_chain, state_db], ../rpc/rpc_utils,
".."/[utils, transaction, vm_state, config, constants], ".."/[utils, transaction, vm_state, config, constants],
../transaction/call_evm ../transaction/call_evm,
../utils/tx_pool
from eth/p2p import EthereumNode from eth/p2p import EthereumNode
export httpserver export httpserver
@ -64,6 +65,7 @@ type
ids: array[EthTypes, Name] ids: array[EthTypes, Name]
chainDB: BaseChainDB chainDB: BaseChainDB
ethNode: EthereumNode ethNode: EthereumNode
txPool: TxPoolRef
proc toHash(n: Node): Hash256 = proc toHash(n: Node): Hash256 =
result.data = hexToByteArray[32](n.stringVal) result.data = hexToByteArray[32](n.stringVal)
@ -1295,19 +1297,24 @@ proc initEthApi(ctx: GraphqlContextRef) =
echo res.error echo res.error
quit(QuitFailure) quit(QuitFailure)
proc setupGraphqlContext*(chainDB: BaseChainDB, ethNode: EthereumNode): GraphqlContextRef = proc setupGraphqlContext*(chainDB: BaseChainDB,
ethNode: EthereumNode,
txPool: TxPoolRef): GraphqlContextRef =
let ctx = GraphqlContextRef( let ctx = GraphqlContextRef(
chainDB: chainDB, chainDB: chainDB,
ethNode: ethNode ethNode: ethNode,
txPool: txPool
) )
graphql.init(ctx) graphql.init(ctx)
ctx.initEthApi() ctx.initEthApi()
ctx ctx
proc setupGraphqlHttpServer*(conf: NimbusConf, proc setupGraphqlHttpServer*(conf: NimbusConf,
chainDB: BaseChainDB, ethNode: EthereumNode): GraphqlHttpServerRef = chainDB: BaseChainDB,
ethNode: EthereumNode,
txPool: TxPoolRef): GraphqlHttpServerRef =
let socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr} let socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}
let ctx = setupGraphqlContext(chainDB, ethNode) let ctx = setupGraphqlContext(chainDB, ethNode, txPool)
let address = initTAddress(conf.graphqlAddress, conf.graphqlPort) let address = initTAddress(conf.graphqlAddress, conf.graphqlPort)
let sres = GraphqlHttpServerRef.new(ctx, address, socketFlags = socketFlags) let sres = GraphqlHttpServerRef.new(ctx, address, socketFlags = socketFlags)
if sres.isErr(): if sres.isErr():

View File

@ -22,7 +22,7 @@ import
./sync/protocol_eth65, ./sync/protocol_eth65,
config, genesis, rpc/[common, p2p, debug, engine_api], p2p/chain, config, genesis, rpc/[common, p2p, debug, engine_api], p2p/chain,
eth/trie/db, metrics, metrics/[chronos_httpserver, chronicles_support], eth/trie/db, metrics, metrics/[chronos_httpserver, chronicles_support],
graphql/ethapi, context, graphql/ethapi, context, utils/tx_pool,
"."/[conf_utils, sealer, constants, utils] "."/[conf_utils, sealer, constants, utils]
when defined(evmc_enabled): when defined(evmc_enabled):
@ -47,6 +47,7 @@ type
sealingEngine: SealingEngineRef sealingEngine: SealingEngineRef
ctx: EthContext ctx: EthContext
chainRef: Chain chainRef: Chain
txPool: TxPoolRef
proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) = proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) =
if string(conf.blocksFile).len > 0: if string(conf.blocksFile).len > 0:
@ -132,6 +133,12 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
proc localServices(nimbus: NimbusNode, conf: NimbusConf, proc localServices(nimbus: NimbusNode, conf: NimbusConf,
chainDB: BaseChainDB, protocols: set[ProtocolFlag]) = chainDB: BaseChainDB, protocols: set[ProtocolFlag]) =
# app wide TxPool singleton
# TODO: disable some of txPool internal mechanism if
# the engineSigner is zero.
nimbus.txPool = TxPoolRef.new(chainDB, conf.engineSigner)
# metrics logging # metrics logging
if conf.logMetricsEnabled: if conf.logMetricsEnabled:
# https://github.com/nim-lang/Nim/issues/17369 # https://github.com/nim-lang/Nim/issues/17369
@ -151,7 +158,7 @@ proc localServices(nimbus: NimbusNode, conf: NimbusConf,
# Enable RPC APIs based on RPC flags and protocol flags # Enable RPC APIs based on RPC flags and protocol flags
let rpcFlags = conf.getRpcFlags() let rpcFlags = conf.getRpcFlags()
if RpcFlag.Eth in rpcFlags and ProtocolFlag.Eth in protocols: if RpcFlag.Eth in rpcFlags and ProtocolFlag.Eth in protocols:
setupEthRpc(nimbus.ethNode, nimbus.ctx, chainDB, nimbus.rpcServer) setupEthRpc(nimbus.ethNode, nimbus.ctx, chainDB, nimbus.txPool, nimbus.rpcServer)
if RpcFlag.Debug in rpcFlags: if RpcFlag.Debug in rpcFlags:
setupDebugRpc(chainDB, nimbus.rpcServer) setupDebugRpc(chainDB, nimbus.rpcServer)
@ -170,14 +177,14 @@ proc localServices(nimbus: NimbusNode, conf: NimbusConf,
# Enable Websocket RPC APIs based on RPC flags and protocol flags # Enable Websocket RPC APIs based on RPC flags and protocol flags
let wsFlags = conf.getWsFlags() let wsFlags = conf.getWsFlags()
if RpcFlag.Eth in wsFlags and ProtocolFlag.Eth in protocols: if RpcFlag.Eth in wsFlags and ProtocolFlag.Eth in protocols:
setupEthRpc(nimbus.ethNode, nimbus.ctx, chainDB, nimbus.wsRpcServer) setupEthRpc(nimbus.ethNode, nimbus.ctx, chainDB, nimbus.txPool, nimbus.wsRpcServer)
if RpcFlag.Debug in wsFlags: if RpcFlag.Debug in wsFlags:
setupDebugRpc(chainDB, nimbus.wsRpcServer) setupDebugRpc(chainDB, nimbus.wsRpcServer)
nimbus.wsRpcServer.start() nimbus.wsRpcServer.start()
if conf.graphqlEnabled: if conf.graphqlEnabled:
nimbus.graphqlServer = setupGraphqlHttpServer(conf, chainDB, nimbus.ethNode) nimbus.graphqlServer = setupGraphqlHttpServer(conf, chainDB, nimbus.ethNode, nimbus.txPool)
nimbus.graphqlServer.start() nimbus.graphqlServer.start()
if conf.engineSigner != ZERO_ADDRESS: if conf.engineSigner != ZERO_ADDRESS:
@ -206,7 +213,9 @@ proc localServices(nimbus: NimbusNode, conf: NimbusConf,
EngineStopped EngineStopped
nimbus.sealingEngine = SealingEngineRef.new( nimbus.sealingEngine = SealingEngineRef.new(
# TODO: Implement the initial state correctly # TODO: Implement the initial state correctly
nimbus.chainRef, nimbus.ctx, conf.engineSigner, initialSealingEngineState) nimbus.chainRef, nimbus.ctx, conf.engineSigner,
nimbus.txPool, initialSealingEngineState
)
nimbus.sealingEngine.start() nimbus.sealingEngine.start()
if conf.engineApiEnabled: if conf.engineApiEnabled:

View File

@ -60,6 +60,7 @@ proc validateSeal(pow: PoWRef; header: BlockHeader): Result[void,string] =
miningHash = header.getPowSpecs.miningHash miningHash = header.getPowSpecs.miningHash
(size, cachedHash) = try: pow.getPowCacheLookup(header.blockNumber) (size, cachedHash) = try: pow.getPowCacheLookup(header.blockNumber)
except KeyError: return err("Unknown block") except KeyError: return err("Unknown block")
except CatchableError as e: return err(e.msg)
debug "mixHash mismatch", debug "mixHash mismatch",
actual = header.mixDigest, actual = header.mixDigest,
expected = expMixDigest, expected = expMixDigest,

View File

@ -14,7 +14,8 @@ import
".."/[transaction, vm_state, constants, utils, context], ".."/[transaction, vm_state, constants, utils, context],
../db/[db_chain, state_db], ../db/[db_chain, state_db],
rpc_types, rpc_utils, rpc_types, rpc_utils,
../transaction/call_evm ../transaction/call_evm,
../utils/tx_pool
#[ #[
Note: Note:
@ -25,7 +26,7 @@ import
type cast to avoid extra processing. type cast to avoid extra processing.
]# ]#
proc setupEthRpc*(node: EthereumNode, ctx: EthContext, chain: BaseChainDB , server: RpcServer) = proc setupEthRpc*(node: EthereumNode, ctx: EthContext, chain: BaseChainDB, txPool: TxPoolRef, server: RpcServer) =
proc getStateDB(header: BlockHeader): ReadOnlyStateDB = proc getStateDB(header: BlockHeader): ReadOnlyStateDB =
## Retrieves the account db from canonical head ## Retrieves the account db from canonical head

View File

@ -17,7 +17,8 @@ import
clique_cfg, clique_cfg,
clique_sealer], clique_sealer],
./p2p/gaslimit, ./p2p/gaslimit,
"."/[chain_config, utils, context] "."/[chain_config, utils, context],
"."/utils/tx_pool
from web3/ethtypes as web3types import nil from web3/ethtypes as web3types import nil
from web3/engine_api_types import ExecutionPayload, PayloadAttributes from web3/engine_api_types import ExecutionPayload, PayloadAttributes
@ -40,6 +41,7 @@ type
chain*: Chain chain*: Chain
ctx: EthContext ctx: EthContext
signer: EthAddress signer: EthAddress
txPool: TxPoolRef
template asEthHash*(hash: Web3BlockHash): Hash256 = template asEthHash*(hash: Web3BlockHash): Hash256 =
Hash256(data: distinctBase(hash)) Hash256(data: distinctBase(hash))
@ -253,11 +255,13 @@ proc new*(_: type SealingEngineRef,
chain: Chain, chain: Chain,
ctx: EthContext, ctx: EthContext,
signer: EthAddress, signer: EthAddress,
txPool: TxPoolRef,
initialState: EngineState): SealingEngineRef = initialState: EngineState): SealingEngineRef =
SealingEngineRef( SealingEngineRef(
chain: chain, chain: chain,
ctx: ctx, ctx: ctx,
signer: signer, signer: signer,
txPool: txPool,
state: initialState state: initialState
) )

View File

@ -228,7 +228,7 @@ proc getPowSpecs*(header: BlockHeader): PowSpecs =
proc getPowCacheLookup*(tm: PowRef; proc getPowCacheLookup*(tm: PowRef;
blockNumber: BlockNumber): (uint64, Hash256) blockNumber: BlockNumber): (uint64, Hash256)
{.gcsafe, raises: [KeyError, Defect].} = {.gcsafe, raises: [KeyError, Defect, CatchableError].} =
## Returns the pair `(size,digest)` derived from the lookup cache for the ## Returns the pair `(size,digest)` derived from the lookup cache for the
## `hashimotoLight()` function for the given block number. The `size` is the ## `hashimotoLight()` function for the given block number. The `size` is the
## full size of the dataset (the cache represents) as passed on to the ## full size of the dataset (the cache represents) as passed on to the

View File

@ -71,7 +71,7 @@ proc new*(T: type PowCacheRef; maxItems = nItemsMax): T =
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc get*(pc: var PowCache; bn: BlockNumber): PowCacheItemRef proc get*(pc: var PowCache; bn: BlockNumber): PowCacheItemRef
{.gcsafe, raises: [Defect].} = {.gcsafe, raises: [Defect, CatchableError].} =
## Return a cache derived from argument `blockNumber` ready to be used ## Return a cache derived from argument `blockNumber` ready to be used
## for the `hashimotoLight()` method. ## for the `hashimotoLight()` method.
let let
@ -91,7 +91,8 @@ proc get*(pc: var PowCache; bn: BlockNumber): PowCacheItemRef
pc.cache.lruAppend(key, pair, pc.cacheMax) pc.cache.lruAppend(key, pair, pc.cacheMax)
proc get*(pcr: PowCacheRef; bn: BlockNumber): PowCacheItemRef = proc get*(pcr: PowCacheRef; bn: BlockNumber): PowCacheItemRef
{.gcsafe, raises: [Defect, CatchableError].} =
## Variant of `getCache()` ## Variant of `getCache()`
pcr[].get(bn) pcr[].get(bn)

View File

@ -47,7 +47,7 @@ proc checkTxBasic(xp: TxPoolRef; item: TxItemRef): bool =
if item.tx.gasLimit < item.tx.intrinsicGas(xp.chain.nextFork): if item.tx.gasLimit < item.tx.intrinsicGas(xp.chain.nextFork):
debug "invalid tx: not enough gas to perform calculation", debug "invalid tx: not enough gas to perform calculation",
available = item.tx.gasLimit, available = item.tx.gasLimit,
require = item.tx.intrinsicGas(xp.chain.fork) require = item.tx.intrinsicGas(xp.chain.nextFork)
return false return false
if item.tx.txType == TxEip1559: if item.tx.txType == TxEip1559:

View File

@ -137,9 +137,9 @@ proc headDiff*(xp: TxPoolRef;
return err(txInfoErrAncestorMissing) return err(txInfoErrAncestorMissing)
debug "Tx-pool reset with detached current head", debug "Tx-pool reset with detached current head",
curHeader = curHash, curHeader = curHash,
curNumber = curHeader.blockNumber, curNumber = curHead.blockNumber,
newHeader = newHash, newHeader = newHash,
newNumber = newHeader.blockNumber newNumber = newHead.blockNumber
return err(txInfoErrChainHeadMissing) return err(txInfoErrChainHeadMissing)
# Equalise block numbers between branches (typically, these btanches # Equalise block numbers between branches (typically, these btanches

View File

@ -15,7 +15,8 @@ import
../nimbus/sync/protocol_eth65, ../nimbus/sync/protocol_eth65,
../nimbus/[genesis, config, chain_config, context], ../nimbus/[genesis, config, chain_config, context],
../nimbus/db/[db_chain], ../nimbus/db/[db_chain],
../nimbus/p2p/chain, ./test_helpers ../nimbus/p2p/chain, ./test_helpers,
../nimbus/utils/tx_pool
type type
EthBlock = object EthBlock = object
@ -100,8 +101,9 @@ proc graphqlMain*() =
ethCtx = newEthContext() ethCtx = newEthContext()
ethNode = setupEthNode(conf, ethCtx, eth) ethNode = setupEthNode(conf, ethCtx, eth)
chainDB = setupChain() chainDB = setupChain()
txPool = TxPoolRef.new(chainDB, conf.engineSigner)
let ctx = setupGraphqlContext(chainDB, ethNode) let ctx = setupGraphqlContext(chainDB, ethNode, txPool)
when isMainModule: when isMainModule:
ctx.main(caseFolder, purgeSchema = false) ctx.main(caseFolder, purgeSchema = false)
else: else:

View File

@ -16,7 +16,7 @@ import
../nimbus/db/[accounts_cache, db_chain], ../nimbus/db/[accounts_cache, db_chain],
../nimbus/p2p/[chain, executor, executor/executor_helpers], ../nimbus/p2p/[chain, executor, executor/executor_helpers],
../nimbus/sync/protocol_eth65, ../nimbus/sync/protocol_eth65,
../nimbus/utils/difficulty, ../nimbus/utils/[difficulty, tx_pool],
../nimbus/[context, chain_config], ../nimbus/[context, chain_config],
./test_helpers, ./macro_assembler ./test_helpers, ./macro_assembler
@ -106,7 +106,7 @@ proc setupEnv(chainDB: BaseChainDB, signer, ks2: EthAddress, ctx: EthContext): T
# call persist() before we get the rootHash # call persist() before we get the rootHash
vmState.stateDB.persist() vmState.stateDB.persist()
var header = BlockHeader( var header = BlockHeader(
parentHash : parentHash, parentHash : parentHash,
#coinbase*: EthAddress #coinbase*: EthAddress
@ -171,8 +171,10 @@ proc rpcMain*() =
var var
rpcServer = newRpcSocketServer(["localhost:" & $RPC_PORT]) rpcServer = newRpcSocketServer(["localhost:" & $RPC_PORT])
client = newRpcSocketClient() client = newRpcSocketClient()
txPool = TxPoolRef.new(chain, conf.engineSigner)
setupCommonRpc(ethNode, conf, rpcServer) setupCommonRpc(ethNode, conf, rpcServer)
setupEthRpc(ethNode, ctx, chain, rpcServer) setupEthRpc(ethNode, ctx, chain, txPool, rpcServer)
# Begin tests # Begin tests
rpcServer.start() rpcServer.start()