jangko 01980041d5
setup nimbus tx pool for app wide usage
currently the txpool is used at:
- sealer engine
- graphql ethapi
- json rpc ethapi
- ws rpc ethapi
2022-01-25 11:38:49 +07:00

246 lines
9.1 KiB
Nim

# Nimbus
# Copyright (c) 2018 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.
## Transaction Pool Tasklet: Move Head of Block Chain
## ==================================================
##
import
std/[tables],
../../../db/db_chain,
../tx_chain,
../tx_desc,
../tx_info,
../tx_item,
chronicles,
eth/[common, keys],
stew/keyed_queue
{.push raises: [Defect].}
type
  TxHeadDiffRef* = ref object ##\
    ## Diff data: the tx changes that apply after changing the head\
    ## insertion point of the block chain.

    addTxs*: KeyedQueue[Hash256,Transaction] ##\
      ## Txs to add. A queue is used here because it makes it more
      ## intuitive to delete items while traversing the queue in a loop.

    remTxs*: Table[Hash256,bool] ##\
      ## Txs to remove.
# Chronicles log scope: log statements issued from this module carry the
# topics given below.
logScope:
  topics = "tx-pool head adjust"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
# Note: treat the `addTxs` queue as a stack/lifo as the ordering is reversed.
proc insert(xp: TxPoolRef; kq: TxHeadDiffRef; blockHash: Hash256)
    {.gcsafe,raises: [Defect,CatchableError].} =
  ## Registers every transaction found in the body of the block `blockHash`
  ## in the `addTxs` queue of `kq`, keyed by the tx item ID.
  let blockTxs = xp.chain.db.getBlockBody(blockHash).transactions
  for tx in blockTxs:
    kq.addTxs[tx.itemID] = tx
proc remove(xp: TxPoolRef; kq: TxHeadDiffRef; blockHash: Hash256)
    {.gcsafe,raises: [Defect,CatchableError].} =
  ## Flags every transaction found in the body of the block `blockHash`
  ## in the `remTxs` table of `kq`, keyed by the tx item ID.
  let blockTxs = xp.chain.db.getBlockBody(blockHash).transactions
  for tx in blockTxs:
    kq.remTxs[tx.itemID] = true
proc new(T: type TxHeadDiffRef): T =
  ## Allocates a fresh diff descriptor with default-initialised fields.
  T()
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
# core/tx_pool.go(218): func (pool *TxPool) reset(oldHead, newHead ...
proc headDiff*(xp: TxPoolRef;
               newHead: BlockHeader): Result[TxHeadDiffRef,TxInfo]
    {.gcsafe,raises: [Defect,CatchableError].} =
  ## This function calculates the txs differences between the cached block
  ## chain head and a new head implied by the argument `newHead`. Differences
  ## are returned as two tables for adding and removing txs. The table for
  ## adding transactions (is a queue and) preserves the order of the txs
  ## as they were collected from the block chain.
  ##
  ## Now, considering add/delete tx actions needed when replacing the cached
  ## *current head* position by the *new head* position (as derived from the
  ## function argument `newHead`), the most complex case of a block chain
  ## might look as follows:
  ## ::
  ##  .                       o---o-- .. ---o---o
  ##  .                      /                  ^
  ##  .  block chain .. ---o---o---o .. --o     |
  ##  .                    ^              ^     |
  ##  .                    |              |     |
  ##  .       common ancestor             |     |
  ##  .                                   |     |
  ##  .                            new head     |
  ##  .                                         |
  ##  .                              current head
  ##
  ## Legend
  ## * the bullet *o* stands for a block
  ## * a link *---* between two blocks indicates that the block number to
  ##   the right increases by *1*
  ## * the *common ancestor* is chosen with the largest possible block number
  ##   not exceeding the block numbers of both, the *current head* and the
  ##   *new head*
  ## * the branches to the right of the *common ancestor* may collapse to
  ##   a single branch (in which case at least one of *current head* or
  ##   *new head* collapses with the *common ancestor*)
  ## * there is no assumption on the block numbers of *new head* and
  ##   *current head* as of which one is greater, they might also be equal
  ##
  ## Consider the two sets *ADD* and *DEL* where
  ##
  ##   *ADD*
  ##     is the set of txs on the branch between the *common ancestor* and
  ##     the *current head*, and
  ##   *DEL*
  ##     is the set of txs on the branch between the *common ancestor* and
  ##     the *new head*
  ##
  ## Then, the set of txs to be added to the pool is *ADD - DEL* and the set
  ## of txs to be removed is *DEL - ADD*.
  ##
  ## Error return codes:
  ## * `txInfoErrForwardHeadMissing` -- no header for `newHead` in the database
  ## * `txInfoErrAncestorMissing` -- the cached current head is not in the
  ##   database and its block number does not exceed the one of `newHead`
  ## * `txInfoErrChainHeadMissing` -- the cached current head is not in the
  ##   database and its block number is greater than the one of `newHead`
  ## * `txInfoErrUnrootedCurChain`, `txInfoErrUnrootedNewChain` -- a parent
  ##   header is missing while stepping back the current or the new branch
  ##
  let
    curHead = xp.chain.head
    curHash = curHead.blockHash
    newHash = newHead.blockHash

  var ignHeader: BlockHeader
  if not xp.chain.db.getBlockHeader(newHash, ignHeader):
    # sanity check
    warn "Tx-pool head forward for non-existing header",
      newHead = newHash,
      newNumber = newHead.blockNumber
    return err(txInfoErrForwardHeadMissing)

  if not xp.chain.db.getBlockHeader(curHash, ignHeader):
    # This can happen if a `setHead()` is performed, where we have discarded
    # the old head from the chain.
    if curHead.blockNumber <= newHead.blockNumber:
      warn "Tx-pool head forward from detached current header",
        curHead = curHash,
        curNumber = curHead.blockNumber
      return err(txInfoErrAncestorMissing)
    debug "Tx-pool reset with detached current head",
      curHeader = curHash,
      curNumber = curHead.blockNumber,
      newHeader = newHash,
      newNumber = newHead.blockNumber
    return err(txInfoErrChainHeadMissing)

  # Equalise block numbers between branches (typically, these branches
  # collapse and there is a single strain only)
  var
    txDiffs = TxHeadDiffRef.new

    curBranchHead = curHead
    curBranchHash = curHash
    newBranchHead = newHead
    newBranchHash = newHash

  if newHead.blockNumber < curHead.blockNumber:
    #
    # new head block number smaller than the current head one
    #
    #              ,o---o-- ..--o
    #             /             ^
    #            /              |
    #   ----o---o---o           |
    #               ^           |
    #               |           |
    #              new  << current (step back this one)
    #
    # preserve transactions on the upper branch block numbers
    # between #new..#current to be re-inserted into the pool
    #
    while newHead.blockNumber < curBranchHead.blockNumber:
      xp.insert(txDiffs, curBranchHash)
      let
        tmpHead = curBranchHead # cache value for error logging
        tmpHash = curBranchHash
      curBranchHash = curBranchHead.parentHash # decrement block number
      if not xp.chain.db.getBlockHeader(curBranchHash, curBranchHead):
        error "Unrooted old chain seen by tx-pool",
          curBranchHead = tmpHash,
          curBranchNumber = tmpHead.blockNumber
        return err(txInfoErrUnrootedCurChain)
  else:
    #
    # current head block number smaller (or equal) than the new head one
    #
    #              ,o---o-- ..--o
    #             /             ^
    #            /              |
    #   ----o---o---o           |
    #               ^           |
    #               |           |
    #          current  << new (step back this one)
    #
    # preserve transactions on the upper branch block numbers
    # between #current..#new to be deleted from the pool
    #
    while curHead.blockNumber < newBranchHead.blockNumber:
      # It is the *new* branch that is stepped back here, so the txs of the
      # new branch block are the ones to be flagged for removal (the current
      # branch hash does not change inside this loop.)
      xp.remove(txDiffs, newBranchHash)
      let
        tmpHead = newBranchHead # cache value for error logging
        tmpHash = newBranchHash
      newBranchHash = newBranchHead.parentHash # decrement block number
      if not xp.chain.db.getBlockHeader(newBranchHash, newBranchHead):
        error "Unrooted new chain seen by tx-pool",
          newBranchHead = tmpHash,
          newBranchNumber = tmpHead.blockNumber
        return err(txInfoErrUnrootedNewChain)

  # simultaneously step back until junction-head (aka common ancestor) while
  # preserving txs between block numbers #ancestor..#current unless
  # between #ancestor..#new
  while curBranchHash != newBranchHash:
    block:
      xp.insert(txDiffs, curBranchHash)
      let
        tmpHead = curBranchHead # cache value for error logging
        tmpHash = curBranchHash
      curBranchHash = curBranchHead.parentHash
      if not xp.chain.db.getBlockHeader(curBranchHash, curBranchHead):
        error "Unrooted old chain seen by tx-pool",
          curBranchHead = tmpHash,
          curBranchNumber = tmpHead.blockNumber
        return err(txInfoErrUnrootedCurChain)
    block:
      xp.remove(txDiffs, newBranchHash)
      let
        tmpHead = newBranchHead # cache value for error logging
        tmpHash = newBranchHash
      newBranchHash = newBranchHead.parentHash
      if not xp.chain.db.getBlockHeader(newBranchHash, newBranchHead):
        error "Unrooted new chain seen by tx-pool",
          newBranchHead = tmpHash,
          newBranchNumber = tmpHead.blockNumber
        return err(txInfoErrUnrootedNewChain)

  # figure out difference sets: a tx that shows up on both branches is
  # neither added to nor removed from the pool
  for itemID in txDiffs.addTxs.nextKeys:
    if txDiffs.remTxs.hasKey(itemID):
      txDiffs.addTxs.del(itemID) # ok to delete the current one on a KeyedQueue
      txDiffs.remTxs.del(itemID)

  ok(txDiffs)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------