519 lines
18 KiB
Nim
519 lines
18 KiB
Nim
# Nimbus
|
|
# Copyright (c) 2018 Status Research & Development GmbH
|
|
# Licensed under either of
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
|
# http://www.apache.org/licenses/LICENSE-2.0)
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
|
# http://opensource.org/licenses/MIT)
|
|
# at your option. This file may not be copied, modified, or distributed except
|
|
# according to those terms.
|
|
|
|
## Transaction Pool Database For Buckets And Waste Basket
|
|
## ======================================================
|
|
##
|
|
|
|
import
|
|
std/[sequtils, tables],
|
|
./tx_info,
|
|
./tx_item,
|
|
./tx_tabs/[tx_sender, tx_rank, tx_status],
|
|
eth/[common, keys],
|
|
stew/[keyed_queue, keyed_queue/kq_debug, results, sorted_set]
|
|
|
|
{.push raises: [Defect].}
|
|
|
|
export
|
|
# bySender/byStatus index operations
|
|
any, eq, ge, gt, le, len, lt, nItems, gasLimits
|
|
|
|
type
|
|
TxTabsItemsCount* = tuple
|
|
pending, staged, packed: int ## sum => total
|
|
total: int ## excluding rejects
|
|
disposed: int ## waste basket
|
|
|
|
TxTabsGasTotals* = tuple
|
|
pending, staged, packed: GasInt ## sum => total
|
|
|
|
TxTabsRef* = ref object ##\
|
|
## Base descriptor
|
|
maxRejects: int ##\
|
|
## Maximal number of items in waste basket
|
|
|
|
# ----- primary tables ------
|
|
|
|
byLocal*: Table[EthAddress,bool] ##\
|
|
## List of local accounts (currently idle/unused)
|
|
|
|
byRejects*: KeyedQueue[Hash256,TxItemRef] ##\
|
|
## Rejects queue and waste basket, queued by disposal event
|
|
|
|
byItemID*: KeyedQueue[Hash256,TxItemRef] ##\
|
|
## Primary table containing all tx items, queued by arrival event
|
|
|
|
# ----- index tables for byItemID ------
|
|
|
|
bySender*: TxSenderTab ##\
|
|
## Index for byItemID: `sender` > `status` > `nonce` > item
|
|
|
|
byStatus*: TxStatusTab ##\
|
|
## Index for byItemID: `status` > `nonce` > item
|
|
|
|
byRank*: TxRankTab ##\
|
|
## Ranked address table, used for sender address traversal
|
|
|
|
const
|
|
txTabMaxRejects = ##\
|
|
## Default size of rejects queue (aka waste basket.) Older waste items will
|
|
## be automatically removed so that there are no more than this many items
|
|
## in the rejects queue.
|
|
500
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private helpers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc deleteImpl(xp: TxTabsRef; item: TxItemRef): bool
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Delete transaction (and wrapping container) from the database. If
|
|
## successful, the function returns the wrapping container that was just
|
|
## removed.
|
|
if xp.byItemID.delete(item.itemID).isOK:
|
|
discard xp.bySender.delete(item)
|
|
discard xp.byStatus.delete(item)
|
|
|
|
# Update address rank
|
|
let rc = xp.bySender.rank(item.sender)
|
|
if rc.isOK:
|
|
discard xp.byRank.insert(rc.value.TxRank, item.sender) # update
|
|
else:
|
|
discard xp.byRank.delete(item.sender)
|
|
|
|
return true
|
|
|
|
proc insertImpl(xp: TxTabsRef; item: TxItemRef): Result[void,TxInfo]
|
|
{.gcsafe,raises: [Defect,CatchableError].} =
|
|
if not xp.bySender.insert(item):
|
|
return err(txInfoErrSenderNonceIndex)
|
|
|
|
# Insert item
|
|
discard xp.byItemID.append(item.itemID,item)
|
|
discard xp.byStatus.insert(item)
|
|
|
|
# Update address rank
|
|
let rank = xp.bySender.rank(item.sender).value.TxRank
|
|
discard xp.byRank.insert(rank, item.sender)
|
|
|
|
return ok()
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions, constructor
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc new*(T: type TxTabsRef): T =
|
|
## Constructor, returns new tx-pool descriptor.
|
|
new result
|
|
result.maxRejects = txTabMaxRejects
|
|
|
|
# result.byLocal -- Table, no need to init
|
|
# result.byItemID -- KeyedQueue, no need to init
|
|
# result.byRejects -- KeyedQueue, no need to init
|
|
|
|
# index tables
|
|
result.bySender.init
|
|
result.byStatus.init
|
|
result.byRank.init
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions, add/remove entry
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc insert*(
|
|
xp: TxTabsRef;
|
|
tx: var Transaction;
|
|
status = txItemPending;
|
|
info = ""): Result[void,TxInfo]
|
|
{.gcsafe,raises: [Defect,CatchableError].} =
|
|
## Add new transaction argument `tx` to the database. If accepted and added
|
|
## to the database, a `key` value is returned which can be used to retrieve
|
|
## this transaction direcly via `tx[key].tx`. The following holds for the
|
|
## returned `key` value (see `[]` below for details):
|
|
## ::
|
|
## xp[key].id == key # id: transaction key stored in the wrapping container
|
|
## tx.toKey == key # holds as long as tx is not modified
|
|
##
|
|
## Adding the transaction will be rejected if the transaction key `tx.toKey`
|
|
## exists in the database already.
|
|
##
|
|
## CAVEAT:
|
|
## The returned transaction key `key` for the transaction `tx` is
|
|
## recoverable as `tx.toKey` only while the trasaction remains unmodified.
|
|
##
|
|
let itemID = tx.itemID
|
|
if xp.byItemID.hasKey(itemID):
|
|
return err(txInfoErrAlreadyKnown)
|
|
var item: TxItemRef
|
|
block:
|
|
let rc = TxItemRef.new(tx, itemID, status, info)
|
|
if rc.isErr:
|
|
return err(txInfoErrInvalidSender)
|
|
item = rc.value
|
|
block:
|
|
let rc = xp.insertImpl(item)
|
|
if rc.isErr:
|
|
return rc
|
|
ok()
|
|
|
|
proc insert*(xp: TxTabsRef; item: TxItemRef): Result[void,TxInfo]
|
|
{.gcsafe,raises: [Defect,CatchableError].} =
|
|
## Variant of `insert()` with fully qualified `item` argument.
|
|
if xp.byItemID.hasKey(item.itemID):
|
|
return err(txInfoErrAlreadyKnown)
|
|
return xp.insertImpl(item.dup)
|
|
|
|
|
|
proc reassign*(xp: TxTabsRef; item: TxItemRef; status: TxItemStatus): bool
|
|
{.gcsafe,raises: [Defect,CatchableError].} =
|
|
## Variant of `reassign()` for the `TxItemStatus` flag.
|
|
# make sure that the argument `item` is not some copy
|
|
let rc = xp.byItemID.eq(item.itemID)
|
|
if rc.isOK:
|
|
var realItem = rc.value
|
|
if realItem.status != status:
|
|
discard xp.bySender.delete(realItem) # delete original
|
|
discard xp.byStatus.delete(realItem)
|
|
realItem.status = status
|
|
discard xp.bySender.insert(realItem) # re-insert changed
|
|
discard xp.byStatus.insert(realItem)
|
|
return true
|
|
|
|
|
|
proc flushRejects*(xp: TxTabsRef; maxItems = int.high): (int,int)
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Flush/delete at most `maxItems` oldest items from the waste basket and
|
|
## return the numbers of deleted and remaining items (a waste basket item
|
|
## is considered older if it was moved there earlier.)
|
|
if xp.byRejects.len <= maxItems:
|
|
result[0] = xp.byRejects.len
|
|
xp.byRejects.clear
|
|
return # result
|
|
while result[0] < maxItems:
|
|
if xp.byRejects.shift.isErr:
|
|
break
|
|
result[0].inc
|
|
result[1] = xp.byRejects.len
|
|
|
|
|
|
proc dispose*(xp: TxTabsRef; item: TxItemRef; reason: TxInfo): bool
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Move argument `item` to rejects queue (aka waste basket.)
|
|
if xp.deleteImpl(item):
|
|
if xp.maxRejects <= xp.byRejects.len:
|
|
discard xp.flushRejects(1 + xp.byRejects.len - xp.maxRejects)
|
|
item.reject = reason
|
|
xp.byRejects[item.itemID] = item
|
|
return true
|
|
|
|
proc reject*(xp: TxTabsRef; tx: var Transaction;
|
|
reason: TxInfo; status = txItemPending; info = "")
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Similar to dispose but for a tx without the item wrapper, the function
|
|
## imports the tx into the waste basket (e.g. after it could not
|
|
## be inserted.)
|
|
if xp.maxRejects <= xp.byRejects.len:
|
|
discard xp.flushRejects(1 + xp.byRejects.len - xp.maxRejects)
|
|
let item = TxItemRef.new(tx, reason, status, info)
|
|
xp.byRejects[item.itemID] = item
|
|
|
|
proc reject*(xp: TxTabsRef; item: TxItemRef; reason: TxInfo)
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Variant of `reject()` with `item` rather than `tx` (assuming
|
|
## `item` is not in the database.)
|
|
if xp.maxRejects <= xp.byRejects.len:
|
|
discard xp.flushRejects(1 + xp.byRejects.len - xp.maxRejects)
|
|
item.reject = reason
|
|
xp.byRejects[item.itemID] = item
|
|
|
|
proc reject*(xp: TxTabsRef; tx: Transaction;
|
|
reason: TxInfo; status = txItemPending; info = "")
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Variant of `reject()`
|
|
var ty = tx
|
|
xp.reject(ty, reason, status)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public getters
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc baseFee*(xp: TxTabsRef): GasPrice =
|
|
## Getter
|
|
xp.bySender.baseFee
|
|
|
|
proc maxRejects*(xp: TxTabsRef): int =
|
|
## Getter
|
|
xp.maxRejects
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions, setters
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc `baseFee=`*(xp: TxTabsRef; val: GasPrice)
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Setter, update may cause database re-org
|
|
if xp.bySender.baseFee != val:
|
|
xp.bySender.baseFee = val
|
|
# Build new rank table
|
|
xp.byRank.clear
|
|
for (address,rank) in xp.bySender.accounts:
|
|
discard xp.byRank.insert(rank.TxRank, address)
|
|
|
|
|
|
proc `maxRejects=`*(xp: TxTabsRef; val: int) =
|
|
## Setter, applicable with next `reject()` invocation.
|
|
xp.maxRejects = val
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions, miscellaneous
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc hasTx*(xp: TxTabsRef; tx: Transaction): bool =
|
|
## Returns `true` if the argument pair `(key,local)` exists in the
|
|
## database.
|
|
##
|
|
## If this function returns `true`, then it is save to use the `xp[key]`
|
|
## paradigm for accessing a transaction container.
|
|
xp.byItemID.hasKey(tx.itemID)
|
|
|
|
proc nItems*(xp: TxTabsRef): TxTabsItemsCount
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
result.pending = xp.byStatus.eq(txItemPending).nItems
|
|
result.staged = xp.byStatus.eq(txItemStaged).nItems
|
|
result.packed = xp.byStatus.eq(txItemPacked).nItems
|
|
result.total = xp.byItemID.len
|
|
result.disposed = xp.byRejects.len
|
|
|
|
proc gasTotals*(xp: TxTabsRef): TxTabsGasTotals
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
result.pending = xp.byStatus.eq(txItemPending).gasLimits
|
|
result.staged = xp.byStatus.eq(txItemStaged).gasLimits
|
|
result.packed = xp.byStatus.eq(txItemPacked).gasLimits
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions: local/remote sender accounts
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc isLocal*(xp: TxTabsRef; sender: EthAddress): bool =
|
|
## Returns `true` if account address is local
|
|
xp.byLocal.hasKey(sender)
|
|
|
|
proc locals*(xp: TxTabsRef): seq[EthAddress] =
|
|
## Returns an unsorted list of addresses tagged *local*
|
|
toSeq(xp.byLocal.keys)
|
|
|
|
proc remotes*(xp: TxTabsRef): seq[EthAddress] =
|
|
## Returns an sorted list of untagged addresses, highest address rank first
|
|
var rcRank = xp.byRank.le(TxRank.high)
|
|
while rcRank.isOK:
|
|
let (rank, addrList) = (rcRank.value.key, rcRank.value.data)
|
|
for account in addrList.keys:
|
|
if not xp.byLocal.hasKey(account):
|
|
result.add account
|
|
rcRank = xp.byRank.lt(rank)
|
|
|
|
proc setLocal*(xp: TxTabsRef; sender: EthAddress) =
|
|
## Tag `sender` address argument *local*
|
|
xp.byLocal[sender] = true
|
|
|
|
proc resLocal*(xp: TxTabsRef; sender: EthAddress) =
|
|
## Untag *local* `sender` address argument.
|
|
xp.byLocal.del(sender)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public iterators, `TxRank` > `(EthAddress,TxStatusNonceRef)`
|
|
# ------------------------------------------------------------------------------
|
|
|
|
iterator incAccount*(xp: TxTabsRef; bucket: TxItemStatus;
|
|
fromRank = TxRank.low): (EthAddress,TxStatusNonceRef)
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Walk accounts with increasing ranks and return a nonce-ordered item list.
|
|
let rcBucket = xp.byStatus.eq(bucket)
|
|
if rcBucket.isOK:
|
|
let bucketList = xp.byStatus.eq(bucket).value.data
|
|
|
|
var rcRank = xp.byRank.ge(fromRank)
|
|
while rcRank.isOK:
|
|
let (rank, addrList) = (rcRank.value.key, rcRank.value.data)
|
|
|
|
# Use adresses for this rank which are also found in the bucket
|
|
for account in addrList.keys:
|
|
let rcAccount = bucketList.eq(account)
|
|
if rcAccount.isOK:
|
|
yield (account, rcAccount.value.data)
|
|
|
|
# Get next ranked address list (top down index walk)
|
|
rcRank = xp.byRank.gt(rank) # potenially modified database
|
|
|
|
|
|
iterator decAccount*(xp: TxTabsRef; bucket: TxItemStatus;
|
|
fromRank = TxRank.high): (EthAddress,TxStatusNonceRef)
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Walk accounts with decreasing ranks and return the nonce-ordered item list.
|
|
let rcBucket = xp.byStatus.eq(bucket)
|
|
if rcBucket.isOK:
|
|
let bucketList = xp.byStatus.eq(bucket).value.data
|
|
|
|
var rcRank = xp.byRank.le(fromRank)
|
|
while rcRank.isOK:
|
|
let (rank, addrList) = (rcRank.value.key, rcRank.value.data)
|
|
|
|
# Use adresses for this rank which are also found in the bucket
|
|
for account in addrList.keys:
|
|
let rcAccount = bucketList.eq(account)
|
|
if rcAccount.isOK:
|
|
yield (account, rcAccount.value.data)
|
|
|
|
# Get next ranked address list (top down index walk)
|
|
rcRank = xp.byRank.lt(rank) # potenially modified database
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public iterators, `TxRank` > `(EthAddress,TxSenderNonceRef)`
|
|
# ------------------------------------------------------------------------------
|
|
|
|
iterator incAccount*(xp: TxTabsRef;
|
|
fromRank = TxRank.low): (EthAddress,TxSenderNonceRef)
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Variant of `incAccount()` without bucket restriction.
|
|
var rcRank = xp.byRank.ge(fromRank)
|
|
while rcRank.isOK:
|
|
let (rank, addrList) = (rcRank.value.key, rcRank.value.data)
|
|
|
|
# Try all sender adresses found
|
|
for account in addrList.keys:
|
|
yield (account, xp.bySender.eq(account).any.value.data)
|
|
|
|
# Get next ranked address list (top down index walk)
|
|
rcRank = xp.byRank.gt(rank) # potenially modified database
|
|
|
|
|
|
iterator decAccount*(xp: TxTabsRef;
|
|
fromRank = TxRank.high): (EthAddress,TxSenderNonceRef)
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Variant of `decAccount()` without bucket restriction.
|
|
var rcRank = xp.byRank.le(fromRank)
|
|
while rcRank.isOK:
|
|
let (rank, addrList) = (rcRank.value.key, rcRank.value.data)
|
|
|
|
# Try all sender adresses found
|
|
for account in addrList.keys:
|
|
yield (account, xp.bySender.eq(account).any.value.data)
|
|
|
|
# Get next ranked address list (top down index walk)
|
|
rcRank = xp.byRank.lt(rank) # potenially modified database
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Public second stage iterators: nonce-ordered item lists.
|
|
# -----------------------------------------------------------------------------
|
|
|
|
iterator incNonce*(nonceList: TxSenderNonceRef;
|
|
nonceFrom = AccountNonce.low): TxItemRef
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Second stage iterator inside `incAccount()` or `decAccount()`. The
|
|
## items visited are always sorted by least-nonce first.
|
|
var rc = nonceList.ge(nonceFrom)
|
|
while rc.isOk:
|
|
let (nonce, item) = (rc.value.key, rc.value.data)
|
|
yield item
|
|
rc = nonceList.gt(nonce) # potenially modified database
|
|
|
|
|
|
iterator incNonce*(nonceList: TxStatusNonceRef;
|
|
nonceFrom = AccountNonce.low): TxItemRef =
|
|
## Variant of `incNonce()` for the `TxStatusNonceRef` list.
|
|
var rc = nonceList.ge(nonceFrom)
|
|
while rc.isOK:
|
|
let (nonce, item) = (rc.value.key, rc.value.data)
|
|
yield item
|
|
rc = nonceList.gt(nonce) # potenially modified database
|
|
|
|
#[
|
|
# There is currently no use for nonce count down traversal
|
|
|
|
iterator decNonce*(nonceList: TxSenderNonceRef;
|
|
nonceFrom = AccountNonce.high): TxItemRef
|
|
{.gcsafe,raises: [Defect,KeyError].} =
|
|
## Similar to `incNonce()` but visiting items in reverse order.
|
|
var rc = nonceList.le(nonceFrom)
|
|
while rc.isOk:
|
|
let (nonce, item) = (rc.value.key, rc.value.data)
|
|
yield item
|
|
rc = nonceList.lt(nonce) # potenially modified database
|
|
|
|
|
|
iterator decNonce*(nonceList: TxStatusNonceRef;
|
|
nonceFrom = AccountNonce.high): TxItemRef =
|
|
## Variant of `decNonce()` for the `TxStatusNonceRef` list.
|
|
var rc = nonceList.le(nonceFrom)
|
|
while rc.isOK:
|
|
let (nonce, item) = (rc.value.key, rc.value.data)
|
|
yield item
|
|
rc = nonceList.lt(nonce) # potenially modified database
|
|
]#
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions, debugging
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc verify*(xp: TxTabsRef): Result[void,TxInfo]
|
|
{.gcsafe, raises: [Defect,CatchableError].} =
|
|
## Verify descriptor and subsequent data structures.
|
|
block:
|
|
let rc = xp.bySender.verify
|
|
if rc.isErr:
|
|
return rc
|
|
block:
|
|
let rc = xp.byItemID.verify
|
|
if rc.isErr:
|
|
return err(txInfoVfyItemIdList)
|
|
block:
|
|
let rc = xp.byRejects.verify
|
|
if rc.isErr:
|
|
return err(txInfoVfyRejectsList)
|
|
block:
|
|
let rc = xp.byStatus.verify
|
|
if rc.isErr:
|
|
return rc
|
|
block:
|
|
let rc = xp.byRank.verify
|
|
if rc.isErr:
|
|
return rc
|
|
|
|
for status in TxItemStatus:
|
|
var
|
|
statusCount = 0
|
|
statusAllGas = 0.GasInt
|
|
for (account,nonceList) in xp.incAccount(status):
|
|
let bySenderStatusList = xp.bySender.eq(account).eq(status)
|
|
statusAllGas += bySenderStatusList.gasLimits
|
|
statusCount += bySenderStatusList.nItems
|
|
if bySenderStatusList.nItems != nonceList.nItems:
|
|
return err(txInfoVfyStatusSenderTotal)
|
|
|
|
if xp.byStatus.eq(status).nItems != statusCount:
|
|
return err(txInfoVfyStatusSenderTotal)
|
|
if xp.byStatus.eq(status).gasLimits != statusAllGas:
|
|
return err(txInfoVfyStatusSenderGasLimits)
|
|
|
|
if xp.byItemID.len != xp.bySender.nItems:
|
|
return err(txInfoVfySenderTotal)
|
|
|
|
if xp.byItemID.len != xp.byStatus.nItems:
|
|
return err(txInfoVfyStatusTotal)
|
|
|
|
if xp.bySender.len != xp.byRank.nItems:
|
|
return err(txInfoVfyRankTotal)
|
|
ok()
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# End
|
|
# ------------------------------------------------------------------------------
|