Jordan/remove unit tests txpool legacy stuff (#1048)

* Update/clarify docu

* Remove legacy stuff for unit tests
Jordan Hrycaj 2022-04-08 15:05:30 +01:00 committed by GitHub
parent fd6fcc6cc4
commit ded38128b5
4 changed files with 65 additions and 87 deletions

View File

@ -108,15 +108,11 @@
## state in addition to other cached information like the sender account.
##
##
## Batch Queue
## -----------
## The batch queue holds different types of jobs to be run later in a batch.
## When running at a time, all jobs are executed in *FIFO* mode until the queue
## is empty.
##
## New transactions
## ----------------
## When entering the pool, new transactions are bundled with meta data and
## appended to the batch queue. These bundles are called *item*. When the
## batch commits, items are forwarded to one of the following entites:
## appended to the batch queue. These bundles are called *items*, which are
## forwarded to one of the following entities:
##
## * the *staged* bucket if the transaction is valid and matches some constraints
## on expected minimum mining fees (or a semblance of that for *non-PoW*
@ -229,10 +225,7 @@
##
## Pool coding
## ===========
## The idea is that there are concurrent *async* instances feeding transactions
## into a batch queue via `jobAddTxs()`. The batch queue is then processed on
## demand not until `jobCommit()` is run. A piece of code using this pool
## architecture could look like as follows:
## A piece of code using this pool architecture could look as follows:
## ::
## # see also unit test examples, e.g. "Block packer tests"
## var db: BaseChainDB # to be initialised
@ -259,14 +252,12 @@
##
## Discussion of example
## ---------------------
## In the example, transactions are collected via `jobAddTx()` and added to
## a batch of jobs to be processed at a time when considered right. The
## processing is initiated with the `jobCommit()` directive.
## In the example, transactions are processed into buckets via `add()`.
##
## The `ethBlock()` directive retrieves a new block for mining derived
## from the current pool state. It invokes the block packer whic accumulates
## txs from the `pending` buscket into the `packed` bucket which then go
## into the block.
## The `ethBlock()` directive assembles and retrieves a new block for mining
## derived from the current pool state. It invokes the block packer which
## accumulates txs from the `pending` bucket into the `packed` bucket; these
## txs then go into the block.
##
## Then mining and signing takes place ...
##
@ -281,16 +272,13 @@
## chain branch which has become an uncle to the new canonical head retrieved
## by `getCanonicalHead()`. In order to update the pool to the very state
## one would have arrived at had one worked on the retrieved canonical head branch
## in the first place, the directive `jobDeltaTxsHead()` calculates the
## actions of what is needed to get just there from the locally cached head
## state of the pool. These actions are added by `jobDeltaTxsHead()` to the
## batch queue to be executed when it is time.
## in the first place, the directive `smartHead()` calculates the actions
## needed to get there from the locally cached head state of the pool.
## These actions are applied by `smartHead()` after the internal head
## position has been moved.
##
## Then the locally cached block chain head is updated by setting a new
## `topHeader`. The *setter* behind this assignment also caches implied
## internal parameters as base fee, fork, etc. Only after the new chain head
## is set, the `jobCommit()` should be started to process the update actions
## (otherwise txs might be thrown out which could be used for packing.)
## The *setter* behind the internal head position adjustment also caches
## updated internal parameters such as base fee, state, fork, etc.
##
##
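
A hedged sketch of the head-switch handling described above, modelled on the `runTxHeadDelta` test further down; `chain`, `blk` and `body` are placeholders:

  # a block on a sibling branch has been accepted into the database
  doAssert chain.persistBlocks([blk.header], [body]).isOk

  # move the pool's insertion point; smartHead() computes and applies the
  # add/delete actions needed to catch up with the new canonical head
  doAssert xp.smartHead(blk.header)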
## Adjustable Parameters
@ -485,7 +473,7 @@ logScope:
proc maintenanceProcessing(xp: TxPoolRef)
{.gcsafe,raises: [Defect,CatchableError].} =
## Tasks to be done after add/del job processing
## Tasks to be done after add/del txs processing
# Purge expired items
if autoZombifyUnpacked in xp.pFlags or
@ -531,14 +519,15 @@ proc new*(T: type TxPoolRef; db: BaseChainDB; miner: EthAddress): T
# core/tx_pool.go(864): func (pool *TxPool) AddRemotes(txs []..
proc add*(xp: TxPoolRef; txs: openArray[Transaction]; info = "")
{.gcsafe,raises: [Defect,CatchableError].} =
## Queues a batch of transactions jobs to be processed in due course (does
## not run `jobCommit()`.)
## Add a list of transactions to be processed and added to the buckets
## database. It is OK to pass an empty list, in which case some maintenance
## checks can be forced.
##
## The argument transactions `txs` may come in any order; they will be
## sorted by `<account,nonce>` before adding to the database, with the
## least nonce first. For this reason, it is suggested to pass transactions
## in larger groups. When adding single transactions, they must strictly be
## passed smaller nonce before larger nonce.
## passed *smaller nonce* before *larger nonce*.
xp.pDoubleCheckAdd xp.addTxs(txs, info).topItems
xp.maintenanceProcessing
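
For illustration, a hedged usage snippet of the calling convention above; the transaction variables are hypothetical placeholders:

  var txNonce0, txNonce1, txNonce2: Transaction   # same sender, nonces 0, 1, 2
  # a bulk add may come in any per-account order, the pool sorts by <account,nonce>
  xp.add(@[txNonce2, txNonce0, txNonce1], info = "bulk import")
  # an empty list adds nothing but still forces the maintenance processing
  xp.add(@[])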
@ -555,7 +544,7 @@ proc smartHead*(xp: TxPoolRef; pos: BlockHeader; blindMode = false): bool
## vmState) and points it to a new block on the chain.
##
## In standard mode when argument `blindMode` is `false`, it calculates the
## txs that beed to be added or deleted after moving the insertion point
## txs that need to be added or deleted after moving the insertion point
## head so that the tx-pool will not fail to re-insert queued txs that are
## on the chain already. Neither will it lose any txs. After updating the
## internal head cache, the previously calculated actions will be
@ -595,10 +584,9 @@ proc smartHead*(xp: TxPoolRef; pos: BlockHeader; blindMode = false): bool
proc triggerReorg*(xp: TxPoolRef)
{.gcsafe,raises: [Defect,CatchableError].} =
## This function triggers a bucket re-org action with the next job queue
## maintenance-processing (see `jobCommit()`) by setting the `dirtyBuckets`
## parameter. This re-org action eventually happens when the
## `autoUpdateBucketsDB` flag is also set.
## This function triggers a tentative bucket re-org action by setting the
## `dirtyBuckets` parameter. This re-org action eventually happens only if
## the `autoUpdateBucketsDB` flag is also set.
xp.pDirtyBuckets = true
xp.maintenanceProcessing
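
A short hedged sketch of how the re-org trigger and the strategy flag interact; the flag name is the one used in this module:

  # the tentative re-org is only carried out while `autoUpdateBucketsDB` is set
  xp.flags = xp.flags + {autoUpdateBucketsDB}
  xp.triggerReorg()          # sets `dirtyBuckets`, maintenance runs the re-org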
@ -722,7 +710,7 @@ proc `baseFee=`*(xp: TxPoolRef; val: GasPrice)
## Setter, sets `baseFee` explicitly without triggering a packer update.
## Still, a database update might take place when updating account ranks.
##
## Typically, this function would *not* be called but rather the `head=`
## Typically, this function would *not* be called but rather the `smartHead()`
## update would be employed to do the job of figuring out the proper value
## for the `baseFee`.
xp.txDB.baseFee = val
@ -737,7 +725,6 @@ proc `flags=`*(xp: TxPoolRef; val: set[TxPoolFlags])
{.gcsafe,raises: [Defect,CatchableError].} =
## Setter, strategy symbols for how to process items and buckets.
xp.pFlags = val
xp.maintenanceProcessing
proc `hwmMaxPercent=`*(xp: TxPoolRef; val: int) =
## Setter, `val` arguments outside `0..100` are ignored
@ -838,7 +825,7 @@ proc accountRanks*(xp: TxPoolRef): TxTabsLocality =
proc addRemote*(xp: TxPoolRef;
tx: Transaction; force = false): Result[void,TxInfo]
{.gcsafe,raises: [Defect,CatchableError].} =
## Adds the argument transaction `tx` to the job queue.
## Adds the argument transaction `tx` to the buckets database.
##
## If the argument `force` is set `false` and the sender account of the
## argument transaction is tagged local, this function returns with an error.
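
For illustration, a hedged sketch of the remote/local classification described here and for `addLocal()` below; `tx` is a placeholder transaction:

  var tx: Transaction                        # placeholder, to be initialised
  let rc = xp.addRemote(tx, force = false)
  if rc.isErr:
    # e.g. the sender account is tagged local; force-adding via the
    # local route instead would bypass that check
    discard xp.addLocal(tx, force = true)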
@ -872,7 +859,7 @@ proc addRemote*(xp: TxPoolRef;
proc addLocal*(xp: TxPoolRef;
tx: Transaction; force = false): Result[void,TxInfo]
{.gcsafe,raises: [Defect,CatchableError].} =
## Adds the argument transaction `tx` to the job queue.
## Adds the argument transaction `tx` to the buckets database.
##
## If the argument `force` is set `false` and the sender account of the
## argument transaction is _not_ tagged local, this function returns with

View File

@ -292,8 +292,7 @@ proc runTxPoolTests(noisy = true) =
# insert some txs
for triple in testTxs:
xq.jobAddTx(triple[1], triple[0].info)
xq.jobCommit
xq.add(triple[1], triple[0].info)
check xq.nItems.total == testTxs.len
check xq.nItems.disposed == 0
@ -302,8 +301,7 @@ proc runTxPoolTests(noisy = true) =
# re-insert modified transactions
for triple in testTxs:
xq.jobAddTx(triple[2], "alt " & triple[0].info)
xq.jobCommit
xq.add(triple[2], "alt " & triple[0].info)
check xq.nItems.total == testTxs.len
check xq.nItems.disposed == testTxs.len
@ -355,16 +353,19 @@ proc runTxPoolTests(noisy = true) =
# Make sure that the test did not collapse
check 0 < nItems
xq.lifeTime = getTime() - gap
xq.flags = xq.flags + {autoZombifyPacked}
# evict and pick items from the wastbasket
# Evict and pick items from the wastebasket
let
disposedBase = xq.nItems.disposed
evictedBase = evictionMeter.value
impliedBase = impliedEvictionMeter.value
xq.lifeTime = getTime() - gap
xq.flags = xq.flags + {autoZombifyPacked}
xq.jobCommit(true)
# Zombify the items that are older than the artificial time gap. The
# move to the waste basket takes place with the `xq.add()` directive
# (called with an empty list as there are no new txs.)
xq.add @[]
let
disposedItems = xq.nItems.disposed - disposedBase
@ -509,8 +510,7 @@ proc runTxPoolTests(noisy = true) =
check txList.len == xq.nItems.total + xq.nItems.disposed
# re-add item
xq.jobAddTx(thisItem.tx)
xq.jobCommit
xq.add(thisItem.tx)
# verify that the pivot item was moved out from the waste basket
check not xq.txDB.byRejects.hasKey(thisItem.itemID)
@ -602,7 +602,6 @@ proc runTxPackerTests(noisy = true) =
# be the same after re-org
xq.baseFee = ntNextFee
xq.triggerReorg
xq.jobCommit(forceMaintenance = true)
# now, xq should look like xr
check xq.verify.isOK
@ -630,7 +629,7 @@ proc runTxPackerTests(noisy = true) =
check xq.minPreLondonGasPrice == packPrice
# employ packer
xq.jobCommit(forceMaintenance = true)
# xq.jobCommit(forceMaintenance = true)
xq.packerVmExec
check xq.verify.isOK
@ -653,7 +652,7 @@ proc runTxPackerTests(noisy = true) =
check 0 < xq.nItems.packed
# re-pack bucket
xq.jobCommit(forceMaintenance = true)
#xq.jobCommit(forceMaintenance = true)
xq.packerVmExec
check xq.verify.isOK
@ -683,7 +682,7 @@ proc runTxPackerTests(noisy = true) =
# re-pack bucket, packer needs extra trigger because there is
# not necessarily a buckets re-org resulting in a change
xq.jobCommit(forceMaintenance = true)
#xq.jobCommit(forceMaintenance = true)
xq.packerVmExec
check xq.verify.isOK
@ -734,10 +733,7 @@ proc runTxPackerTests(noisy = true) =
&" {backTxs.len} txs, {nBackBlocks} blocks," &
&" {accLst.len} known accounts"
check xq.nJobs == 0 # want cleared job queue
check xq.jobDeltaTxsHead(backHeader) # set up tx diff jobs
xq.head = backHeader # move insertion point
xq.jobCommit # apply job diffs
check xq.smartHead(backHeader) # move insertion point
# make sure that all txs have been added to the pool
let nFailed = xq.nItems.disposed - stats.disposed
@ -874,9 +870,9 @@ proc txPoolMain*(noisy = defined(debug)) =
noisy.runTxLoader
noisy.runTxPoolTests
noisy.runTxPackerTests
runTxPoolCliqueTest()
runTxPoolPosTest()
noisy.runTxHeadDelta
#runTxPoolCliqueTest()
#runTxPoolPosTest()
#noisy.runTxHeadDelta
when isMainModule:
const

View File

@ -106,12 +106,11 @@ proc toTxPool*(
status = statusInfo[getStatus()]
info = &"{txCount} #{num}({chainNo}) {n}/{txs.len} {status}"
noisy.showElapsed(&"insert: {info}"):
result[0].jobAddTx(txs[n], info)
result[0].add(txs[n], info)
if loadTxs <= txCount:
break
result[0].jobCommit
result[1] = nTxs
@ -136,12 +135,11 @@ proc toTxPool*(
noisy.showElapsed(&"Loading {itList.len} transactions"):
for item in itList:
if noLocals:
result.jobAddTx(item.tx, item.info)
result.add(item.tx, item.info)
elif localAddr.hasKey(item.sender):
doAssert result.addLocal(item.tx, true).isOk
else:
doAssert result.addRemote(item.tx, true).isOk
result.jobCommit
doAssert result.nItems.total == itList.len
@ -172,26 +170,30 @@ proc toTxPool*(
let
delayAt = itList.len * itemsPC div 100
middleOfTimeGap = initDuration(milliSeconds = delayMSecs div 2)
const
tFmt = "yyyy-MM-dd'T'HH-mm-ss'.'fff"
noisy.showElapsed(&"Loading {itList.len} transactions"):
for n in 0 ..< itList.len:
let item = itList[n]
if noLocals:
result.jobAddTx(item.tx, item.info)
result.add(item.tx, item.info)
elif localAddr.hasKey(item.sender):
doAssert result.addLocal(item.tx, true).isOk
else:
doAssert result.addRemote(item.tx, true).isOk
if n < 3 or delayAt-3 <= n and n <= delayAt+3 or itList.len-4 < n:
let t = result.getItem(item.itemID).value.timeStamp.format(tFmt, utc())
noisy.say &"added item {n} time={t}"
if delayAt == n:
nGapItems = n # pass back value
noisy.say &"time gap after transactions"
let itemID = item.itemID
result.jobCommit
doAssert result.nItems.disposed == 0
timeGap = result.getItem(itemID).value.timeStamp + middleOfTimeGap
let t = timegap.format(tFmt, utc())
noisy.say &"{delayMSecs}ms time gap centered around {t}"
delayMSecs.sleep
result.jobCommit
doAssert result.nItems.total == itList.len
doAssert result.nItems.disposed == 0

View File

@ -139,7 +139,6 @@ proc runTxPoolCliqueTest*() =
return
test "TxPool jobCommit":
xp.jobCommit()
check xp.nItems.total == 1
test "TxPool ethBlock":
@ -193,7 +192,6 @@ proc runTxPoolPosTest*() =
return
test "TxPool jobCommit":
xp.jobCommit()
check xp.nItems.total == 1
test "TxPool ethBlock":
@ -253,18 +251,14 @@ proc runTxHeadDelta*(noisy = true) =
# setTraceLevel()
block processBlocks:
block:
for n in 0..<numBlocks:
for tn in 0..<txPerblock:
let tx = env.makeTx(recipient, amount)
let res = xp.addLocal(tx, force = true)
if res.isErr:
noisy.say "***", "loading blocks failed",
" error=", res.error
break processBlocks
xp.jobCommit()
# Instead of `add()`, the functions `addRemote()` or `addLocal()`
# would also do.
xp.add(tx)
noisy.say "***", "txDB",
&" n={n}",
@ -289,7 +283,7 @@ proc runTxHeadDelta*(noisy = true) =
# Commit to block chain
check chain.persistBlocks([blk.header], [body]).isOk
# I not for other reason, setting head is irrelevant for this test
# If not for any other reason, setting the head is irrelevant for this test
#
# # PoS block canonical head must be explicitly set using setHead.
# # The function `persistHeaderToDb()` used in `persistBlocks()`
@ -299,11 +293,9 @@ proc runTxHeadDelta*(noisy = true) =
# Synchronise TxPool against new chain head, register txs differences.
# In this particular case, these differences will simply flush the
# packer bucket.
check xp.jobDeltaTxsHead(blk.header)
check xp.smartHead(blk.header)
# Move TxPool chain head to new chain head and apply delta jobs
xp.head = blk.header
xp.jobCommit()
check xp.nItems.staged == 0
check xp.nItems.packed == 0
@ -325,7 +317,8 @@ when isMainModule:
setErrorLevel() # mute logger
#runTxPoolCliqueTest()
#runTxPoolPosTest()
runTxPoolCliqueTest()
runTxPoolPosTest()
noisy.runTxHeadDelta
true.runTxHeadDelta
# End