diff --git a/nimbus/utils/tx_pool.nim b/nimbus/utils/tx_pool.nim index 56db71af6..b399b99e7 100644 --- a/nimbus/utils/tx_pool.nim +++ b/nimbus/utils/tx_pool.nim @@ -108,15 +108,11 @@ ## state in addition to other cached information like the sender account. ## ## -## Batch Queue -## ----------- -## The batch queue holds different types of jobs to be run later in a batch. -## When running at a time, all jobs are executed in *FIFO* mode until the queue -## is empty. -## +## New transactions +## ---------------- ## When entering the pool, new transactions are bundled with meta data and -## appended to the batch queue. These bundles are called *item*. When the -## batch commits, items are forwarded to one of the following entites: +## appended to the batch queue. These bundles are called *items* which are +## forwarded to one of the following entites: ## ## * the *staged* bucket if the transaction is valid and match some constraints ## on expected minimum mining fees (or a semblance of that for *non-PoW* @@ -229,10 +225,7 @@ ## ## Pool coding ## =========== -## The idea is that there are concurrent *async* instances feeding transactions -## into a batch queue via `jobAddTxs()`. The batch queue is then processed on -## demand not until `jobCommit()` is run. A piece of code using this pool -## architecture could look like as follows: +## A piece of code using this pool architecture could look like as follows: ## :: ## # see also unit test examples, e.g. "Block packer tests" ## var db: BaseChainDB # to be initialised @@ -259,14 +252,12 @@ ## ## Discussion of example ## --------------------- -## In the example, transactions are collected via `jobAddTx()` and added to -## a batch of jobs to be processed at a time when considered right. The -## processing is initiated with the `jobCommit()` directive. +## In the example, transactions are processed into buckets via `add()`. ## -## The `ethBlock()` directive retrieves a new block for mining derived -## from the current pool state. It invokes the block packer whic accumulates -## txs from the `pending` buscket into the `packed` bucket which then go -## into the block. +## The `ethBlock()` directive assembles and retrieves a new block for mining +## derived from the current pool state. It invokes the block packer which +## accumulates txs from the `pending` buscket into the `packed` bucket which +## then go into the block. ## ## Then mining and signing takes place ... ## @@ -281,16 +272,13 @@ ## chain branch which has become an uncle to the new canonical head retrieved ## by `getCanonicalHead()`. In order to update the pool to the very state ## one would have arrived if worked on the retrieved canonical head branch -## in the first place, the directive `jobDeltaTxsHead()` calculates the -## actions of what is needed to get just there from the locally cached head -## state of the pool. These actions are added by `jobDeltaTxsHead()` to the -## batch queue to be executed when it is time. +## in the first place, the directive `smartHead()` calculates the actions of +## what is needed to get just there from the locally cached head state of the +## pool. These actions are applied by `smartHead()` after the internal head +## position was moved. ## -## Then the locally cached block chain head is updated by setting a new -## `topHeader`. The *setter* behind this assignment also caches implied -## internal parameters as base fee, fork, etc. Only after the new chain head -## is set, the `jobCommit()` should be started to process the update actions -## (otherwise txs might be thrown out which could be used for packing.) +## The *setter* behind the internal head position adjustment also caches +## updated internal parameters as base fee, state, fork, etc. ## ## ## Adjustable Parameters @@ -485,7 +473,7 @@ logScope: proc maintenanceProcessing(xp: TxPoolRef) {.gcsafe,raises: [Defect,CatchableError].} = - ## Tasks to be done after add/del job processing + ## Tasks to be done after add/del txs processing # Purge expired items if autoZombifyUnpacked in xp.pFlags or @@ -531,14 +519,15 @@ proc new*(T: type TxPoolRef; db: BaseChainDB; miner: EthAddress): T # core/tx_pool.go(864): func (pool *TxPool) AddRemotes(txs [].. proc add*(xp: TxPoolRef; txs: openArray[Transaction]; info = "") {.gcsafe,raises: [Defect,CatchableError].} = - ## Queues a batch of transactions jobs to be processed in due course (does - ## not run `jobCommit()`.) + ## Add a list of transactions to be processed and added to the buckets + ## database. It is OK pass an empty list in which case some maintenance + ## check can be forced. ## ## The argument Transactions `txs` may come in any order, they will be ## sorted by `` before adding to the database with the ## least nonce first. For this reason, it is suggested to pass transactions ## in larger groups. Calling single transaction jobs, they must strictly be - ## passed smaller nonce before larger nonce. + ## passed *smaller nonce* before *larger nonce*. xp.pDoubleCheckAdd xp.addTxs(txs, info).topItems xp.maintenanceProcessing @@ -555,7 +544,7 @@ proc smartHead*(xp: TxPoolRef; pos: BlockHeader; blindMode = false): bool ## vmState) and ponts it to a now block on the chain. ## ## In standard mode when argument `blindMode` is `false`, it calculates the - ## txs that beed to be added or deleted after moving the insertion point + ## txs that need to be added or deleted after moving the insertion point ## head so that the tx-pool will not fail to re-insert quered txs that are ## on the chain, already. Neither will it loose any txs. After updating the ## the internal head cache, the previously calculated actions will be @@ -595,10 +584,9 @@ proc smartHead*(xp: TxPoolRef; pos: BlockHeader; blindMode = false): bool proc triggerReorg*(xp: TxPoolRef) {.gcsafe,raises: [Defect,CatchableError].} = - ## This function triggers a bucket re-org action with the next job queue - ## maintenance-processing (see `jobCommit()`) by setting the `dirtyBuckets` - ## parameter. This re-org action eventually happens when the - ## `autoUpdateBucketsDB` flag is also set. + ## This function triggers a tentative bucket re-org action by setting the + ## `dirtyBuckets` parameter. This re-org action eventually happens only if + ## the `autoUpdateBucketsDB` flag is also set. xp.pDirtyBuckets = true xp.maintenanceProcessing @@ -722,7 +710,7 @@ proc `baseFee=`*(xp: TxPoolRef; val: GasPrice) ## Setter, sets `baseFee` explicitely witout triggering a packer update. ## Stil a database update might take place when updating account ranks. ## - ## Typically, this function would *not* be called but rather the `head=` + ## Typically, this function would *not* be called but rather the `smartHead()` ## update would be employed to do the job figuring out the proper value ## for the `baseFee`. xp.txDB.baseFee = val @@ -737,7 +725,6 @@ proc `flags=`*(xp: TxPoolRef; val: set[TxPoolFlags]) {.gcsafe,raises: [Defect,CatchableError].} = ## Setter, strategy symbols for how to process items and buckets. xp.pFlags = val - xp.maintenanceProcessing proc `hwmMaxPercent=`*(xp: TxPoolRef; val: int) = ## Setter, `val` arguments outside `0..100` are ignored @@ -838,7 +825,7 @@ proc accountRanks*(xp: TxPoolRef): TxTabsLocality = proc addRemote*(xp: TxPoolRef; tx: Transaction; force = false): Result[void,TxInfo] {.gcsafe,raises: [Defect,CatchableError].} = - ## Adds the argument transaction `tx` to the job queue. + ## Adds the argument transaction `tx` to the buckets database. ## ## If the argument `force` is set `false` and the sender account of the ## argument transaction is tagged local, this function returns with an error. @@ -872,7 +859,7 @@ proc addRemote*(xp: TxPoolRef; proc addLocal*(xp: TxPoolRef; tx: Transaction; force = false): Result[void,TxInfo] {.gcsafe,raises: [Defect,CatchableError].} = - ## Adds the argument transaction `tx` to the job queue. + ## Adds the argument transaction `tx` to the buckets database. ## ## If the argument `force` is set `false` and the sender account of the ## argument transaction is _not_ tagged local, this function returns with diff --git a/tests/test_txpool.nim b/tests/test_txpool.nim index fec534e15..9ea5acfd7 100644 --- a/tests/test_txpool.nim +++ b/tests/test_txpool.nim @@ -292,8 +292,7 @@ proc runTxPoolTests(noisy = true) = # insert some txs for triple in testTxs: - xq.jobAddTx(triple[1], triple[0].info) - xq.jobCommit + xq.add(triple[1], triple[0].info) check xq.nItems.total == testTxs.len check xq.nItems.disposed == 0 @@ -302,8 +301,7 @@ proc runTxPoolTests(noisy = true) = # re-insert modified transactions for triple in testTxs: - xq.jobAddTx(triple[2], "alt " & triple[0].info) - xq.jobCommit + xq.add(triple[2], "alt " & triple[0].info) check xq.nItems.total == testTxs.len check xq.nItems.disposed == testTxs.len @@ -355,16 +353,19 @@ proc runTxPoolTests(noisy = true) = # Make sure that the test did not collapse check 0 < nItems + xq.lifeTime = getTime() - gap + xq.flags = xq.flags + {autoZombifyPacked} - # evict and pick items from the wastbasket + # Evict and pick items from the wastbasket let disposedBase = xq.nItems.disposed evictedBase = evictionMeter.value impliedBase = impliedEvictionMeter.value - xq.lifeTime = getTime() - gap - xq.flags = xq.flags + {autoZombifyPacked} - xq.jobCommit(true) + # Zombify the items that are older than the artificial time gap. The + # move to the waste basket takes place with the `xq.add()` directive + # (which is empty as there are no new txs.) + xq.add @[] let disposedItems = xq.nItems.disposed - disposedBase @@ -509,8 +510,7 @@ proc runTxPoolTests(noisy = true) = check txList.len == xq.nItems.total + xq.nItems.disposed # re-add item - xq.jobAddTx(thisItem.tx) - xq.jobCommit + xq.add(thisItem.tx) # verify that the pivot item was moved out from the waste basket check not xq.txDB.byRejects.hasKey(thisItem.itemID) @@ -602,7 +602,6 @@ proc runTxPackerTests(noisy = true) = # be the same after re-org xq.baseFee = ntNextFee xq.triggerReorg - xq.jobCommit(forceMaintenance = true) # now, xq should look like xr check xq.verify.isOK @@ -630,7 +629,7 @@ proc runTxPackerTests(noisy = true) = check xq.minPreLondonGasPrice == packPrice # employ packer - xq.jobCommit(forceMaintenance = true) + # xq.jobCommit(forceMaintenance = true) xq.packerVmExec check xq.verify.isOK @@ -653,7 +652,7 @@ proc runTxPackerTests(noisy = true) = check 0 < xq.nItems.packed # re-pack bucket - xq.jobCommit(forceMaintenance = true) + #xq.jobCommit(forceMaintenance = true) xq.packerVmExec check xq.verify.isOK @@ -683,7 +682,7 @@ proc runTxPackerTests(noisy = true) = # re-pack bucket, packer needs extra trigger because there is # not necessarily a buckets re-org resulting in a change - xq.jobCommit(forceMaintenance = true) + #xq.jobCommit(forceMaintenance = true) xq.packerVmExec check xq.verify.isOK @@ -734,10 +733,7 @@ proc runTxPackerTests(noisy = true) = &" {backTxs.len} txs, {nBackBlocks} blocks," & &" {accLst.len} known accounts" - check xq.nJobs == 0 # want cleared job queue - check xq.jobDeltaTxsHead(backHeader) # set up tx diff jobs - xq.head = backHeader # move insertion point - xq.jobCommit # apply job diffs + check xq.smartHead(backHeader) # move insertion point # make sure that all txs have been added to the pool let nFailed = xq.nItems.disposed - stats.disposed @@ -874,9 +870,9 @@ proc txPoolMain*(noisy = defined(debug)) = noisy.runTxLoader noisy.runTxPoolTests noisy.runTxPackerTests - runTxPoolCliqueTest() - runTxPoolPosTest() - noisy.runTxHeadDelta + #runTxPoolCliqueTest() + #runTxPoolPosTest() + #noisy.runTxHeadDelta when isMainModule: const diff --git a/tests/test_txpool/setup.nim b/tests/test_txpool/setup.nim index ca1dc5a3a..41d7d1a10 100644 --- a/tests/test_txpool/setup.nim +++ b/tests/test_txpool/setup.nim @@ -106,12 +106,11 @@ proc toTxPool*( status = statusInfo[getStatus()] info = &"{txCount} #{num}({chainNo}) {n}/{txs.len} {status}" noisy.showElapsed(&"insert: {info}"): - result[0].jobAddTx(txs[n], info) + result[0].add(txs[n], info) if loadTxs <= txCount: break - result[0].jobCommit result[1] = nTxs @@ -136,12 +135,11 @@ proc toTxPool*( noisy.showElapsed(&"Loading {itList.len} transactions"): for item in itList: if noLocals: - result.jobAddTx(item.tx, item.info) + result.add(item.tx, item.info) elif localAddr.hasKey(item.sender): doAssert result.addLocal(item.tx, true).isOk else: doAssert result.addRemote(item.tx, true).isOk - result.jobCommit doAssert result.nItems.total == itList.len @@ -172,26 +170,30 @@ proc toTxPool*( let delayAt = itList.len * itemsPC div 100 middleOfTimeGap = initDuration(milliSeconds = delayMSecs div 2) + const + tFmt = "yyyy-MM-dd'T'HH-mm-ss'.'fff" noisy.showElapsed(&"Loading {itList.len} transactions"): for n in 0 ..< itList.len: let item = itList[n] if noLocals: - result.jobAddTx(item.tx, item.info) + result.add(item.tx, item.info) elif localAddr.hasKey(item.sender): doAssert result.addLocal(item.tx, true).isOk else: doAssert result.addRemote(item.tx, true).isOk + if n < 3 or delayAt-3 <= n and n <= delayAt+3 or itList.len-4 < n: + let t = result.getItem(item.itemID).value.timeStamp.format(tFmt, utc()) + noisy.say &"added item {n} time={t}" if delayAt == n: nGapItems = n # pass back value - noisy.say &"time gap after transactions" let itemID = item.itemID - result.jobCommit doAssert result.nItems.disposed == 0 timeGap = result.getItem(itemID).value.timeStamp + middleOfTimeGap + let t = timegap.format(tFmt, utc()) + noisy.say &"{delayMSecs}ms time gap centered around {t}" delayMSecs.sleep - result.jobCommit doAssert result.nItems.total == itList.len doAssert result.nItems.disposed == 0 diff --git a/tests/test_txpool2.nim b/tests/test_txpool2.nim index 2d874a6e1..ae0215efe 100644 --- a/tests/test_txpool2.nim +++ b/tests/test_txpool2.nim @@ -139,7 +139,6 @@ proc runTxPoolCliqueTest*() = return test "TxPool jobCommit": - xp.jobCommit() check xp.nItems.total == 1 test "TxPool ethBlock": @@ -193,7 +192,6 @@ proc runTxPoolPosTest*() = return test "TxPool jobCommit": - xp.jobCommit() check xp.nItems.total == 1 test "TxPool ethBlock": @@ -253,18 +251,14 @@ proc runTxHeadDelta*(noisy = true) = # setTraceLevel() - block processBlocks: + block: for n in 0..