Support for local accounts (#1019)

* Support for local accounts

why:
  Accounts tagged local will be packed with priority over untagged
  accounts

* Added functions for queuing txs and simultaneously setting account locality

why:
  Might be a popular task, in particular for unconditionally adding txs to
  a local (aka prioritised account) via "xp.addLocal(tx,true)"

caveat:
  Untested yet

* fix typo

* backup

* No baseFee for pre-London tx in verifier

why:
  The packer would wrongly discard valid legacy txs.
This commit is contained in:
Jordan Hrycaj 2022-03-31 09:21:36 +01:00 committed by GitHub
parent 6c94291eee
commit 2eb3f414d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 193 additions and 55 deletions

View File

@ -10,10 +10,6 @@
## TODO: ## TODO:
## ===== ## =====
## * Support `local` accounts the txs of which would be prioritised. This is
## currently unsupported. For now, all txs are considered from `remote`
## accounts.
##
## * No uncles are handled by this pool ## * No uncles are handled by this pool
## ##
## * Impose a size limit to the bucket database. Which items would be removed? ## * Impose a size limit to the bucket database. Which items would be removed?
@ -446,7 +442,13 @@ import
../db/db_chain, ../db/db_chain,
./tx_pool/[tx_chain, tx_desc, tx_info, tx_item, tx_job], ./tx_pool/[tx_chain, tx_desc, tx_info, tx_item, tx_job],
./tx_pool/tx_tabs, ./tx_pool/tx_tabs,
./tx_pool/tx_tasks/[tx_add, tx_bucket, tx_head, tx_dispose, tx_packer], ./tx_pool/tx_tasks/[
tx_add,
tx_bucket,
tx_head,
tx_dispose,
tx_packer,
tx_recover],
chronicles, chronicles,
eth/[common, keys], eth/[common, keys],
stew/[keyed_queue, results], stew/[keyed_queue, results],
@ -478,7 +480,9 @@ export
tx_item.sender, tx_item.sender,
tx_item.status, tx_item.status,
tx_item.timeStamp, tx_item.timeStamp,
tx_item.tx tx_item.tx,
tx_tabs.local,
tx_tabs.remote
{.push raises: [Defect].} {.push raises: [Defect].}
@ -845,30 +849,80 @@ proc disposeItems*(xp: TxPoolRef; item: TxItemRef;
xp.disposeItemAndHigherNonces(item, reason, otherReason) xp.disposeItemAndHigherNonces(item, reason, otherReason)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public functions, more immediate actions deemed not so important yet # Public functions, local/remote accounts
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
#[ proc setLocal*(xp: TxPoolRef; account: EthAddress) =
## Tag argument `account` local which means that the transactions from this
## account -- together with all other local accounts -- will be considered
## first for packing.
xp.txDB.setLocal(account)
# core/tx_pool.go(561): func (pool *TxPool) Locals() []common.Address { proc resLocal*(xp: TxPoolRef; account: EthAddress) =
proc getAccounts*(xp: TxPoolRef; local: bool): seq[EthAddress] ## Untag argument `account` as local which means that the transactions from
## this account -- together with all other untagged accounts -- will be
## considered for packing after the locally tagged accounts.
xp.txDB.resLocal(account)
proc accountRanks*(xp: TxPoolRef): TxTabsLocality =
## Returns two lists, one for local and the other for non-local accounts.
## Any of these lists is sorted by the highest rank first. This sorting
## means that the order may be out-dated after adding transactions.
xp.txDB.locality
proc addRemote*(xp: TxPoolRef;
tx: Transaction; force = false): Result[void,TxInfo]
{.gcsafe,raises: [Defect,CatchableError].} = {.gcsafe,raises: [Defect,CatchableError].} =
## Retrieves the accounts currently considered `local` or `remote` (i.e. ## Adds the argument transaction `tx` to the job queue.
## the have txs of that kind) destaged on request arguments. ##
if local: ## If the argument `force` is set `false` and the sender account of the
result = xp.txDB.locals ## argument transaction is tagged local, this function returns with an error.
else: ## If the argument `force` is set `true`, the sender account will be untagged,
result = xp.txDB.remotes ## i.e. made non-local.
# Create or recover new item
let rc = xp.recoverItem(tx, txItemPending, "remote tx peek")
if rc.isErr:
return err(rc.error)
# core/tx_pool.go(1797): func (t *txLookup) RemoteToLocals(locals .. # Temporarily stash the item in the rubbish bin to be recovered, later
proc remoteToLocals*(xp: TxPoolRef; signer: EthAddress): int let sender = rc.value.sender
discard xp.txDB.dispose(rc.value, txInfoTxStashed)
# Verify local/remote account
if force:
xp.txDB.resLocal(sender)
elif xp.txDB.isLocal(sender):
return err(txInfoTxErrorRemoteExpected)
xp.jobAddTx(tx, "remote tx")
ok()
proc addLocal*(xp: TxPoolRef;
tx: Transaction; force = false): Result[void,TxInfo]
{.gcsafe,raises: [Defect,CatchableError].} = {.gcsafe,raises: [Defect,CatchableError].} =
## For given account, remote transactions are migrated to local transactions. ## Adds the argument transaction `tx` to the job queue.
## The function returns the number of transactions migrated. ##
xp.txDB.setLocal(signer) ## If the argument `force` is set `false` and the sender account of the
xp.txDB.bySender.eq(signer).nItems ## argument transaction is _not_ tagged local, this function returns with
## an error. If the argument `force` is set `true`, the sender account will
## be tagged local.
# Create or recover new item
let rc = xp.recoverItem(tx, txItemPending, "local tx peek")
if rc.isErr:
return err(rc.error)
]# # Temporarily stash the item in the rubbish bin to be recovered, later
let sender = rc.value.sender
discard xp.txDB.dispose(rc.value, txInfoTxStashed)
# Verify local/remote account
if force:
xp.txDB.setLocal(sender)
elif not xp.txDB.isLocal(sender):
return err(txInfoTxErrorLocalExpected)
xp.jobAddTx(tx, "local tx")
ok()
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# End # End

View File

@ -143,6 +143,19 @@ type
## tx was removed. ## tx was removed.
"Tx expired implied" "Tx expired implied"
txInfoTxStashed = ##\
## A transaction was item was created and stored in the disposal bin
## to be recycled and processed later.
"Tx stashed"
txInfoTxErrorRemoteExpected = ##\
## The sender account of a transaction was expected non-local.
"Tx non-local expected"
txInfoTxErrorLocalExpected = ##\
## The sender account of a transaction was expected local.
"Tx local expected"
# ------- update/move block chain head ------------------------------------- # ------- update/move block chain head -------------------------------------
txInfoErrAncestorMissing = ##\ txInfoErrAncestorMissing = ##\

View File

@ -35,6 +35,14 @@ type
TxTabsGasTotals* = tuple TxTabsGasTotals* = tuple
pending, staged, packed: GasInt ## sum => total pending, staged, packed: GasInt ## sum => total
TxTabsLocality* = object ##\
## Return value for `locality()` function
local: seq[EthAddress] ##\
## List of local accounts, higest rank first
remote: seq[EthAddress] ##\
## List of non-local accounts, higest rank first
TxTabsRef* = ref object ##\ TxTabsRef* = ref object ##\
## Base descriptor ## Base descriptor
maxRejects: int ##\ maxRejects: int ##\
@ -253,6 +261,14 @@ proc maxRejects*(xp: TxTabsRef): int =
## Getter ## Getter
xp.maxRejects xp.maxRejects
proc local*(lc: TxTabsLocality): seq[EthAddress] =
## Getter
lc.local
proc remote*(lc: TxTabsLocality): seq[EthAddress] =
## Getter
lc.remote
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public functions, setters # Public functions, setters
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -310,14 +326,17 @@ proc locals*(xp: TxTabsRef): seq[EthAddress] =
## Returns an unsorted list of addresses tagged *local* ## Returns an unsorted list of addresses tagged *local*
toSeq(xp.byLocal.keys) toSeq(xp.byLocal.keys)
proc remotes*(xp: TxTabsRef): seq[EthAddress] = proc locality*(xp: TxTabsRef): TxTabsLocality =
## Returns an sorted list of untagged addresses, highest address rank first ## Returns a pair of sorted lists of account addresses,
## highest address rank first
var rcRank = xp.byRank.le(TxRank.high) var rcRank = xp.byRank.le(TxRank.high)
while rcRank.isOK: while rcRank.isOK:
let (rank, addrList) = (rcRank.value.key, rcRank.value.data) let (rank, addrList) = (rcRank.value.key, rcRank.value.data)
for account in addrList.keys: for account in addrList.keys:
if not xp.byLocal.hasKey(account): if xp.byLocal.hasKey(account):
result.add account result.local.add account
else:
result.remote.add account
rcRank = xp.byRank.lt(rank) rcRank = xp.byRank.lt(rank)
proc setLocal*(xp: TxTabsRef; sender: EthAddress) = proc setLocal*(xp: TxTabsRef; sender: EthAddress) =

View File

@ -86,6 +86,12 @@ proc checkTxNonce(xp: TxPoolRef; item: TxItemRef): bool
true true
proc eip1559TxNormalization(tx: Transaction): Transaction =
result = tx
if tx.txType < TxEip1559:
result.maxPriorityFee = tx.gasPrice
result.maxFee = tx.gasPrice
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Private function: active tx classifier check helpers # Private function: active tx classifier check helpers
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -233,7 +239,11 @@ proc classifyValidatePacked*(xp: TxPoolRef;
else: else:
xp.chain.limits.trgLimit xp.chain.limits.trgLimit
roDB.validateTransaction(item.tx, item.sender, gasLimit, baseFee, fork) var tx = item.tx.eip1559TxNormalization
if FkLondon <= fork:
tx.gasPrice = min(tx.maxPriorityFee + baseFee.truncate(int64), tx.maxFee)
roDB.validateTransaction(tx, item.sender, gasLimit, baseFee, fork)
proc classifyPacked*(xp: TxPoolRef; gasBurned, moreBurned: GasInt): bool = proc classifyPacked*(xp: TxPoolRef; gasBurned, moreBurned: GasInt): bool =
## Classifier for *packing* (i.e. adding up `gasUsed` values after executing ## Classifier for *packing* (i.e. adding up `gasUsed` values after executing

View File

@ -252,6 +252,18 @@ proc vmExecCommit(pst: TxPackerStateRef)
xp.chain.profit = balanceDelta() xp.chain.profit = balanceDelta()
xp.chain.reward = balanceDelta() xp.chain.reward = balanceDelta()
iterator rankedAccounts(xp: TxPoolRef): TxStatusNonceRef
{.gcsafe,raises: [Defect,KeyError].} =
## Loop over staged accounts ordered by
## + local ranks, higest one first
## + remote ranks, higest one first
for (account,nonceList) in xp.txDB.decAccount(txItemStaged):
if xp.txDB.isLocal(account):
yield nonceList
for (account,nonceList) in xp.txDB.decAccount(txItemStaged):
if not xp.txDB.isLocal(account):
yield nonceList
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public functions # Public functions
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -265,7 +277,7 @@ proc packerVmExec*(xp: TxPoolRef) {.gcsafe,raises: [Defect,CatchableError].} =
var pst = xp.vmExecInit var pst = xp.vmExecInit
block loop: block loop:
for (_,nonceList) in pst.xp.txDB.decAccount(txItemStaged): for nonceList in pst.xp.rankedAccounts:
block account: block account:
for item in nonceList.incNonce: for item in nonceList.incNonce:

View File

@ -35,7 +35,7 @@ let
# Public functions # Public functions
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc recoverItem*(xp: TxPoolRef; tx: var Transaction; proc recoverItem*(xp: TxPoolRef; tx: Transaction;
status = txItemPending; info = ""): Result[TxItemRef,TxInfo] status = txItemPending; info = ""): Result[TxItemRef,TxInfo]
{.gcsafe,raises: [Defect,CatchableError].} = {.gcsafe,raises: [Defect,CatchableError].} =
## Recover item from waste basket or create new. It is an error if the item ## Recover item from waste basket or create new. It is an error if the item

View File

@ -1,3 +1,4 @@
# Nimbus # Nimbus
# Copyright (c) 2018-2019 Status Research & Development GmbH # Copyright (c) 2018-2019 Status Research & Development GmbH
# Licensed under either of # Licensed under either of
@ -30,6 +31,9 @@ proc pp*(b: Blob): string =
proc pp*(a: EthAddress): string = proc pp*(a: EthAddress): string =
a.mapIt(it.toHex(2)).join[32 .. 39].toLowerAscii a.mapIt(it.toHex(2)).join[32 .. 39].toLowerAscii
proc pp*(a: openArray[EthAddress]): string =
"[" & a.mapIt(it.pp).join(" ") & "]"
proc pp*(a: BlockNonce): string = proc pp*(a: BlockNonce): string =
a.mapIt(it.toHex(2)).join.toLowerAscii a.mapIt(it.toHex(2)).join.toLowerAscii

View File

@ -602,8 +602,8 @@ proc runTxPackerTests(noisy = true) =
block: block:
var var
xq = bcDB.toTxPool(txList, ntBaseFee, noisy) xq = bcDB.toTxPool(txList, ntBaseFee, noisy = noisy)
xr = bcDB.toTxPool(txList, ntNextFee, noisy) xr = bcDB.toTxPool(txList, ntNextFee, noisy = noisy)
block: block:
let let
pending = xq.nItems.pending pending = xq.nItems.pending
@ -742,7 +742,7 @@ proc runTxPackerTests(noisy = true) =
block: block:
var var
xq = bcDB.toTxPool(txList, ntBaseFee, noisy) xq = bcDB.toTxPool(txList, ntBaseFee, noisy = noisy)
let let
(nMinTxs, nTrgTxs) = (15, 15) (nMinTxs, nTrgTxs) = (15, 15)
(nMinAccounts, nTrgAccounts) = (1, 8) (nMinAccounts, nTrgAccounts) = (1, 8)
@ -814,6 +814,19 @@ proc runTxPackerTests(noisy = true) =
" increase=", xq.gasCumulative - smallerBlockSize, " increase=", xq.gasCumulative - smallerBlockSize,
" trg/max=", blockProfitRatio, "%" " trg/max=", blockProfitRatio, "%"
let
accountExtractPC = 10
acc = xq.accountRanks
nExtract = (acc.remote.len * accountExtractPC + 50) div 100
accExtracted = acc.remote[acc.remote.len - nExtract .. ^1]
noisy.say "***", "accounts",
" local=", acc.local.pp,
" remote=", acc.remote.pp,
" remote.len=", acc.remote.len,
" nExtract=", nExtract,
" accExtracted=", accExtracted.pp
# if true: return # if true: return
test "Store generated block in block chain database": test "Store generated block in block chain database":

View File

@ -116,9 +116,10 @@ proc toTxPool*(
proc toTxPool*( proc toTxPool*(
db: BaseChainDB; ## to be modified, initialisier for `TxPool` db: BaseChainDB; ## to be modified, initialisier for `TxPool`
itList: var seq[TxItemRef]; ## import items into new `TxPool` (read only) itList: seq[TxItemRef]; ## import items into new `TxPool` (read only)
baseFee = 0.GasPrice; ## initalise with `baseFee` (unless 0) baseFee = 0.GasPrice; ## initalise with `baseFee` (unless 0)
local: seq[EthAddress] = @[]; ## local addresses
noisy = true): TxPoolRef = noisy = true): TxPoolRef =
doAssert not db.isNil doAssert not db.isNil
@ -127,30 +128,32 @@ proc toTxPool*(
result.baseFee = baseFee result.baseFee = baseFee
result.maxRejects = itList.len result.maxRejects = itList.len
let noLocals = local.len == 0
var localAddr: Table[EthAddress,bool]
for a in local:
localAddr[a] = true
noisy.showElapsed(&"Loading {itList.len} transactions"): noisy.showElapsed(&"Loading {itList.len} transactions"):
for item in itList: for item in itList:
result.jobAddTx(item.tx, item.info) if noLocals:
result.jobCommit result.jobAddTx(item.tx, item.info)
elif localAddr.hasKey(item.sender):
doAssert result.addLocal(item.tx, true).isOk
else:
doAssert result.addRemote(item.tx, true).isOk
result.jobCommit
doAssert result.nItems.total == itList.len doAssert result.nItems.total == itList.len
proc toTxPool*( proc toTxPool*(
db: BaseChainDB; db: BaseChainDB; ## to be modified, initialisier for `TxPool`
itList: seq[TxItemRef]; timeGap: var Time; ## to be set, time in the middle of time gap
baseFee = 0.GasPrice; nGapItems: var int; ## to be set, # items before time gap
noisy = true): TxPoolRef = itList: var seq[TxItemRef]; ## import items into new `TxPool` (read only)
var newList = itList baseFee = 0.GasPrice; ## initalise with `baseFee` (unless 0)
db.toTxPool(newList, baseFee, noisy) itemsPC = 30; ## % number if items befor time gap
delayMSecs = 200; ## size of time vap
local: seq[EthAddress] = @[]; ## local addresses
proc toTxPool*(
db: BaseChainDB; ## to be modified, initialisier for `TxPool`
timeGap: var Time; ## to be set, time in the middle of time gap
nGapItems: var int; ## to be set, # items before time gap
itList: var seq[TxItemRef]; ## import items into new `TxPool` (read only)
baseFee = 0.GasPrice; ## initalise with `baseFee` (unless 0)
itemsPC = 30; ## % number if items befor time gap
delayMSecs = 200; ## size of time vap
noisy = true): TxPoolRef = noisy = true): TxPoolRef =
## Variant of `toTxPoolFromSeq()` with a time gap between consecutive ## Variant of `toTxPoolFromSeq()` with a time gap between consecutive
## items on the `remote` queue ## items on the `remote` queue
@ -161,6 +164,11 @@ proc toTxPool*(
result.baseFee = baseFee result.baseFee = baseFee
result.maxRejects = itList.len result.maxRejects = itList.len
let noLocals = local.len == 0
var localAddr: Table[EthAddress,bool]
for a in local:
localAddr[a] = true
let let
delayAt = itList.len * itemsPC div 100 delayAt = itList.len * itemsPC div 100
middleOfTimeGap = initDuration(milliSeconds = delayMSecs div 2) middleOfTimeGap = initDuration(milliSeconds = delayMSecs div 2)
@ -168,7 +176,12 @@ proc toTxPool*(
noisy.showElapsed(&"Loading {itList.len} transactions"): noisy.showElapsed(&"Loading {itList.len} transactions"):
for n in 0 ..< itList.len: for n in 0 ..< itList.len:
let item = itList[n] let item = itList[n]
result.jobAddTx(item.tx, item.info) if noLocals:
result.jobAddTx(item.tx, item.info)
elif localAddr.hasKey(item.sender):
doAssert result.addLocal(item.tx, true).isOk
else:
doAssert result.addRemote(item.tx, true).isOk
if delayAt == n: if delayAt == n:
nGapItems = n # pass back value nGapItems = n # pass back value
noisy.say &"time gap after transactions" noisy.say &"time gap after transactions"