Jordan/dissolve txpool jobs queue (#1047)

* Prepare unit tests for running without tx-pool job queue

why:
  Most of the job queue logic can be emulated. This adapts to a few
  pathological test cases.

* Replace tx-pool job queue logic with in-place actions

why:
  This additional execution layer is not needed, anymore which has been
  learned from working with the integration/hive tests.

details:
  Execution of add or deletion jobs are executed in-place. Some actions
  -- as in smartHead() -- have been combined for facilitating the correct
order of actions.

* Update production functions, remove txpool legacy stuff
This commit is contained in:
Jordan Hrycaj 2022-04-08 09:38:47 +01:00 committed by GitHub
parent 0365d030fb
commit fd6fcc6cc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 179 additions and 479 deletions

View File

@ -248,11 +248,8 @@ proc setupEthRpc*(node: EthereumNode, ctx: EthContext, chain: BaseChainDB, txPoo
eip155 = chain.currentBlock >= chain.config.eip155Block eip155 = chain.currentBlock >= chain.config.eip155Block
signedTx = signTransaction(tx, acc.privateKey, chain.config.chainId, eip155) signedTx = signTransaction(tx, acc.privateKey, chain.config.chainId, eip155)
rlpTx = rlp.encode(signedTx) rlpTx = rlp.encode(signedTx)
res = txPool.addLocal(signedTx, force = true)
if res.isErr:
raise newException(ValueError, $res.error)
txPool.add(signedTx)
result = keccak_256.digest(rlpTx).ethHashStr result = keccak_256.digest(rlpTx).ethHashStr
server.rpc("eth_sendRawTransaction") do(data: HexDataStr) -> EthHashStr: server.rpc("eth_sendRawTransaction") do(data: HexDataStr) -> EthHashStr:
@ -264,11 +261,8 @@ proc setupEthRpc*(node: EthereumNode, ctx: EthContext, chain: BaseChainDB, txPoo
let let
txBytes = hexToSeqByte(data.string) txBytes = hexToSeqByte(data.string)
signedTx = rlp.decode(txBytes, Transaction) signedTx = rlp.decode(txBytes, Transaction)
res = txPool.addLocal(signedTx, force = true)
if res.isErr:
raise newException(ValueError, $res.error)
txPool.add(signedTx)
result = keccak_256.digest(txBytes).ethHashStr result = keccak_256.digest(txBytes).ethHashStr
server.rpc("eth_call") do(call: EthCall, quantityTag: string) -> HexDataStr: server.rpc("eth_call") do(call: EthCall, quantityTag: string) -> HexDataStr:

View File

@ -89,7 +89,6 @@ proc prepareBlock(engine: SealingEngineRef,
time time
engine.txPool.prevRandao = prevRandao engine.txPool.prevRandao = prevRandao
engine.txPool.jobCommit()
var blk = engine.txPool.ethBlock() var blk = engine.txPool.ethBlock()
@ -224,10 +223,7 @@ proc sealingLoop(engine: SealingEngineRef): Future[void] {.async.} =
error "sealing engine: persistBlocks error" error "sealing engine: persistBlocks error"
break break
discard engine.txPool.jobDeltaTxsHead(blk.header) # add transactions update jobs discard engine.txPool.smartHead(blk.header) # add transactions update jobs
engine.txPool.head = blk.header # adjust block insertion point
engine.txPool.jobCommit()
info "block generated", number=blk.header.blockNumber info "block generated", number=blk.header.blockNumber
template unsafeQuantityToInt64(q: web3types.Quantity): int64 = template unsafeQuantityToInt64(q: web3types.Quantity): int64 =
@ -278,9 +274,7 @@ proc generateExecutionPayload*(engine: SealingEngineRef,
if blk.header.extraData.len > 32: if blk.header.extraData.len > 32:
return err "extraData length should not exceed 32 bytes" return err "extraData length should not exceed 32 bytes"
discard engine.txPool.jobDeltaTxsHead(blk.header) # add transactions update jobs discard engine.txPool.smartHead(blk.header) # add transactions update jobs
engine.txPool.head = blk.header # adjust block insertion point
engine.txPool.jobCommit()
payloadRes.parentHash = Web3BlockHash blk.header.parentHash.data payloadRes.parentHash = Web3BlockHash blk.header.parentHash.data
payloadRes.feeRecipient = Web3Address blk.header.coinbase payloadRes.feeRecipient = Web3Address blk.header.coinbase

View File

@ -16,7 +16,7 @@
## ##
## * There is a conceivable problem with the per-account optimisation. The ## * There is a conceivable problem with the per-account optimisation. The
## algorithm chooses an account and does not stop packing until all txs ## algorithm chooses an account and does not stop packing until all txs
## of the account are packed or the block is full. In the lattter case, ## of the account are packed or the block is full. In the latter case,
## there might be some txs left unpacked from the account which might be ## there might be some txs left unpacked from the account which might be
## the most lucrative ones. Should this be tackled (see also next item)? ## the most lucrative ones. Should this be tackled (see also next item)?
## ##
@ -81,25 +81,25 @@
## Pool database: ## Pool database:
## -------------- ## --------------
## :: ## ::
## <Batch queue> . <Status buckets> . <Terminal state> ## <Transactions> . <Status buckets> . <Terminal state>
## . . ## . .
## . . +----------+ ## . . +----------+
## --> txJobAddTxs -------------------------------> | | ## add() ----+---------------------------------> | |
## | . +-----------+ . | disposed | ## | . +-----------+ . | disposed |
## +------------> | pending | ------> | | ## +-----------> | pending | ------> | |
## . +-----------+ . | | ## . +-----------+ . | |
## . | ^ ^ . | waste | ## . | ^ ^ . | waste |
## . v | | . | basket | ## . v | | . | basket |
## . +----------+ | . | | ## . +----------+ | . | |
## . | staged | | . | | ## . | staged | | . | |
## . +----------+ | . | | ## . +----------+ | . | |
## . | | ^ | . | | ## . | | ^ | . | |
## . | v | | . | | ## . | v | | . | |
## . | +----------+ . | | ## . | +----------+ . | |
## . | | packed | -------> | | ## . | | packed | -------> | |
## . | +----------+ . | | ## . | +----------+ . | |
## . +----------------------> | | ## . +----------------------> | |
## . . +----------+ ## . . +----------+
## ##
## The three columns *Batch queue*, *State bucket*, and *Terminal state* ## The three columns *Batch queue*, *State bucket*, and *Terminal state*
## represent three different accounting (or database) systems. The pool ## represent three different accounting (or database) systems. The pool
@ -245,20 +245,16 @@
## var xq = TxPoolRef.new(db) # initialise tx-pool ## var xq = TxPoolRef.new(db) # initialise tx-pool
## .. ## ..
## ##
## xq.jobAddTxs(txs) # add transactions to be held ## xq.add(txs) # add transactions ..
## .. # .. on the batch queue ## .. # .. into the buckets
## ##
## xq.jobCommit # run batch queue worker/processor
## let newBlock = xq.ethBlock # fetch current mining block ## let newBlock = xq.ethBlock # fetch current mining block
## ##
## .. ## ..
## mineThatBlock(newBlock) ... # external mining & signing process ## mineThatBlock(newBlock) ... # external mining & signing process
## .. ## ..
## ##
## let newTopHeader = db.getCanonicalHead # new head after mining ## xp.smartHead(newBlock.header) # update pool, new insertion point
## xp.jobDeltaTxsHead(newTopHeader) # add transactions update jobs
## xp.head = newTopHeader # adjust block insertion point
## xp.jobCommit # run batch queue worker/processor
## ##
## ##
## Discussion of example ## Discussion of example
@ -350,10 +346,6 @@
## ##
## *..there might be more strategy symbols..* ## *..there might be more strategy symbols..*
## ##
## head
## Cached block chain insertion point. Typocally, this should be the the
## same header as retrieved by the `getCanonicalHead()`.
##
## hwmTrgPercent ## hwmTrgPercent
## This parameter implies the size of `hwmGasLimit` which is calculated ## This parameter implies the size of `hwmGasLimit` which is calculated
## as `max(trgGasLimit, maxGasLimit * lwmTrgPercent / 100)`. ## as `max(trgGasLimit, maxGasLimit * lwmTrgPercent / 100)`.
@ -405,6 +397,11 @@
## eligible for staging at the next cycle when the internally cached block ## eligible for staging at the next cycle when the internally cached block
## chain state is updated.) ## chain state is updated.)
## ##
## head
## Cached block chain insertion point, not necessarily the same header as
## retrieved by the `getCanonicalHead()`. This insertion point can be
## adjusted with the `smartHead()` function.
##
## hwmGasLimit ## hwmGasLimit
## This parameter is at least `trgGasLimit` and does not exceed ## This parameter is at least `trgGasLimit` and does not exceed
## `maxGasLimit` and can be adjusted by means of setting `hwmMaxPercent`. It ## `maxGasLimit` and can be adjusted by means of setting `hwmMaxPercent`. It
@ -440,7 +437,7 @@
import import
std/[sequtils, tables], std/[sequtils, tables],
../db/db_chain, ../db/db_chain,
./tx_pool/[tx_chain, tx_desc, tx_info, tx_item, tx_job], ./tx_pool/[tx_chain, tx_desc, tx_info, tx_item],
./tx_pool/tx_tabs, ./tx_pool/tx_tabs,
./tx_pool/tx_tasks/[ ./tx_pool/tx_tasks/[
tx_add, tx_add,
@ -454,16 +451,9 @@ import
stew/[keyed_queue, results], stew/[keyed_queue, results],
stint stint
# hide complexity unless really needed
when JobWaitEnabled:
import chronos
export export
TxItemRef, TxItemRef,
TxItemStatus, TxItemStatus,
TxJobDataRef,
TxJobID,
TxJobKind,
TxPoolFlags, TxPoolFlags,
TxPoolRef, TxPoolRef,
TxTabsGasTotals, TxTabsGasTotals,
@ -495,7 +485,7 @@ logScope:
proc maintenanceProcessing(xp: TxPoolRef) proc maintenanceProcessing(xp: TxPoolRef)
{.gcsafe,raises: [Defect,CatchableError].} = {.gcsafe,raises: [Defect,CatchableError].} =
## Tasks to be done after job processing ## Tasks to be done after add/del job processing
# Purge expired items # Purge expired items
if autoZombifyUnpacked in xp.pFlags or if autoZombifyUnpacked in xp.pFlags or
@ -512,34 +502,15 @@ proc maintenanceProcessing(xp: TxPoolRef)
discard xp.bucketUpdateAll discard xp.bucketUpdateAll
xp.pDirtyBuckets = false xp.pDirtyBuckets = false
proc setHead(xp: TxPoolRef; val: BlockHeader)
proc processJobs(xp: TxPoolRef): int
{.gcsafe,raises: [Defect,CatchableError].} = {.gcsafe,raises: [Defect,CatchableError].} =
## Job queue processor ## Update cached block chain insertion point. This will also update the
var rc = xp.byJob.fetch ## internally cached `baseFee` (depends on the block chain state.)
while rc.isOK: if xp.chain.head != val:
let task = rc.value xp.chain.head = val # calculates the new baseFee
rc = xp.byJob.fetch xp.txDB.baseFee = xp.chain.baseFee
result.inc xp.pDirtyBuckets = true
xp.bucketFlushPacked
case task.data.kind
of txJobNone:
# No action
discard
of txJobAddTxs:
# Add a batch of txs to the database
var args = task.data.addTxsArgs
let (_,topItems) = xp.addTxs(args.txs, args.info)
xp.pDoubleCheckAdd topItems
of txJobDelItemIDs:
# Dispose a batch of items
var args = task.data.delItemIDsArgs
for itemID in args.itemIDs:
let rcItem = xp.txDB.byItemID.eq(itemID)
if rcItem.isOK:
discard xp.txDB.dispose(rcItem.value, reason = args.reason)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public constructor/destructor # Public constructor/destructor
@ -556,14 +527,9 @@ proc new*(T: type TxPoolRef; db: BaseChainDB; miner: EthAddress): T
# Public functions, task manager, pool actions serialiser # Public functions, task manager, pool actions serialiser
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc job*(xp: TxPoolRef; job: TxJobDataRef): TxJobID
{.discardable,gcsafe,raises: [Defect,CatchableError].} =
## Queue a new generic job (does not run `jobCommit()`.)
xp.byJob.add(job)
# core/tx_pool.go(848): func (pool *TxPool) AddLocals(txs [].. # core/tx_pool.go(848): func (pool *TxPool) AddLocals(txs []..
# core/tx_pool.go(864): func (pool *TxPool) AddRemotes(txs [].. # core/tx_pool.go(864): func (pool *TxPool) AddRemotes(txs []..
proc jobAddTxs*(xp: TxPoolRef; txs: openArray[Transaction]; info = "") proc add*(xp: TxPoolRef; txs: openArray[Transaction]; info = "")
{.gcsafe,raises: [Defect,CatchableError].} = {.gcsafe,raises: [Defect,CatchableError].} =
## Queues a batch of transactions jobs to be processed in due course (does ## Queues a batch of transactions jobs to be processed in due course (does
## not run `jobCommit()`.) ## not run `jobCommit()`.)
@ -573,88 +539,68 @@ proc jobAddTxs*(xp: TxPoolRef; txs: openArray[Transaction]; info = "")
## least nonce first. For this reason, it is suggested to pass transactions ## least nonce first. For this reason, it is suggested to pass transactions
## in larger groups. Calling single transaction jobs, they must strictly be ## in larger groups. Calling single transaction jobs, they must strictly be
## passed smaller nonce before larger nonce. ## passed smaller nonce before larger nonce.
discard xp.job(TxJobDataRef( xp.pDoubleCheckAdd xp.addTxs(txs, info).topItems
kind: txJobAddTxs, xp.maintenanceProcessing
addTxsArgs: (
txs: toSeq(txs),
info: info)))
# core/tx_pool.go(854): func (pool *TxPool) AddLocals(txs [].. # core/tx_pool.go(854): func (pool *TxPool) AddLocals(txs []..
# core/tx_pool.go(883): func (pool *TxPool) AddRemotes(txs [].. # core/tx_pool.go(883): func (pool *TxPool) AddRemotes(txs []..
proc jobAddTx*(xp: TxPoolRef; tx: Transaction; info = "") proc add*(xp: TxPoolRef; tx: Transaction; info = "")
{.gcsafe,raises: [Defect,CatchableError].} = {.gcsafe,raises: [Defect,CatchableError].} =
## Variant of `jobAddTxs()` for a single transaction. ## Variant of `add()` for a single transaction.
xp.jobAddTxs(@[tx], info) xp.add(@[tx], info)
proc smartHead*(xp: TxPoolRef; pos: BlockHeader; blindMode = false): bool
proc jobDeltaTxsHead*(xp: TxPoolRef; newHead: BlockHeader): bool
{.gcsafe,raises: [Defect,CatchableError].} = {.gcsafe,raises: [Defect,CatchableError].} =
## This function calculates the txs to add or delete that need to take place ## This function moves the internal head cache (i.e. tx insertion point,
## after the cached block chain head is set to the position implied by the ## vmState) and points it to a new block on the chain.
## argument `newHead`. If successful, the txs to add or delete are queued ##
## on the job queue (run `jobCommit()` to execute) and `true` is returned. ## In standard mode when argument `blindMode` is `false`, it calculates the
## Otherwise nothing is done and `false` is returned. ## txs that need to be added or deleted after moving the insertion point
let rcDiff = xp.headDiff(newHead) ## head so that the tx-pool will not fail to re-insert queued txs that are
## on the chain, already. Neither will it lose any txs. After updating
## the internal head cache, the previously calculated actions will be
## applied.
##
## If the argument `blindMode` is passed `true`, the insertion head is
## simply set ignoring all changes. This mode makes sense only in very
## particular circumstances.
if blindMode:
xp.sethead(pos)
return true
let rcDiff = xp.headDiff(pos)
if rcDiff.isOk: if rcDiff.isOk:
let changes = rcDiff.value let changes = rcDiff.value
# Need to move head before adding txs which may rightly be rejected in
# `addTxs()` otherwise.
xp.sethead(pos)
# Re-inject transactions, do that via job queue # Re-inject transactions, do that via job queue
if 0 < changes.addTxs.len: if 0 < changes.addTxs.len:
debug "queuing delta txs", debug "queuing delta txs",
mode = "inject", mode = "inject",
num = changes.addTxs.len num = changes.addTxs.len
xp.pDoubleCheckAdd xp.addTxs(toSeq(changes.addTxs.nextValues)).topItems
discard xp.job(TxJobDataRef(
kind: txJobAddTxs,
addTxsArgs: (
txs: toSeq(changes.addTxs.nextValues),
info: "")))
# Delete already *mined* transactions # Delete already *mined* transactions
if 0 < changes.remTxs.len: if 0 < changes.remTxs.len:
debug "queuing delta txs", debug "queuing delta txs",
mode = "remove", mode = "remove",
num = changes.remTxs.len num = changes.remTxs.len
xp.disposeById(toSeq(changes.remTxs.keys), txInfoChainHeadUpdate)
discard xp.job(TxJobDataRef( xp.maintenanceProcessing
kind: txJobDelItemIDs,
delItemIDsArgs: (
itemIDs: toSeq(changes.remTxs.keys),
reason: txInfoChainHeadUpdate)))
return true return true
proc triggerReorg*(xp: TxPoolRef)
proc jobCommit*(xp: TxPoolRef; forceMaintenance = false)
{.gcsafe,raises: [Defect,CatchableError].} = {.gcsafe,raises: [Defect,CatchableError].} =
## This function processes all jobs currently queued. If the the argument
## `forceMaintenance` is set `true`, mainenance processing is always run.
## Otherwise it is only run if there were active jobs.
let nJobs = xp.processJobs
if 0 < nJobs or forceMaintenance:
xp.maintenanceProcessing
debug "processed jobs", nJobs
proc nJobs*(xp: TxPoolRef): int
{.gcsafe,raises: [Defect,CatchableError].} =
## Return the number of jobs currently unprocessed, waiting.
xp.byJob.len
# hide complexity unless really needed
when JobWaitEnabled:
proc jobWait*(xp: TxPoolRef) {.async,raises: [Defect,CatchableError].} =
## Asynchronously wait until at least one job is queued and available.
## This function might be useful for testing (available only if the
## `JobWaitEnabled` compile time constant is set.)
await xp.byJob.waitAvail
proc triggerReorg*(xp: TxPoolRef) =
## This function triggers a bucket re-org action with the next job queue ## This function triggers a bucket re-org action with the next job queue
## maintenance-processing (see `jobCommit()`) by setting the `dirtyBuckets` ## maintenance-processing (see `jobCommit()`) by setting the `dirtyBuckets`
## parameter. This re-org action eventually happens when the ## parameter. This re-org action eventually happens when the
## `autoUpdateBucketsDB` flag is also set. ## `autoUpdateBucketsDB` flag is also set.
xp.pDirtyBuckets = true xp.pDirtyBuckets = true
xp.maintenanceProcessing
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public functions, getters # Public functions, getters
@ -787,9 +733,11 @@ proc `lwmTrgPercent=`*(xp: TxPoolRef; val: int) =
if 0 <= val and val <= 100: if 0 <= val and val <= 100:
xp.chain.lhwm = (lwmTrg: val, hwmMax: xp.chain.lhwm.hwmMax) xp.chain.lhwm = (lwmTrg: val, hwmMax: xp.chain.lhwm.hwmMax)
proc `flags=`*(xp: TxPoolRef; val: set[TxPoolFlags]) = proc `flags=`*(xp: TxPoolRef; val: set[TxPoolFlags])
{.gcsafe,raises: [Defect,CatchableError].} =
## Setter, strategy symbols for how to process items and buckets. ## Setter, strategy symbols for how to process items and buckets.
xp.pFlags = val xp.pFlags = val
xp.maintenanceProcessing
proc `hwmMaxPercent=`*(xp: TxPoolRef; val: int) = proc `hwmMaxPercent=`*(xp: TxPoolRef; val: int) =
## Setter, `val` arguments outside `0..100` are ignored ## Setter, `val` arguments outside `0..100` are ignored
@ -826,16 +774,6 @@ proc `minTipPrice=`*(xp: TxPoolRef; val: GasPrice) =
xp.pMinTipPrice = val xp.pMinTipPrice = val
xp.pDirtyBuckets = true xp.pDirtyBuckets = true
proc `head=`*(xp: TxPoolRef; val: BlockHeader)
{.gcsafe,raises: [Defect,CatchableError].} =
## Setter, cached block chain insertion point. This will also update the
## internally cached `baseFee` (depends on the block chain state.)
if xp.chain.head != val:
xp.chain.head = val # calculates the new baseFee
xp.txDB.baseFee = xp.chain.baseFee
xp.pDirtyBuckets = true
xp.bucketFlushPacked
proc `prevRandao=`*(xp: TxPoolRef; val: Hash256) = proc `prevRandao=`*(xp: TxPoolRef; val: Hash256) =
## Setter, PoS block randomness ## Setter, PoS block randomness
## Used by `prevRandao` op code in EVM after transition to PoS ## Used by `prevRandao` op code in EVM after transition to PoS
@ -871,6 +809,10 @@ proc disposeItems*(xp: TxPoolRef; item: TxItemRef;
# Public functions, local/remote accounts # Public functions, local/remote accounts
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc isLocal*(xp: TxPoolRef; account: EthAddress): bool =
## This function returns `true` if argument `account` is tagged local.
xp.txDB.isLocal(account)
proc setLocal*(xp: TxPoolRef; account: EthAddress) = proc setLocal*(xp: TxPoolRef; account: EthAddress) =
## Tag argument `account` local which means that the transactions from this ## Tag argument `account` local which means that the transactions from this
## account -- together with all other local accounts -- will be considered ## account -- together with all other local accounts -- will be considered
@ -902,8 +844,15 @@ proc addRemote*(xp: TxPoolRef;
## argument transaction is tagged local, this function returns with an error. ## argument transaction is tagged local, this function returns with an error.
## If the argument `force` is set `true`, the sender account will be untagged, ## If the argument `force` is set `true`, the sender account will be untagged,
## i.e. made non-local. ## i.e. made non-local.
# Create or recover new item ##
let rc = xp.recoverItem(tx, txItemPending, "remote tx peek") ## Note: This function is rather inefficient if there are more than one
## txs to be added for a known account. The preferable way to do this
## would be to use a combination of `xp.add()` and `xp.resLocal()` in any
## order.
# Create or recover new item. This will wrap the argument `tx` and cache
# the sender account and other derived data accessible.
let rc = xp.recoverItem(
tx, txItemPending, "remote tx peek", acceptExisting = true)
if rc.isErr: if rc.isErr:
return err(rc.error) return err(rc.error)
@ -917,7 +866,7 @@ proc addRemote*(xp: TxPoolRef;
elif xp.txDB.isLocal(sender): elif xp.txDB.isLocal(sender):
return err(txInfoTxErrorRemoteExpected) return err(txInfoTxErrorRemoteExpected)
xp.jobAddTx(tx, "remote tx") xp.add(tx, "remote tx")
ok() ok()
proc addLocal*(xp: TxPoolRef; proc addLocal*(xp: TxPoolRef;
@ -929,8 +878,15 @@ proc addLocal*(xp: TxPoolRef;
## argument transaction is _not_ tagged local, this function returns with ## argument transaction is _not_ tagged local, this function returns with
## an error. If the argument `force` is set `true`, the sender account will ## an error. If the argument `force` is set `true`, the sender account will
## be tagged local. ## be tagged local.
# Create or recover new item ##
let rc = xp.recoverItem(tx, txItemPending, "local tx peek") ## Note: This function is rather inefficient if there are more than one
## txs to be added for a known account. The preferable way to do this
## would be to use a combination of `xp.add()` and `xp.setLocal()` in any
## order.
# Create or recover new item. This will wrap the argument `tx` and cache
# the sender account and other derived data accessible.
let rc = xp.recoverItem(
tx, txItemPending, "local tx peek", acceptExisting = true)
if rc.isErr: if rc.isErr:
return err(rc.error) return err(rc.error)
@ -944,9 +900,43 @@ proc addLocal*(xp: TxPoolRef;
elif not xp.txDB.isLocal(sender): elif not xp.txDB.isLocal(sender):
return err(txInfoTxErrorLocalExpected) return err(txInfoTxErrorLocalExpected)
xp.jobAddTx(tx, "local tx") xp.add(tx, "local tx")
ok() ok()
# ------------------------------------------------------------------------------
# Legacy stuff -- will be removed, soon
# ------------------------------------------------------------------------------
proc jobAddTxs*(xp: TxPoolRef; txs: openArray[Transaction]; info = "")
{.gcsafe, raises: [Defect,CatchableError],
deprecated: "use add() instead".} =
xp.add(txs,info)
proc jobAddTx*(xp: TxPoolRef; tx: Transaction; info = "")
{.gcsafe,raises: [Defect,CatchableError],
deprecated: "use add() instead".} =
xp.add(tx,info)
proc jobDeltaTxsHead*(xp: TxPoolRef; newHead: BlockHeader): bool
{.gcsafe,raises: [Defect,CatchableError],
deprecated: "use smartHead() instead " &
"and remove the head= directive follwoing".} =
xp.smartHead(newHead)
proc jobCommit*(xp: TxPoolRef; forceMaintenance = false)
{.deprecated: "this function does nothing and can savely be removed".} =
discard
proc nJobs*(xp: TxPoolRef): int
{.deprecated: "this function returns always 0 and can savely be removed".} =
0
proc `head=`*(xp: TxPoolRef; val: BlockHeader)
{.gcsafe,raises: [Defect,CatchableError],
deprecated: "use smartHead(val,blindMode=true) instead although " &
"this function is unneccesary after running smartHead()".} =
discard xp.smartHead(val, true)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# End # End
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------

View File

@ -18,7 +18,6 @@ import
./tx_chain, ./tx_chain,
./tx_info, ./tx_info,
./tx_item, ./tx_item,
./tx_job,
./tx_tabs, ./tx_tabs,
./tx_tabs/tx_sender, # for verify() ./tx_tabs/tx_sender, # for verify()
eth/[common, keys] eth/[common, keys]
@ -94,7 +93,6 @@ type
startDate: Time ## Start date (read-only) startDate: Time ## Start date (read-only)
chain: TxChainRef ## block chain state chain: TxChainRef ## block chain state
byJob: TxJobRef ## Job batch list
txDB: TxTabsRef ## Transaction lists & tables txDB: TxTabsRef ## Transaction lists & tables
lifeTime*: times.Duration ## Maximum life time of a tx in the system lifeTime*: times.Duration ## Maximum life time of a tx in the system
@ -136,7 +134,6 @@ proc init*(xp: TxPoolRef; db: BaseChainDB; miner: EthAddress)
xp.chain = TxChainRef.new(db, miner) xp.chain = TxChainRef.new(db, miner)
xp.txDB = TxTabsRef.new xp.txDB = TxTabsRef.new
xp.byJob = TxJobRef.new
xp.lifeTime = txItemLifeTime xp.lifeTime = txItemLifeTime
xp.priceBump = txPriceBump xp.priceBump = txPriceBump
@ -150,10 +147,6 @@ proc init*(xp: TxPoolRef; db: BaseChainDB; miner: EthAddress)
# Public functions, getters # Public functions, getters
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc byJob*(xp: TxPoolRef): TxJobRef =
## Getter, job queue
xp.byJob
proc chain*(xp: TxPoolRef): TxChainRef = proc chain*(xp: TxPoolRef): TxChainRef =
## Getter, block chain DB ## Getter, block chain DB
xp.chain xp.chain
@ -231,10 +224,6 @@ proc verify*(xp: TxPoolRef): Result[void,TxInfo]
{.gcsafe, raises: [Defect,CatchableError].} = {.gcsafe, raises: [Defect,CatchableError].} =
## Verify descriptor and subsequent data structures. ## Verify descriptor and subsequent data structures.
block:
let rc = xp.byJob.verify
if rc.isErr:
return rc
block: block:
let rc = xp.txDB.verify let rc = xp.txDB.verify
if rc.isErr: if rc.isErr:

View File

@ -1,263 +0,0 @@
# Nimbus
# Copyright (c) 2018 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.
## Jobs Queue For Transaction Pool
## ===============================
##
import
std/[hashes, tables],
./tx_info,
./tx_item,
./tx_tabs,
eth/[common, keys],
stew/[keyed_queue, keyed_queue/kq_debug, results]
{.push raises: [Defect].}
# hide complexity unless really needed
const
jobWaitCompilerFlag = defined(job_wait_enabled) or defined(debug)
JobWaitEnabled* = ##\
## Compiler flag: fire *chronos* event if job queue becomes populated
jobWaitCompilerFlag
when JobWaitEnabled:
import chronos
type
TxJobID* = ##\
## Valid interval: *1 .. TxJobIdMax*, the value `0` corresponds to\
## `TxJobIdMax` and is internally accepted only right after initialisation.
distinct uint
TxJobKind* = enum ##\
## Types of batch job data. See `txJobPriorityKind` for the list of\
## *out-of-band* jobs.
txJobNone = 0 ##\
## no action
txJobAddTxs ##\
## Enqueues a batch of transactions
txJobDelItemIDs ##\
## Enqueues a batch of itemIDs the items of which to be disposed
const
txJobPriorityKind*: set[TxJobKind] = ##\
## Prioritised jobs, either small or important ones.
{}
type
TxJobDataRef* = ref object
case kind*: TxJobKind
of txJobNone:
discard
of txJobAddTxs:
addTxsArgs*: tuple[
txs: seq[Transaction],
info: string]
of txJobDelItemIDs:
delItemIDsArgs*: tuple[
itemIDs: seq[Hash256],
reason: TxInfo]
TxJobPair* = object ## Responding to a job queue query
id*: TxJobID ## Job ID, queue database key
data*: TxJobDataRef ## Data record
TxJobRef* = ref object ##\
## Job queue with increasing job *ID* numbers (wrapping around at\
## `TxJobIdMax`.)
topID: TxJobID ## Next job will have `topID+1`
jobs: KeyedQueue[TxJobID,TxJobDataRef] ## Job queue
# hide complexity unless really needed
when JobWaitEnabled:
jobsAvail: AsyncEvent ## Fired if there is a job available
const
txJobIdMax* = ##\
## Wraps around to `1` after last ID
999999.TxJobID
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc hash(id: TxJobID): Hash =
  ## Hash function so that `TxJobID` can be used as a hash-`Table` key.
  hash(id.uint)
proc `+`(a, b: TxJobID): TxJobID {.borrow.}
proc `-`(a, b: TxJobID): TxJobID {.borrow.}
proc `+`(a: TxJobID; b: int): TxJobID = a + b.TxJobID
proc `-`(a: TxJobID; b: int): TxJobID = a - b.TxJobID
# ------------------------------------------------------------------------------
# Public helpers (operators needed in jobAppend() and jobUnshift() functions)
# ------------------------------------------------------------------------------
proc `<=`*(a, b: TxJobID): bool {.borrow.}
proc `==`*(a, b: TxJobID): bool {.borrow.}
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc jobAppend(jq: TxJobRef; data: TxJobDataRef): TxJobID
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Appends a job at the tail of the *FIFO* and returns a non-zero *ID*
  ## on success.
  ##
  ## :Note:
  ##    Appending can only fail when the *ID* of the first job follows the
  ##    *ID* of the last job (*modulo* `TxJobIdMax`). This occurs when
  ##    * there are `TxJobIdMax` jobs already on the queue
  ##    * some jobs were deleted in the middle of the queue and the *ID*
  ##      gap was not shifted out yet.
  let id =
    if txJobIdMax <= jq.topID: 1.TxJobID  # wrap around after the last ID
    else: jq.topID + 1
  if jq.jobs.append(id, data):
    jq.topID = id
    return id
proc jobUnshift(jq: TxJobRef; data: TxJobDataRef): TxJobID
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Stores *back* a job to the *FIFO* front end to be re-fetched next. This
  ## function returns a non-zero *ID* if successful.
  ##
  ## See also the *Note* at the comment for `jobAppend()`.
  var id: TxJobID
  if jq.jobs.len == 0:
    # Empty queue: re-use the current `topID`, forcing it non-zero first
    # (`0` is only accepted right after initialisation.)
    if jq.topID == 0.TxJobID:
      jq.topID = txJobIdMax # must be non-zero after first use
    id = jq.topID
  else:
    # Use the predecessor of the current front *ID*, wrapping around to
    # `txJobIdMax` when it would become zero.
    id = jq.jobs.firstKey.value - 1
    if id == 0.TxJobID:
      id = txJobIdMax
  if jq.jobs.unshift(id, data):
    return id
# ------------------------------------------------------------------------------
# Public functions, constructor
# ------------------------------------------------------------------------------
proc new*(T: type TxJobRef; initSize = 10): T =
  ## Constructor variant: returns an empty job queue; `initSize` is the
  ## initial capacity of the underlying keyed queue.
  new result
  result.jobs.init(initSize)
  # hide complexity unless really needed
  when JobWaitEnabled:
    result.jobsAvail = newAsyncEvent()
proc clear*(jq: TxJobRef) =
  ## Re-initialise variant: flushes all queued jobs.
  jq.jobs.clear
  # hide complexity unless really needed
  when JobWaitEnabled:
    # reset the availability event as the queue is empty now
    jq.jobsAvail.clear
# ------------------------------------------------------------------------------
# Public functions, add/remove entry
# ------------------------------------------------------------------------------
proc add*(jq: TxJobRef; data: TxJobDataRef): TxJobID
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Add a new job to the *FIFO*. Jobs of a kind listed in
  ## `txJobPriorityKind` are put at the front of the queue, all others are
  ## appended at the tail. Returns a non-zero *ID* on success.
  result =
    if data.kind in txJobPriorityKind: jq.jobUnshift(data)
    else: jq.jobAppend(data)
  # hide complexity unless really needed
  when JobWaitEnabled:
    # update event: signal any async waiter that a job is available
    jq.jobsAvail.fire
proc fetch*(jq: TxJobRef): Result[TxJobPair,void]
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Fetches (and deletes) the next job from the *FIFO*; returns an error
  ## if the queue is empty.
  # first item from queue
  let rc = jq.jobs.shift
  if rc.isErr:
    return err()
  # hide complexity unless really needed
  when JobWaitEnabled:
    # update event: cleared after each fetch, re-armed by the next `add()`
    # (NOTE(review): cleared even when more jobs remain — presumably
    # intentional; waiters are woken per-add, not per-queue-state)
    jq.jobsAvail.clear
  # result
  ok(TxJobPair(id: rc.value.key, data: rc.value.data))
# hide complexity unless really needed
when JobWaitEnabled:
  proc waitAvail*(jq: TxJobRef) {.async,raises: [Defect,CatchableError].} =
    ## Asynchronously wait until at least one job is on the queue. This
    ## function is only available if the `JobWaitEnabled` compile time
    ## constant is set.
    if jq.jobs.len < 1:
      await jq.jobsAvail.wait
else:
  proc waitAvail*(jq: TxJobRef)
      {.deprecated: "will raise exception unless JobWaitEnabled is set",
        raises: [Defect,CatchableError].} =
    ## Placeholder stub, unconditionally raises an assertion defect.
    raiseAssert "Must not be called unless JobWaitEnabled is set"
# ------------------------------------------------------------------------------
# Public queue/table ops
# ------------------------------------------------------------------------------
proc `[]`*(jq: TxJobRef; id: TxJobID): TxJobDataRef
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Retrieve the job data stored under the argument *ID* (raises
  ## `KeyError` for an unknown *ID*.)
  result = jq.jobs[id]
proc hasKey*(jq: TxJobRef; id: TxJobID): bool
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Check whether a job with the argument *ID* is currently queued.
  result = jq.jobs.hasKey(id)
proc len*(jq: TxJobRef): int
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Number of jobs currently on the queue.
  result = jq.jobs.len
# ------------------------------------------------------------------------------
# Public functions, debugging
# ------------------------------------------------------------------------------
proc verify*(jq: TxJobRef): Result[void,TxInfo]
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Debugging only: check the internal consistency of the job queue,
  ## mapping any low-level failure to `txInfoVfyJobQueue`.
  if jq.jobs.verify.isErr:
    return err(txInfoVfyJobQueue)
  ok()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -28,6 +28,18 @@ import
{.push raises: [Defect].} {.push raises: [Defect].}
type type
TxAddStats* = tuple ##\
## Status code returned from the `addTxs()` function
stagedIndicator: bool ##\
## If `true`, this value indicates that at least one item was added to\
## the `staged` bucket (which suggest a re-run of the packer.)
topItems: seq[TxItemRef] ##\
## For each sender where txs were added to the bucket database or waste\
## basket, this list keeps the items with the highest nonce (handy for\
## chasing nonce gaps after a back-move of the block chain head.)
NonceList = ##\ NonceList = ##\
## Temporary sorter list ## Temporary sorter list
SortedSet[AccountNonce,TxItemRef] SortedSet[AccountNonce,TxItemRef]
@ -147,25 +159,25 @@ proc addTx*(xp: TxPoolRef; item: TxItemRef): bool
# core/tx_pool.go(883): func (pool *TxPool) AddRemotes(txs [].. # core/tx_pool.go(883): func (pool *TxPool) AddRemotes(txs []..
# core/tx_pool.go(889): func (pool *TxPool) addTxs(txs []*types.Transaction, .. # core/tx_pool.go(889): func (pool *TxPool) addTxs(txs []*types.Transaction, ..
proc addTxs*(xp: TxPoolRef; proc addTxs*(xp: TxPoolRef;
txs: var openArray[Transaction]; info = ""): (bool,seq[TxItemRef]) txs: openArray[Transaction]; info = ""): TxAddStats
{.discardable,gcsafe,raises: [Defect,CatchableError].} = {.discardable,gcsafe,raises: [Defect,CatchableError].} =
## Add a list of transactions. The list is sorted after nonces and txs are ## Add a list of transactions. The list is sorted after nonces and txs are
## tested and stored into either of the `pending` or `staged` buckets, or ## tested and stored into either of the `pending` or `staged` buckets, or
## disposed o the waste basket. The function returns the tuple ## disposed o the waste basket. The function returns the tuple
## `(staged-indicator,top-items)` as explained below. ## `(staged-indicator,top-items)` as explained below.
## ##
## *staged-indicator* ## *stagedIndicator*
## If `true`, this value indicates that at least one item was added to ## If `true`, this value indicates that at least one item was added to
## the `staged` bucket (which suggest a re-run of the packer.) ## the `staged` bucket (which suggest a re-run of the packer.)
## ##
## *top-items* ## *topItems*
## For each sender where txs were added to the bucket database or waste ## For each sender where txs were added to the bucket database or waste
## basket, this list keeps the items with the highest nonce (handy for ## basket, this list keeps the items with the highest nonce (handy for
## chasing nonce gaps after a back-move of the block chain head.) ## chasing nonce gaps after a back-move of the block chain head.)
## ##
var accTab: AccouuntNonceTab var accTab: AccouuntNonceTab
for tx in txs.mitems: for tx in txs.items:
var reason: TxInfo var reason: TxInfo
# Create tx item wrapper, preferably recovered from waste basket # Create tx item wrapper, preferably recovered from waste basket
@ -203,7 +215,7 @@ proc addTxs*(xp: TxPoolRef;
while rc.isOK: while rc.isOK:
let (nonce,item) = (rc.value.key,rc.value.data) let (nonce,item) = (rc.value.key,rc.value.data)
if xp.addTx(item): if xp.addTx(item):
result[0] = true result.stagedIndicator = true
# Make sure that there is at least one item per sender, prefereably # Make sure that there is at least one item per sender, prefereably
# a non-error item. # a non-error item.
@ -213,7 +225,7 @@ proc addTxs*(xp: TxPoolRef;
# return the last one in the series # return the last one in the series
if not lastItem.isNil: if not lastItem.isNil:
result[1].add lastItem result.topItems.add lastItem
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# End # End

View File

@ -109,6 +109,16 @@ proc disposeItemAndHigherNonces*(xp: TxPoolRef; item: TxItemRef;
if xp.txDB.dispose(otherItem, otherReason): if xp.txDB.dispose(otherItem, otherReason):
result.inc result.inc
proc disposeById*(xp: TxPoolRef; itemIDs: openArray[Hash256]; reason: TxInfo)
{.gcsafe,raises: [Defect,KeyError].}=
## Dispose items by item ID without checking whether this makes other items
## unusable (e.g. with higher nonces for the same sender.)
for itemID in itemIDs:
let rcItem = xp.txDB.byItemID.eq(itemID)
if rcItem.isOK:
discard xp.txDB.dispose(rcItem.value, reason)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# End # End
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------

View File

@ -35,16 +35,22 @@ let
# Public functions # Public functions
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc recoverItem*(xp: TxPoolRef; tx: Transaction; proc recoverItem*(xp: TxPoolRef; tx: Transaction; status = txItemPending;
status = txItemPending; info = ""): Result[TxItemRef,TxInfo] info = ""; acceptExisting = false): Result[TxItemRef,TxInfo]
{.gcsafe,raises: [Defect,CatchableError].} = {.gcsafe,raises: [Defect,CatchableError].} =
## Recover item from waste basket or create new. It is an error if the item ## Recover item from waste basket or create new. It is an error if the item
## is in the buckets database, already. ## is in the buckets database, already.
##
## If the argument `acceptExisting` is set `true` and the tx item is in the
## bucket database already for any bucket, the function succeeds ok.
let itemID = tx.itemID let itemID = tx.itemID
# Test whether the item is in the database, already # Test whether the item is in the database, already
if xp.txDB.byItemID.hasKey(itemID): if xp.txDB.byItemID.hasKey(itemID):
return err(txInfoErrAlreadyKnown) if acceptExisting:
return ok(xp.txDB.byItemID.eq(itemID).value)
else:
return err(txInfoErrAlreadyKnown)
# Check whether the tx can be re-cycled from waste basket # Check whether the tx can be re-cycled from waste basket
block: block:

View File

@ -216,40 +216,6 @@ proc runTxLoader(noisy = true; capture = loadSpecs) =
check 0.GasPrice <= minGasPrice check 0.GasPrice <= minGasPrice
check minGasPrice <= maxGasPrice check minGasPrice <= maxGasPrice
test &"Concurrent job processing example":
var log = ""
# This test does not verify anything but rather shows how the pool
# primitives could be used in an async context.
proc delayJob(xp: TxPoolRef; waitMs: int) {.async.} =
let n = xp.nJobs
xp.job(TxJobDataRef(kind: txJobNone))
xp.job(TxJobDataRef(kind: txJobNone))
xp.job(TxJobDataRef(kind: txJobNone))
log &= " wait-" & $waitMs & "-" & $(xp.nJobs - n)
await chronos.milliseconds(waitMs).sleepAsync
xp.jobCommit
log &= " done-" & $waitMs
# run async jobs, completion should be sorted by timeout argument
proc runJobs(xp: TxPoolRef) {.async.} =
let
p1 = xp.delayJob(900)
p2 = xp.delayJob(1)
p3 = xp.delayJob(700)
await p3
await p2
await p1
waitFor xp.runJobs
check xp.nJobs == 0
check log == " wait-900-3 wait-1-3 wait-700-3 done-1 done-700 done-900"
# Cannot rely on boundary conditions regarding nonces. So xp.verify()
# will not work here => xp.txDB.verify()
check xp.txDB.verify.isOK
proc runTxPoolTests(noisy = true) = proc runTxPoolTests(noisy = true) =
let elapNoisy = false let elapNoisy = false
@ -387,16 +353,19 @@ proc runTxPoolTests(noisy = true) =
test &"Auto delete about {nItems} expired txs out of {xq.nItems.total}": test &"Auto delete about {nItems} expired txs out of {xq.nItems.total}":
# Make sure that the test did not collapse
check 0 < nItems check 0 < nItems
xq.lifeTime = getTime() - gap
xq.flags = xq.flags + {autoZombifyPacked}
# evict and pick items from the wastbasket # evict and pick items from the wastbasket
let let
disposedBase = xq.nItems.disposed disposedBase = xq.nItems.disposed
evictedBase = evictionMeter.value evictedBase = evictionMeter.value
impliedBase = impliedEvictionMeter.value impliedBase = impliedEvictionMeter.value
xq.lifeTime = getTime() - gap
xq.flags = xq.flags + {autoZombifyPacked}
xq.jobCommit(true) xq.jobCommit(true)
let let
disposedItems = xq.nItems.disposed - disposedBase disposedItems = xq.nItems.disposed - disposedBase
evictedItems = (evictionMeter.value - evictedBase).int evictedItems = (evictionMeter.value - evictedBase).int

View File

@ -300,7 +300,6 @@ proc runTxHeadDelta*(noisy = true) =
# In this particular case, these differences will simply flush the # In this particular case, these differences will simply flush the
# packer bucket. # packer bucket.
check xp.jobDeltaTxsHead(blk.header) check xp.jobDeltaTxsHead(blk.header)
check xp.nJobs == 1
# Move TxPool chain head to new chain head and apply delta jobs # Move TxPool chain head to new chain head and apply delta jobs
xp.head = blk.header xp.head = blk.header