mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-02-05 16:54:00 +00:00
103656dbb5
details: For documentation, see comments in the file tx_pool.nim. For prettified manual pages run 'make docs' in the nimbus directory and point your web browser to the newly created 'docs' directory.
264 lines
7.7 KiB
Nim
264 lines
7.7 KiB
Nim
# Nimbus
|
|
# Copyright (c) 2018 Status Research & Development GmbH
|
|
# Licensed under either of
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
|
# http://www.apache.org/licenses/LICENSE-2.0)
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
|
# http://opensource.org/licenses/MIT)
|
|
# at your option. This file may not be copied, modified, or distributed except
|
|
# according to those terms.
|
|
|
|
## Jobs Queue For Transaction Pool
|
|
## ===============================
|
|
##
|
|
|
|
import
|
|
std/[hashes, tables],
|
|
./tx_info,
|
|
./tx_item,
|
|
./tx_tabs,
|
|
eth/[common, keys],
|
|
stew/[keyed_queue, keyed_queue/kq_debug, results]
|
|
|
|
{.push raises: [Defect].}
|
|
|
|
# hide complexity unless really needed
|
|
const
  jobWaitCompilerFlag =
    defined(debug) or defined(job_wait_enabled)

  JobWaitEnabled* = ##\
    ## Compile time switch, set when compiling with `-d:job_wait_enabled`\
    ## or `-d:debug`. If enabled, a *chronos* event is fired whenever a job\
    ## is added to the queue.
    jobWaitCompilerFlag
|
|
|
|
when JobWaitEnabled:
|
|
import chronos
|
|
|
|
|
|
type
  TxJobID* = ##\
    ## Job identifier. Valid interval: *1 .. txJobIdMax*, the value `0`\
    ## corresponds to `txJobIdMax` and is internally accepted only right\
    ## after initialisation.
    distinct uint

  TxJobKind* = enum ##\
    ## Kinds of batch job data. See `txJobPriorityKind` for the list of\
    ## *out-of-band* jobs.

    txJobNone = 0 ##\
      ## No action

    txJobAddTxs ##\
      ## Enqueues a batch of transactions

    txJobDelItemIDs ##\
      ## Enqueues a batch of item IDs, the items of which are to be disposed
|
|
|
|
const
  txJobPriorityKind*: set[TxJobKind] = ##\
    ## Prioritised job kinds (either small or important ones.) Jobs of these\
    ## kinds are stored at the front of the queue by `add()`. The set is\
    ## currently empty.
    {}
|
|
|
|
type
  TxJobDataRef* = ref object
    ## Job data record, the fields available depend on `kind`.
    case kind*: TxJobKind
    of txJobNone:
      discard

    of txJobAddTxs:
      addTxsArgs*: tuple[txs: seq[Transaction], info: string]

    of txJobDelItemIDs:
      delItemIDsArgs*: tuple[itemIDs: seq[Hash256], reason: TxInfo]

  TxJobPair* = object
    ## Response to a job queue query
    id*: TxJobID          ## Job ID, queue database key
    data*: TxJobDataRef   ## Data record

  TxJobRef* = ref object
    ## Job queue with increasing job *ID* numbers (wrapping around at
    ## `txJobIdMax`.)
    topID: TxJobID                          ## Next job will have `topID+1`
    jobs: KeyedQueue[TxJobID,TxJobDataRef]  ## Job queue

    when JobWaitEnabled:
      # this field exists only if the wait facility is compiled in
      jobsAvail: AsyncEvent                 ## Fired if there is a job available
|
|
|
|
const
  txJobIdMax* = ##\
    ## Largest job ID, the ID following it wraps around to `1`.
    TxJobID(999999)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private helpers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc hash(id: TxJobID): Hash =
  ## Hash of the underlying `uint` value, needed if `TxJobID` is used as
  ## hash-`Table` index.
  hash(uint(id))
|
|
|
|
# Arithmetic on job IDs, borrowed from the underlying `uint` type
proc `+`(a, b: TxJobID): TxJobID {.borrow.}
proc `-`(a, b: TxJobID): TxJobID {.borrow.}

proc `+`(a: TxJobID; b: int): TxJobID =
  ## Variant accepting a plain integer as right hand side
  a + TxJobID(b)

proc `-`(a: TxJobID; b: int): TxJobID =
  ## Variant accepting a plain integer as right hand side
  a - TxJobID(b)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public helpers (operators needed in jobAppend() and jobUnshift() functions)
|
|
# ------------------------------------------------------------------------------
|
|
|
|
# Comparison of job IDs, borrowed from the underlying `uint` type
proc `<=`*(a, b: TxJobID): bool {.borrow.}
proc `==`*(a, b: TxJobID): bool {.borrow.}
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private functions
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc jobAppend(jq: TxJobRef; data: TxJobDataRef): TxJobID
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Appends a job to the back of the *FIFO*. On success, the new non-zero
  ## job *ID* is returned and remembered as `topID`. Otherwise `0` is
  ## returned and the queue descriptor is left as-is.
  ##
  ## :Note:
  ##   An error can only occur if the *ID* of the first job follows the *ID*
  ##   of the last job (*modulo* `txJobIdMax`). This occurs when
  ##   * there are `txJobIdMax` jobs already on the queue
  ##   * some jobs were deleted in the middle of the queue and the *ID*
  ##     gap was not shifted out yet.
  let nextID =
    if txJobIdMax <= jq.topID:
      1.TxJobID          # wrap around after the last ID
    else:
      jq.topID + 1
  if jq.jobs.append(nextID, data):
    jq.topID = nextID
    result = nextID
|
|
|
|
proc jobUnshift(jq: TxJobRef; data: TxJobDataRef): TxJobID
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Stores *back* a job to the *FIFO* front end so that it will be fetched
  ## next. On success, the non-zero job *ID* is returned, otherwise `0`.
  ##
  ## The job gets the *ID* preceding the one of the current first job
  ## (wrapping around from `1` to `txJobIdMax`), or `topID` if the queue
  ## is empty.
  ##
  ## See also the *Note* at the comment for `jobAppend()`.
  var frontID: TxJobID
  if 0 < jq.jobs.len:
    frontID = jq.jobs.firstKey.value - 1
    if frontID == 0.TxJobID:
      frontID = txJobIdMax
  else:
    if jq.topID == 0.TxJobID:
      jq.topID = txJobIdMax # must be non-zero after first use
    frontID = jq.topID
  if jq.jobs.unshift(frontID, data):
    result = frontID
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions, constructor
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc new*(T: type TxJobRef; initSize = 10): T =
  ## Constructor, creates an empty job queue descriptor. The argument
  ## `initSize` is passed on to the initialiser of the underlying queue.
  result = T()
  result.jobs.init(initSize)

  when JobWaitEnabled:
    # the event object exists only if the wait facility is compiled in
    result.jobsAvail = newAsyncEvent()
|
|
|
|
|
|
proc clear*(jq: TxJobRef) =
  ## Re-initialise variant, removes all jobs from the queue. The `topID`
  ## counter is not reset.
  jq.jobs.clear()

  when JobWaitEnabled:
    # no jobs left, so reset the event as well
    jq.jobsAvail.clear()
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions, add/remove entry
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc add*(jq: TxJobRef; data: TxJobDataRef): TxJobID
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Add a new job to the *FIFO* and return its *ID*. Jobs with a kind
  ## listed in `txJobPriorityKind` are stored at the front, all other jobs
  ## are appended at the back.
  result =
    if data.kind in txJobPriorityKind:
      jq.jobUnshift(data)
    else:
      jq.jobAppend(data)

  when JobWaitEnabled:
    # update event, compiled in only if the wait facility was requested
    jq.jobsAvail.fire
|
|
|
|
|
|
proc fetch*(jq: TxJobRef): Result[TxJobPair,void]
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Fetches (and deletes) the next job from the front of the *FIFO*. An
  ## error is returned if there was no job to fetch.
  let first = jq.jobs.shift
  if first.isOk:
    when JobWaitEnabled:
      # update event, compiled in only if the wait facility was requested
      jq.jobsAvail.clear

    let kvp = first.value
    result = ok(TxJobPair(id: kvp.key, data: kvp.data))
  else:
    result = err()
|
|
|
|
|
|
# hide complexity unless really needed
|
|
when JobWaitEnabled:
  proc waitAvail*(jq: TxJobRef) {.async,raises: [Defect,CatchableError].} =
    ## Asynchronously wait until at least one job is available. This
    ## function returns without waiting if the queue is not empty. It is
    ## functional only if the `JobWaitEnabled` compile time constant is set.
    if jq.jobs.len < 1:
      await jq.jobsAvail.wait()
else:
  proc waitAvail*(jq: TxJobRef)
      {.deprecated: "will raise exception unless JobWaitEnabled is set",
        raises: [Defect,CatchableError].} =
    ## Placeholder for builds without `JobWaitEnabled`, fails with an
    ## assertion when called.
    raiseAssert "Must not be called unless JobWaitEnabled is set"
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public queue/table ops
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc `[]`*(jq: TxJobRef; id: TxJobID): TxJobDataRef
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Returns the data record of the job with the argument `id`, the lookup
  ## is passed on to the underlying queue.
  result = jq.jobs[id]
|
|
|
|
proc hasKey*(jq: TxJobRef; id: TxJobID): bool
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Returns `true` if there is a job with the argument `id` on the queue.
  result = jq.jobs.hasKey(id)
|
|
|
|
proc len*(jq: TxJobRef): int
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Returns the number of jobs currently on the queue.
  result = jq.jobs.len
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions, debugging
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc verify*(jq: TxJobRef): Result[void,TxInfo]
    {.gcsafe,raises: [Defect,KeyError].} =
  ## Debugging helper, runs the verifier of the underlying queue and maps
  ## any error to `txInfoVfyJobQueue`.
  if jq.jobs.verify().isErr:
    return err(txInfoVfyJobQueue)

  ok()
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# End
|
|
# ------------------------------------------------------------------------------
|