From 2eb3f414d86cf9d4645d39cd770485c90c988ce5 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Thu, 31 Mar 2022 09:21:36 +0100 Subject: [PATCH] Support for local accounts (#1019) * Support for local accounts why: Accounts tagged local will be packed with priority over untagged accounts * Added functions for queuing txs and simultaneously setting account locality why: Might be a popular task, in particular for unconditionally adding txs to a local (aka prioritised account) via "xp.addLocal(tx,true)" caveat: Untested yet * fix typo * backup * No baseFee for pre-London tx in verifier why: The packer would wrongly discard valid legacy txs. --- nimbus/utils/tx_pool.nim | 100 ++++++++++++++---- nimbus/utils/tx_pool/tx_info.nim | 13 +++ nimbus/utils/tx_pool/tx_tabs.nim | 27 ++++- nimbus/utils/tx_pool/tx_tasks/tx_classify.nim | 12 ++- nimbus/utils/tx_pool/tx_tasks/tx_packer.nim | 14 ++- nimbus/utils/tx_pool/tx_tasks/tx_recover.nim | 2 +- tests/replay/pp.nim | 4 + tests/test_txpool.nim | 19 +++- tests/test_txpool/setup.nim | 57 ++++++---- 9 files changed, 193 insertions(+), 55 deletions(-) diff --git a/nimbus/utils/tx_pool.nim b/nimbus/utils/tx_pool.nim index bd6e4ad7b..cdda4e7ce 100644 --- a/nimbus/utils/tx_pool.nim +++ b/nimbus/utils/tx_pool.nim @@ -10,10 +10,6 @@ ## TODO: ## ===== -## * Support `local` accounts the txs of which would be prioritised. This is -## currently unsupported. For now, all txs are considered from `remote` -## accounts. -## ## * No uncles are handled by this pool ## ## * Impose a size limit to the bucket database. Which items would be removed? @@ -446,7 +442,13 @@ import ../db/db_chain, ./tx_pool/[tx_chain, tx_desc, tx_info, tx_item, tx_job], ./tx_pool/tx_tabs, - ./tx_pool/tx_tasks/[tx_add, tx_bucket, tx_head, tx_dispose, tx_packer], + ./tx_pool/tx_tasks/[ + tx_add, + tx_bucket, + tx_head, + tx_dispose, + tx_packer, + tx_recover], chronicles, eth/[common, keys], stew/[keyed_queue, results], @@ -478,7 +480,9 @@ export tx_item.sender, tx_item.status, tx_item.timeStamp, - tx_item.tx + tx_item.tx, + tx_tabs.local, + tx_tabs.remote {.push raises: [Defect].} @@ -845,30 +849,80 @@ proc disposeItems*(xp: TxPoolRef; item: TxItemRef; xp.disposeItemAndHigherNonces(item, reason, otherReason) # ------------------------------------------------------------------------------ -# Public functions, more immediate actions deemed not so important yet +# Public functions, local/remote accounts # ------------------------------------------------------------------------------ -#[ +proc setLocal*(xp: TxPoolRef; account: EthAddress) = + ## Tag argument `account` local which means that the transactions from this + ## account -- together with all other local accounts -- will be considered + ## first for packing. + xp.txDB.setLocal(account) -# core/tx_pool.go(561): func (pool *TxPool) Locals() []common.Address { -proc getAccounts*(xp: TxPoolRef; local: bool): seq[EthAddress] +proc resLocal*(xp: TxPoolRef; account: EthAddress) = + ## Untag argument `account` as local which means that the transactions from + ## this account -- together with all other untagged accounts -- will be + ## considered for packing after the locally tagged accounts. + xp.txDB.resLocal(account) + +proc accountRanks*(xp: TxPoolRef): TxTabsLocality = + ## Returns two lists, one for local and the other for non-local accounts. + ## Any of these lists is sorted by the highest rank first. This sorting + ## means that the order may be out-dated after adding transactions. + xp.txDB.locality + +proc addRemote*(xp: TxPoolRef; + tx: Transaction; force = false): Result[void,TxInfo] {.gcsafe,raises: [Defect,CatchableError].} = - ## Retrieves the accounts currently considered `local` or `remote` (i.e. - ## the have txs of that kind) destaged on request arguments. - if local: - result = xp.txDB.locals - else: - result = xp.txDB.remotes + ## Adds the argument transaction `tx` to the job queue. + ## + ## If the argument `force` is set `false` and the sender account of the + ## argument transaction is tagged local, this function returns with an error. + ## If the argument `force` is set `true`, the sender account will be untagged, + ## i.e. made non-local. + # Create or recover new item + let rc = xp.recoverItem(tx, txItemPending, "remote tx peek") + if rc.isErr: + return err(rc.error) -# core/tx_pool.go(1797): func (t *txLookup) RemoteToLocals(locals .. -proc remoteToLocals*(xp: TxPoolRef; signer: EthAddress): int + # Temporarily stash the item in the rubbish bin to be recovered, later + let sender = rc.value.sender + discard xp.txDB.dispose(rc.value, txInfoTxStashed) + + # Verify local/remote account + if force: + xp.txDB.resLocal(sender) + elif xp.txDB.isLocal(sender): + return err(txInfoTxErrorRemoteExpected) + + xp.jobAddTx(tx, "remote tx") + ok() + +proc addLocal*(xp: TxPoolRef; + tx: Transaction; force = false): Result[void,TxInfo] {.gcsafe,raises: [Defect,CatchableError].} = - ## For given account, remote transactions are migrated to local transactions. - ## The function returns the number of transactions migrated. - xp.txDB.setLocal(signer) - xp.txDB.bySender.eq(signer).nItems + ## Adds the argument transaction `tx` to the job queue. + ## + ## If the argument `force` is set `false` and the sender account of the + ## argument transaction is _not_ tagged local, this function returns with + ## an error. If the argument `force` is set `true`, the sender account will + ## be tagged local. + # Create or recover new item + let rc = xp.recoverItem(tx, txItemPending, "local tx peek") + if rc.isErr: + return err(rc.error) -]# + # Temporarily stash the item in the rubbish bin to be recovered, later + let sender = rc.value.sender + discard xp.txDB.dispose(rc.value, txInfoTxStashed) + + # Verify local/remote account + if force: + xp.txDB.setLocal(sender) + elif not xp.txDB.isLocal(sender): + return err(txInfoTxErrorLocalExpected) + + xp.jobAddTx(tx, "local tx") + ok() # ------------------------------------------------------------------------------ # End diff --git a/nimbus/utils/tx_pool/tx_info.nim b/nimbus/utils/tx_pool/tx_info.nim index 77b7628af..1b933d56f 100644 --- a/nimbus/utils/tx_pool/tx_info.nim +++ b/nimbus/utils/tx_pool/tx_info.nim @@ -143,6 +143,19 @@ type ## tx was removed. "Tx expired implied" + txInfoTxStashed = ##\ + ## A transaction was item was created and stored in the disposal bin + ## to be recycled and processed later. + "Tx stashed" + + txInfoTxErrorRemoteExpected = ##\ + ## The sender account of a transaction was expected non-local. + "Tx non-local expected" + + txInfoTxErrorLocalExpected = ##\ + ## The sender account of a transaction was expected local. + "Tx local expected" + # ------- update/move block chain head ------------------------------------- txInfoErrAncestorMissing = ##\ diff --git a/nimbus/utils/tx_pool/tx_tabs.nim b/nimbus/utils/tx_pool/tx_tabs.nim index 3f8f146cf..8307c9c44 100644 --- a/nimbus/utils/tx_pool/tx_tabs.nim +++ b/nimbus/utils/tx_pool/tx_tabs.nim @@ -35,6 +35,14 @@ type TxTabsGasTotals* = tuple pending, staged, packed: GasInt ## sum => total + TxTabsLocality* = object ##\ + ## Return value for `locality()` function + local: seq[EthAddress] ##\ + ## List of local accounts, higest rank first + + remote: seq[EthAddress] ##\ + ## List of non-local accounts, higest rank first + TxTabsRef* = ref object ##\ ## Base descriptor maxRejects: int ##\ @@ -253,6 +261,14 @@ proc maxRejects*(xp: TxTabsRef): int = ## Getter xp.maxRejects +proc local*(lc: TxTabsLocality): seq[EthAddress] = + ## Getter + lc.local + +proc remote*(lc: TxTabsLocality): seq[EthAddress] = + ## Getter + lc.remote + # ------------------------------------------------------------------------------ # Public functions, setters # ------------------------------------------------------------------------------ @@ -310,14 +326,17 @@ proc locals*(xp: TxTabsRef): seq[EthAddress] = ## Returns an unsorted list of addresses tagged *local* toSeq(xp.byLocal.keys) -proc remotes*(xp: TxTabsRef): seq[EthAddress] = - ## Returns an sorted list of untagged addresses, highest address rank first +proc locality*(xp: TxTabsRef): TxTabsLocality = + ## Returns a pair of sorted lists of account addresses, + ## highest address rank first var rcRank = xp.byRank.le(TxRank.high) while rcRank.isOK: let (rank, addrList) = (rcRank.value.key, rcRank.value.data) for account in addrList.keys: - if not xp.byLocal.hasKey(account): - result.add account + if xp.byLocal.hasKey(account): + result.local.add account + else: + result.remote.add account rcRank = xp.byRank.lt(rank) proc setLocal*(xp: TxTabsRef; sender: EthAddress) = diff --git a/nimbus/utils/tx_pool/tx_tasks/tx_classify.nim b/nimbus/utils/tx_pool/tx_tasks/tx_classify.nim index 0fb2abdde..97500ac3b 100644 --- a/nimbus/utils/tx_pool/tx_tasks/tx_classify.nim +++ b/nimbus/utils/tx_pool/tx_tasks/tx_classify.nim @@ -86,6 +86,12 @@ proc checkTxNonce(xp: TxPoolRef; item: TxItemRef): bool true +proc eip1559TxNormalization(tx: Transaction): Transaction = + result = tx + if tx.txType < TxEip1559: + result.maxPriorityFee = tx.gasPrice + result.maxFee = tx.gasPrice + # ------------------------------------------------------------------------------ # Private function: active tx classifier check helpers # ------------------------------------------------------------------------------ @@ -233,7 +239,11 @@ proc classifyValidatePacked*(xp: TxPoolRef; else: xp.chain.limits.trgLimit - roDB.validateTransaction(item.tx, item.sender, gasLimit, baseFee, fork) + var tx = item.tx.eip1559TxNormalization + if FkLondon <= fork: + tx.gasPrice = min(tx.maxPriorityFee + baseFee.truncate(int64), tx.maxFee) + + roDB.validateTransaction(tx, item.sender, gasLimit, baseFee, fork) proc classifyPacked*(xp: TxPoolRef; gasBurned, moreBurned: GasInt): bool = ## Classifier for *packing* (i.e. adding up `gasUsed` values after executing diff --git a/nimbus/utils/tx_pool/tx_tasks/tx_packer.nim b/nimbus/utils/tx_pool/tx_tasks/tx_packer.nim index b0bae3ec4..9b39a9a75 100644 --- a/nimbus/utils/tx_pool/tx_tasks/tx_packer.nim +++ b/nimbus/utils/tx_pool/tx_tasks/tx_packer.nim @@ -252,6 +252,18 @@ proc vmExecCommit(pst: TxPackerStateRef) xp.chain.profit = balanceDelta() xp.chain.reward = balanceDelta() +iterator rankedAccounts(xp: TxPoolRef): TxStatusNonceRef + {.gcsafe,raises: [Defect,KeyError].} = + ## Loop over staged accounts ordered by + ## + local ranks, higest one first + ## + remote ranks, higest one first + for (account,nonceList) in xp.txDB.decAccount(txItemStaged): + if xp.txDB.isLocal(account): + yield nonceList + for (account,nonceList) in xp.txDB.decAccount(txItemStaged): + if not xp.txDB.isLocal(account): + yield nonceList + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -265,7 +277,7 @@ proc packerVmExec*(xp: TxPoolRef) {.gcsafe,raises: [Defect,CatchableError].} = var pst = xp.vmExecInit block loop: - for (_,nonceList) in pst.xp.txDB.decAccount(txItemStaged): + for nonceList in pst.xp.rankedAccounts: block account: for item in nonceList.incNonce: diff --git a/nimbus/utils/tx_pool/tx_tasks/tx_recover.nim b/nimbus/utils/tx_pool/tx_tasks/tx_recover.nim index 2a02c3a1a..9b8f07968 100644 --- a/nimbus/utils/tx_pool/tx_tasks/tx_recover.nim +++ b/nimbus/utils/tx_pool/tx_tasks/tx_recover.nim @@ -35,7 +35,7 @@ let # Public functions # ------------------------------------------------------------------------------ -proc recoverItem*(xp: TxPoolRef; tx: var Transaction; +proc recoverItem*(xp: TxPoolRef; tx: Transaction; status = txItemPending; info = ""): Result[TxItemRef,TxInfo] {.gcsafe,raises: [Defect,CatchableError].} = ## Recover item from waste basket or create new. It is an error if the item diff --git a/tests/replay/pp.nim b/tests/replay/pp.nim index 66a644440..f810da13e 100644 --- a/tests/replay/pp.nim +++ b/tests/replay/pp.nim @@ -1,3 +1,4 @@ + # Nimbus # Copyright (c) 2018-2019 Status Research & Development GmbH # Licensed under either of @@ -30,6 +31,9 @@ proc pp*(b: Blob): string = proc pp*(a: EthAddress): string = a.mapIt(it.toHex(2)).join[32 .. 39].toLowerAscii +proc pp*(a: openArray[EthAddress]): string = + "[" & a.mapIt(it.pp).join(" ") & "]" + proc pp*(a: BlockNonce): string = a.mapIt(it.toHex(2)).join.toLowerAscii diff --git a/tests/test_txpool.nim b/tests/test_txpool.nim index 2c5a80fae..9942e9379 100644 --- a/tests/test_txpool.nim +++ b/tests/test_txpool.nim @@ -602,8 +602,8 @@ proc runTxPackerTests(noisy = true) = block: var - xq = bcDB.toTxPool(txList, ntBaseFee, noisy) - xr = bcDB.toTxPool(txList, ntNextFee, noisy) + xq = bcDB.toTxPool(txList, ntBaseFee, noisy = noisy) + xr = bcDB.toTxPool(txList, ntNextFee, noisy = noisy) block: let pending = xq.nItems.pending @@ -742,7 +742,7 @@ proc runTxPackerTests(noisy = true) = block: var - xq = bcDB.toTxPool(txList, ntBaseFee, noisy) + xq = bcDB.toTxPool(txList, ntBaseFee, noisy = noisy) let (nMinTxs, nTrgTxs) = (15, 15) (nMinAccounts, nTrgAccounts) = (1, 8) @@ -814,6 +814,19 @@ proc runTxPackerTests(noisy = true) = " increase=", xq.gasCumulative - smallerBlockSize, " trg/max=", blockProfitRatio, "%" + let + accountExtractPC = 10 + acc = xq.accountRanks + nExtract = (acc.remote.len * accountExtractPC + 50) div 100 + accExtracted = acc.remote[acc.remote.len - nExtract .. ^1] + + noisy.say "***", "accounts", + " local=", acc.local.pp, + " remote=", acc.remote.pp, + " remote.len=", acc.remote.len, + " nExtract=", nExtract, + " accExtracted=", accExtracted.pp + # if true: return test "Store generated block in block chain database": diff --git a/tests/test_txpool/setup.nim b/tests/test_txpool/setup.nim index 99ae4d561..ca1dc5a3a 100644 --- a/tests/test_txpool/setup.nim +++ b/tests/test_txpool/setup.nim @@ -116,9 +116,10 @@ proc toTxPool*( proc toTxPool*( - db: BaseChainDB; ## to be modified, initialisier for `TxPool` - itList: var seq[TxItemRef]; ## import items into new `TxPool` (read only) - baseFee = 0.GasPrice; ## initalise with `baseFee` (unless 0) + db: BaseChainDB; ## to be modified, initialisier for `TxPool` + itList: seq[TxItemRef]; ## import items into new `TxPool` (read only) + baseFee = 0.GasPrice; ## initalise with `baseFee` (unless 0) + local: seq[EthAddress] = @[]; ## local addresses noisy = true): TxPoolRef = doAssert not db.isNil @@ -127,30 +128,32 @@ proc toTxPool*( result.baseFee = baseFee result.maxRejects = itList.len + let noLocals = local.len == 0 + var localAddr: Table[EthAddress,bool] + for a in local: + localAddr[a] = true + noisy.showElapsed(&"Loading {itList.len} transactions"): for item in itList: - result.jobAddTx(item.tx, item.info) - result.jobCommit + if noLocals: + result.jobAddTx(item.tx, item.info) + elif localAddr.hasKey(item.sender): + doAssert result.addLocal(item.tx, true).isOk + else: + doAssert result.addRemote(item.tx, true).isOk + result.jobCommit doAssert result.nItems.total == itList.len proc toTxPool*( - db: BaseChainDB; - itList: seq[TxItemRef]; - baseFee = 0.GasPrice; - noisy = true): TxPoolRef = - var newList = itList - db.toTxPool(newList, baseFee, noisy) - - -proc toTxPool*( - db: BaseChainDB; ## to be modified, initialisier for `TxPool` - timeGap: var Time; ## to be set, time in the middle of time gap - nGapItems: var int; ## to be set, # items before time gap - itList: var seq[TxItemRef]; ## import items into new `TxPool` (read only) - baseFee = 0.GasPrice; ## initalise with `baseFee` (unless 0) - itemsPC = 30; ## % number if items befor time gap - delayMSecs = 200; ## size of time vap + db: BaseChainDB; ## to be modified, initialisier for `TxPool` + timeGap: var Time; ## to be set, time in the middle of time gap + nGapItems: var int; ## to be set, # items before time gap + itList: var seq[TxItemRef]; ## import items into new `TxPool` (read only) + baseFee = 0.GasPrice; ## initalise with `baseFee` (unless 0) + itemsPC = 30; ## % number if items befor time gap + delayMSecs = 200; ## size of time vap + local: seq[EthAddress] = @[]; ## local addresses noisy = true): TxPoolRef = ## Variant of `toTxPoolFromSeq()` with a time gap between consecutive ## items on the `remote` queue @@ -161,6 +164,11 @@ proc toTxPool*( result.baseFee = baseFee result.maxRejects = itList.len + let noLocals = local.len == 0 + var localAddr: Table[EthAddress,bool] + for a in local: + localAddr[a] = true + let delayAt = itList.len * itemsPC div 100 middleOfTimeGap = initDuration(milliSeconds = delayMSecs div 2) @@ -168,7 +176,12 @@ proc toTxPool*( noisy.showElapsed(&"Loading {itList.len} transactions"): for n in 0 ..< itList.len: let item = itList[n] - result.jobAddTx(item.tx, item.info) + if noLocals: + result.jobAddTx(item.tx, item.info) + elif localAddr.hasKey(item.sender): + doAssert result.addLocal(item.tx, true).isOk + else: + doAssert result.addRemote(item.tx, true).isOk if delayAt == n: nGapItems = n # pass back value noisy.say &"time gap after transactions"