Remove txpool sender locality (#2525)

* Remove txpool sender locality

We no longer distinct local or remote sender

* Fix copyright year
This commit is contained in:
andri lim 2024-07-25 22:36:08 +07:00 committed by GitHub
parent 942cf7e447
commit 254bda365f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 21 additions and 334 deletions

View File

@ -431,8 +431,7 @@ import
tx_bucket,
tx_head,
tx_dispose,
tx_packer,
tx_recover],
tx_packer],
chronicles,
eth/keys,
stew/keyed_queue,
@ -459,9 +458,7 @@ export
tx_item.sender,
tx_item.status,
tx_item.timeStamp,
tx_item.tx,
tx_tabs.local,
tx_tabs.remote
tx_item.tx
{.push raises: [].}
@ -800,100 +797,6 @@ func disposeAll*(xp: TxPoolRef) {.raises: [CatchableError].} =
# Public functions, local/remote accounts
# ------------------------------------------------------------------------------
func isLocal*(xp: TxPoolRef; account: EthAddress): bool =
## This function returns `true` if argument `account` is tagged local.
xp.txDB.isLocal(account)
func setLocal*(xp: TxPoolRef; account: EthAddress) =
## Tag argument `account` local which means that the transactions from this
## account -- together with all other local accounts -- will be considered
## first for packing.
xp.txDB.setLocal(account)
func resLocal*(xp: TxPoolRef; account: EthAddress) =
## Untag argument `account` as local which means that the transactions from
## this account -- together with all other untagged accounts -- will be
## considered for packing after the locally tagged accounts.
xp.txDB.resLocal(account)
func flushLocals*(xp: TxPoolRef) =
## Untag all *local* addresses on the system.
xp.txDB.flushLocals
func accountRanks*(xp: TxPoolRef): TxTabsLocality =
## Returns two lists, one for local and the other for non-local accounts.
## Any of these lists is sorted by the highest rank first. This sorting
## means that the order may be out-dated after adding transactions.
xp.txDB.locality
proc addRemote*(xp: TxPoolRef;
tx: PooledTransaction; force = false): Result[void,TxInfo]
{.gcsafe,raises: [CatchableError].} =
## Adds the argument transaction `tx` to the buckets database.
##
## If the argument `force` is set `false` and the sender account of the
## argument transaction is tagged local, this function returns with an error.
## If the argument `force` is set `true`, the sender account will be untagged,
## i.e. made non-local.
##
## Note: This function is rather inefficient if there are more than one
## txs to be added for a known account. The preferable way to do this
## would be to use a combination of `xp.add()` and `xp.resLocal()` in any
## order.
# Create or recover new item. This will wrap the argument `tx` and cache
# the sender account and other derived data accessible.
let rc = xp.recoverItem(
tx, txItemPending, "remote tx peek", acceptExisting = true)
if rc.isErr:
return err(rc.error)
# Temporarily stash the item in the rubbish bin to be recovered, later
let sender = rc.value.sender
discard xp.txDB.dispose(rc.value, txInfoTxStashed)
# Verify local/remote account
if force:
xp.txDB.resLocal(sender)
elif xp.txDB.isLocal(sender):
return err(txInfoTxErrorRemoteExpected)
xp.add(tx, "remote tx")
ok()
proc addLocal*(xp: TxPoolRef;
tx: PooledTransaction; force = false): Result[void,TxInfo]
{.gcsafe,raises: [CatchableError].} =
## Adds the argument transaction `tx` to the buckets database.
##
## If the argument `force` is set `false` and the sender account of the
## argument transaction is _not_ tagged local, this function returns with
## an error. If the argument `force` is set `true`, the sender account will
## be tagged local.
##
## Note: This function is rather inefficient if there are more than one
## txs to be added for a known account. The preferable way to do this
## would be to use a combination of `xp.add()` and `xp.setLocal()` in any
## order.
# Create or recover new item. This will wrap the argument `tx` and cache
# the sender account and other derived data accessible.
let rc = xp.recoverItem(
tx, txItemPending, "local tx peek", acceptExisting = true)
if rc.isErr:
return err(rc.error)
# Temporarily stash the item in the rubbish bin to be recovered, later
let sender = rc.value.sender
discard xp.txDB.dispose(rc.value, txInfoTxStashed)
# Verify local/remote account
if force:
xp.txDB.setLocal(sender)
elif not xp.txDB.isLocal(sender):
return err(txInfoTxErrorLocalExpected)
xp.add(tx, "local tx")
ok()
func inPoolAndOk*(xp: TxPoolRef; txHash: Hash256): bool =
let res = xp.getItem(txHash)
if res.isErr: return false

View File

@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2018 Status Research & Development GmbH
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
@ -18,10 +18,6 @@ type
txInfoOk =
(0, "no error")
txInfoPackedBlockIncluded = ##\
## The transaction was disposed after packing into block
"not needed anymore"
txInfoSenderNonceSuperseded = ##\
## Tx superseded by another one with same <sender,nonce> index
"Sender/nonce index superseded"
@ -66,40 +62,8 @@ type
## <sender,nonce> index for transaction exists, already.
"Sender/nonce index error"
txInfoErrTxPoolOverflow = ##\
## The transaction pool is full and can't accpet another remote
## transaction.
"txpool is full"
# ------ Transaction format/parsing problems -------------------------------
txInfoErrOversizedData = ##\
## The input data of a transaction is greater than some meaningful
## limit a user might use. This is not a consensus error making the
## transaction invalid, rather a DOS protection.
"Oversized tx data"
txInfoErrNegativeValue = ##\
## A sanity error to ensure no one is able to specify a transaction
## with a negative value.
"Negative value in tx"
txInfoErrUnexpectedProtection = ##\
## Transaction type does not supported EIP-1559 protected signature
"Unsupported EIP-1559 signature protection"
txInfoErrInvalidTxType = ##\
## Transaction type not valid in this context
"Unsupported tx type"
txInfoErrTxTypeNotSupported = ##\
## Transaction type not supported
"Unsupported transaction type"
txInfoErrEmptyTypedTx = ##\
## Typed transaction, missing data
"Empty typed transaction bytes"
txInfoErrBasicValidatorFailed = ##\
## Running basic validator failed on current transaction
"Tx rejected by basic validator"
@ -114,31 +78,14 @@ type
## The transaction contains an invalid signature.
"invalid sender"
txInfoErrInvalidSig = ##\
## invalid transaction v, r, s values
"Invalid transaction signature"
# ------ Gas fee and selection problems ------------------------------------
txInfoErrUnderpriced = ##\
## A transaction's gas price is below the minimum configured for the
## transaction pool.
"Tx underpriced"
txInfoErrReplaceUnderpriced = ##\
## A transaction is attempted to be replaced with a different one
## without the required price bump.
"Replacement tx underpriced"
txInfoErrGasLimit = ##\
## A transaction's requested gas limit exceeds the maximum allowance
## of the current block.
"Tx exceeds block gasLimit"
txInfoErrGasFeeCapTooLow = ##\
## Gase fee cap less than base fee
"Tx has feeCap < baseFee"
# ------- operational events related to transactions -----------------------
txInfoErrTxExpired = ##\
@ -150,19 +97,6 @@ type
## tx was removed.
"Tx expired implied"
txInfoTxStashed = ##\
## A transaction was item was created and stored in the disposal bin
## to be recycled and processed later.
"Tx stashed"
txInfoTxErrorRemoteExpected = ##\
## The sender account of a transaction was expected non-local.
"Tx non-local expected"
txInfoTxErrorLocalExpected = ##\
## The sender account of a transaction was expected local.
"Tx local expected"
# ------- update/move block chain head -------------------------------------
txInfoErrAncestorMissing = ##\
@ -192,7 +126,6 @@ type
# ---------- debugging error codes as used in verifier functions -----------
# failed verifier codes
txInfoVfyLeafQueue ## Corrupted leaf item queue
txInfoVfyItemIdList ## Corrupted ID queue/fifo structure
txInfoVfyRejectsList ## Corrupted waste basket structure
@ -200,12 +133,9 @@ type
txInfoVfySenderRbTree ## Corrupted sender list structure
txInfoVfySenderLeafEmpty ## Empty sender list leaf record
txInfoVfySenderLeafQueue ## Corrupted sender leaf queue
txInfoVfySenderTotal ## Wrong number of leaves
txInfoVfySenderGasLimits ## Wrong gas accu values
txInfoVfySenderProfits ## Profits calculation error
txInfoVfyStatusRbTree ## Corrupted status list structure
txInfoVfyStatusTotal ## Wrong number of leaves
txInfoVfyStatusGasLimits ## Wrong gas accu values
txInfoVfyStatusSenderList ## Corrupted status-sender sub-list
@ -221,8 +151,4 @@ type
txInfoVfyRankDuplicateAddr ## Same address with different ranks
txInfoVfyRankTotal ## Wrong number of leaves (i.e. adresses)
# codes provided for other modules
txInfoVfyJobQueue ## Corrupted jobs queue/fifo structure
txInfoVfyJobEvent ## Event table sync error
# End

View File

@ -15,7 +15,7 @@
{.push raises: [].}
import
std/[sequtils, tables],
std/[tables],
./tx_info,
./tx_item,
./tx_tabs/[tx_sender, tx_rank, tx_status],
@ -36,14 +36,6 @@ type
TxTabsGasTotals* = tuple
pending, staged, packed: GasInt ## sum => total
TxTabsLocality* = object ##\
## Return value for `locality()` function
local: seq[EthAddress] ##\
## List of local accounts, higest rank first
remote: seq[EthAddress] ##\
## List of non-local accounts, higest rank first
TxTabsRef* = ref object ##\
## Base descriptor
maxRejects: int ##\
@ -51,9 +43,6 @@ type
# ----- primary tables ------
byLocal*: Table[EthAddress,bool] ##\
## List of local accounts (currently idle/unused)
byRejects*: KeyedQueue[Hash256,TxItemRef] ##\
## Rejects queue and waste basket, queued by disposal event
@ -124,7 +113,6 @@ proc new*(T: type TxTabsRef): T {.gcsafe,raises: [].} =
new result
result.maxRejects = txTabMaxRejects
# result.byLocal -- Table, no need to init
# result.byItemID -- KeyedQueue, no need to init
# result.byRejects -- KeyedQueue, no need to init
@ -258,14 +246,6 @@ proc maxRejects*(xp: TxTabsRef): int =
## Getter
xp.maxRejects
proc local*(lc: TxTabsLocality): seq[EthAddress] =
## Getter
lc.local
proc remote*(lc: TxTabsLocality): seq[EthAddress] =
## Getter
lc.remote
# ------------------------------------------------------------------------------
# Public functions, setters
# ------------------------------------------------------------------------------
@ -309,43 +289,6 @@ proc gasTotals*(xp: TxTabsRef): TxTabsGasTotals =
result.staged = xp.byStatus.eq(txItemStaged).gasLimits
result.packed = xp.byStatus.eq(txItemPacked).gasLimits
# ------------------------------------------------------------------------------
# Public functions: local/remote sender accounts
# ------------------------------------------------------------------------------
proc isLocal*(xp: TxTabsRef; sender: EthAddress): bool =
## Returns `true` if account address is local
xp.byLocal.hasKey(sender)
proc locals*(xp: TxTabsRef): seq[EthAddress] =
## Returns an unsorted list of addresses tagged *local*
toSeq(xp.byLocal.keys)
proc locality*(xp: TxTabsRef): TxTabsLocality =
## Returns a pair of sorted lists of account addresses,
## highest address rank first
var rcRank = xp.byRank.le(TxRank.high)
while rcRank.isOk:
let (rank, addrList) = (rcRank.value.key, rcRank.value.data)
for account in addrList.keys:
if xp.byLocal.hasKey(account):
result.local.add account
else:
result.remote.add account
rcRank = xp.byRank.lt(rank)
proc setLocal*(xp: TxTabsRef; sender: EthAddress) =
## Tag `sender` address argument *local*
xp.byLocal[sender] = true
proc resLocal*(xp: TxTabsRef; sender: EthAddress) =
## Untag *local* `sender` address argument.
xp.byLocal.del(sender)
proc flushLocals*(xp: TxTabsRef) =
## Untag all *local* addresses on the system.
xp.byLocal.clear
# ------------------------------------------------------------------------------
# Public iterators, `TxRank` > `(EthAddress,TxStatusNonceRef)`
# ------------------------------------------------------------------------------
@ -397,16 +340,10 @@ iterator packingOrderAccounts*(xp: TxTabsRef; bucket: TxItemStatus):
(EthAddress,TxStatusNonceRef)
{.gcsafe,raises: [KeyError].} =
## Loop over accounts from a particular bucket ordered by
## + local ranks, higest one first
## + remote ranks, higest one first
## For the `txItemStaged` bucket, this iterator defines the packing order
## for transactions (important when calculationg the *txRoot*.)
for (account,nonceList) in xp.decAccount(bucket):
if xp.isLocal(account):
yield (account,nonceList)
for (account,nonceList) in xp.decAccount(bucket):
if not xp.isLocal(account):
yield (account,nonceList)
yield (account,nonceList)
# ------------------------------------------------------------------------------
# Public iterators, `TxRank` > `(EthAddress,TxSenderNonceRef)`

View File

@ -149,10 +149,6 @@ proc addTx*(xp: TxPoolRef; item: TxItemRef): bool
# Error processing => store in waste basket
xp.txDB.reject(item, vetted)
# core/tx_pool.go(848): func (pool *TxPool) AddLocals(txs []..
# core/tx_pool.go(854): func (pool *TxPool) AddLocals(txs []..
# core/tx_pool.go(864): func (pool *TxPool) AddRemotes(txs []..
# core/tx_pool.go(883): func (pool *TxPool) AddRemotes(txs []..
# core/tx_pool.go(889): func (pool *TxPool) addTxs(txs []*types.Transaction, ..
proc addTxs*(xp: TxPoolRef;
txs: openArray[PooledTransaction]; info = ""): TxAddStats

View File

@ -68,9 +68,6 @@ when extraTraceMessages:
# Private helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"MemoryDB/" & info
proc newSession(db: MemBackendRef): MemPutHdlRef =
new result
result.TypedPutHdlRef.beginSession db

View File

@ -15,7 +15,6 @@ import
eth/common,
"../.."/[constants, errors],
".."/[kvt, aristo],
./backend/aristo_db,
./base/[api_tracking, base_config, base_desc, base_helpers]
export

View File

@ -105,10 +105,6 @@ func oaToStr(w: openArray[byte]): string =
func toStr(w: Hash256): string =
if w == EMPTY_ROOT_HASH: "EMPTY_ROOT_HASH" else: w.data.oaToStr
func toLenStr(w: Blob): string =
if 0 < w.len and w.len < 5: "<" & w.oaToStr & ">"
else: "Blob[" & $w.len & "]"
func toStr(ela: Duration): string =
aristo_profile.toStr(ela)

View File

@ -131,11 +131,11 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
# Early-initialise "--snap-sync" before starting any network connections.
block:
let
exCtrlFile = if conf.syncCtrlFile.isNone: Opt.none(string)
else: Opt.some(conf.syncCtrlFile.get)
tickerOK = conf.logLevel in {
LogLevel.INFO, LogLevel.DEBUG, LogLevel.TRACE}
#let
# exCtrlFile = if conf.syncCtrlFile.isNone: Opt.none(string)
# else: Opt.some(conf.syncCtrlFile.get)
# tickerOK = conf.logLevel in {
# LogLevel.INFO, LogLevel.DEBUG, LogLevel.TRACE}
case conf.syncMode:
#of SyncMode.Snap:
# # Minimal capability needed for sync only

View File

@ -65,14 +65,9 @@ type
newBlockHandler: NewBlockHandlerPair
newBlockHashesHandler: NewBlockHashesHandlerPair
ReconnectRef = ref object
pool: PeerPool
node: Node
const
NUM_PEERS_REBROADCAST_QUOTIENT = 4
POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20)
PEER_LONG_BANTIME = chronos.minutes(150)
# ------------------------------------------------------------------------------
# Private functions: helper functions
@ -84,10 +79,6 @@ proc notEnabled(name: string) {.used.} =
proc notImplemented(name: string) {.used.} =
debug "Wire handler method not implemented", meth = name
proc inPool(ctx: EthWireRef, txHash: Hash256): bool =
let res = ctx.txPool.getItem(txHash)
res.isOk
proc successorHeader(db: CoreDbRef,
h: BlockHeader,
output: var BlockHeader,
@ -125,43 +116,6 @@ proc getPeers(ctx: EthWireRef, thisPeer: Peer): seq[Peer] =
if peer != thisPeer:
result.add peer
proc banExpiredReconnect(arg: pointer) =
# Reconnect to peer after ban period if pool is empty
try:
let reconnect = cast[ReconnectRef](arg)
if reconnect.pool.len > 0:
return
asyncSpawn reconnect.pool.connectToNode(reconnect.node)
except TransportError:
debug "Transport got closed during banExpiredReconnect"
except CatchableError as e:
debug "Exception in banExpiredReconnect", exc = e.name, err = e.msg
proc banPeer(pool: PeerPool, peer: Peer, banTime: chronos.Duration) {.async.} =
try:
await peer.disconnect(SubprotocolReason)
let expired = Moment.fromNow(banTime)
let reconnect = ReconnectRef(
pool: pool,
node: peer.remote
)
discard setTimer(
expired,
banExpiredReconnect,
cast[pointer](reconnect)
)
except TransportError:
debug "Transport got closed during banPeer"
except CatchableError as e:
debug "Exception in banPeer", exc = e.name, err = e.msg
proc cleanupKnownByPeer(ctx: EthWireRef) =
let now = getTime()
var tmp = HashSet[Hash256]()

View File

@ -311,11 +311,7 @@ proc runLedgerTransactionTests(noisy = true) =
for _ in 0..<NumTransactions:
let recipient = initAddr(recipientSeed)
let tx = env.makeTx(recipient, 1.u256)
let res = env.xp.addLocal(PooledTransaction(tx: tx), force = true)
check res.isOk
if res.isErr:
debugEcho res.error
return
env.xp.add(PooledTransaction(tx: tx))
inc recipientSeed

View File

@ -140,12 +140,8 @@ proc runTxPoolPosTest() =
blk: EthBlock
suite "Test TxPool with PoS block":
test "TxPool addLocal":
let res = xp.addLocal(PooledTransaction(tx: tx), force = true)
check res.isOk
if res.isErr:
debugEcho res.error
return
test "TxPool add":
xp.add(PooledTransaction(tx: tx))
test "TxPool jobCommit":
check xp.nItems.total == 1
@ -200,16 +196,9 @@ proc runTxPoolBlobhashTest() =
blk: EthBlock
suite "Test TxPool with blobhash block":
test "TxPool addLocal":
let res = xp.addLocal(PooledTransaction(tx: tx1), force = true)
check res.isOk
if res.isErr:
debugEcho res.error
return
let res2 = xp.addLocal(PooledTransaction(tx: tx2), force = true)
check res2.isOk
test "TxPool jobCommit":
xp.add(PooledTransaction(tx: tx1))
xp.add(PooledTransaction(tx: tx2))
check xp.nItems.total == 2
test "TxPool ethBlock":
@ -256,11 +245,7 @@ proc runTxPoolBlobhashTest() =
xp = env.xp
check xp.smartHead(blk.header)
let res = xp.addLocal(PooledTransaction(tx: tx4), force = true)
check res.isOk
if res.isErr:
debugEcho res.error
return
xp.add(PooledTransaction(tx: tx4))
check inPoolAndOk(xp, rlpHash(tx4)) == false
@ -288,8 +273,6 @@ proc runTxHeadDelta(noisy = true) =
for tn in 0..<txPerblock:
let tx = env.makeTx(recipient, amount)
# Instead of `add()`, the functions `addRemote()` or `addLocal()`
# also would do.
xp.add(PooledTransaction(tx: tx))
noisy.say "***", "txDB",
@ -350,8 +333,8 @@ proc runGetBlockBodyTest() =
tx1 = env.makeTx(recipient, 1.u256)
tx2 = env.makeTx(recipient, 2.u256)
check env.xp.addLocal(PooledTransaction(tx: tx1), true).isOk
check env.xp.addLocal(PooledTransaction(tx: tx2), true).isOk
env.xp.add(PooledTransaction(tx: tx1))
env.xp.add(PooledTransaction(tx: tx2))
env.com.pos.prevRandao = prevRandao
env.com.pos.feeRecipient = feeRecipient
@ -374,9 +357,9 @@ proc runGetBlockBodyTest() =
tx2 = env.makeTx(recipient, 4.u256)
tx3 = env.makeTx(recipient, 5.u256)
check env.xp.addLocal(PooledTransaction(tx: tx1), true).isOk
check env.xp.addLocal(PooledTransaction(tx: tx2), true).isOk
check env.xp.addLocal(PooledTransaction(tx: tx3), true).isOk
env.xp.add(PooledTransaction(tx: tx1))
env.xp.add(PooledTransaction(tx: tx2))
env.xp.add(PooledTransaction(tx: tx3))
env.com.pos.prevRandao = prevRandao
env.com.pos.feeRecipient = feeRecipient