mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-27 20:45:48 +00:00
c0d580715e
* Re-model persistent database access why: Storage slots healing just run on the wrong sub-trie (i.e. the wrong key mapping). So get/put and bulk functions now use the definitions in `snapdb_desc` (earlier there were some shortcuts for `get()`.) * Fixes: missing return code, typo, redundant imports etc. * Remove obsolete debugging directives from `worker_desc` module * Correct failing unit tests for storage slots trie inspection why: Some pathological cases for the extended tests do not produce any hexary trie data. This is rightly detected by the trie inspection and the result checks needed to adjusted.
672 lines
23 KiB
Nim
672 lines
23 KiB
Nim
# Nimbus
|
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at
|
|
# https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at
|
|
# https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed
|
|
# except according to those terms.
|
|
|
|
## Fetch and queue blocks
|
|
## ======================
|
|
##
|
|
## Worker items state diagram and sketch of sync algorithm:
|
|
## ::
|
|
##        unprocessed      |                | ready for           | store into
##        block ranges     | peer workers   | persistent database | database
##        =======================================================================
##
##             +------------------------------------------+
##             |                                          |
##             |  +----------------------------+          |
##             |  |                            |          |
##             V  v                            |          |
##        <unprocessed> ---+---> <worker-0> ---+-----> <staged> -------> OUTPUT
##                         |                   |
##                         +---> <worker-1> ---+
##                         |                   |
##                         +---> <worker-2> ---+
##                         :                   :
|
|
##
|
|
## A work item is created from a range of block numbers extracted from the
|
|
## `<unprocessed>` set of block ranges.
|
|
##
|
|
## A work item consists of a
|
|
## * current state `<worker-#>` or `<staged>`
|
|
## * given range of consecutive block numbers `[from..to]`
|
|
## * sequence of block headers relating to `[from..to]` (to be completed)
|
|
##  * sequence of block bodies relating to `[from..to]` (to be completed)
|
|
##
|
|
## Block ranges *may* be recycled back into the `<unprocessed>` set when a
|
|
## work item is destroyed. This is supposed to be an exceptional case.
|
|
## Typically, a `<staged>` work item is added to the persistent block chain
|
|
## database and destroyed without block range recycling.
|
|
##
|
|
## Beware of `<staged>` overflow
|
|
## -----------------------------
|
|
## When the `<staged>` queue gets too long in non-backtrack/re-org mode, this
|
|
## may be caused by a gap between the least `<unprocessed>` block number and
|
|
## the least `<staged>` block number. Then a mechanism is invoked where
|
|
## `<unprocessed>` block range is updated.
|
|
##
|
|
## For backtrack/re-org the system runs in single instance mode tracing
|
|
## backwards parent hash references. So updating `<unprocessed>` block numbers
|
|
## would have no effect. In that case, the record with the largest block
|
|
## numbers are deleted from the `<staged>` list.
|
|
##
|
|
|
|
import
|
|
std/[algorithm, options, sequtils, strutils],
|
|
chronicles,
|
|
chronos,
|
|
eth/[common, p2p],
|
|
stew/[byteutils, interval_set, sorted_set],
|
|
"../.."/[db/db_chain, utils],
|
|
".."/[protocol, sync_desc, types]
|
|
|
|
{.push raises:[Defect].}
|
|
|
|
logScope:
|
|
topics = "block-queue"
|
|
|
|
const
  stagedWorkItemsTrigger = 50
    ## Soft limit: once the `staged` list holds more items than this (and
    ## the queue is not backtracking), staging a work item reports
    ## `StagedQueueOverflow` so that the caller can switch to *pool mode*.

  maxStagedWorkItems = 70
    ## Hard limit: the `staged` list is cut back to this many items, the
    ## block ranges of the removed items are recycled as unprocessed.
|
|
|
|
type
  BlockQueueRC* = enum
    ## Status and error codes returned by the queue functions
    AllSmileOk
    EmptyQueue
    StagedQueueOverflow
    BlockNumberGap
    BacktrackDisabled
    FetchHeadersError
    FetchBodiesError
    NoMoreUnprocessed
    NoMorePeerBlocks

  BlockRangeSetRef = IntervalSetRef[BlockNumber,UInt256]
    ## Set of pairwise disjunct block number intervals

  BlockRange = Interval[BlockNumber,UInt256]
    ## Single interval of block numbers

  BlockItemQueue = SortedSet[BlockNumber,BlockItemRef]
    ## Work items, keyed (and so sorted) by their least block number

  BlockItemWalkRef = SortedSetWalkRef[BlockNumber,BlockItemRef]
    ## Descriptor for fast traversal of a `BlockItemQueue`

  BlockItemRef* = ref object
    ## Public work item, this is what the queue delivers as OUTPUT
    blocks*: BlockRange              ## Block number range covered
    topHash*: Option[Hash256]        ## Set if fetched by top hash (reverse)
    headers*: seq[BlockHeader]       ## Block headers received
    hashes*: seq[Hash256]            ## Hashes of `headers[]`, same indexing
    bodies*: seq[BlockBody]          ## Block bodies received

  BlockQueueCtxRef* = ref object
    ## Data shared among all worker instances
    backtrack: Option[Hash256]       ## Parent hash to backtrack from (re-org)
    unprocessed: BlockRangeSetRef    ## Block ranges still to be fetched
    staged: BlockItemQueue           ## Fetched items, not yet stored
    topAccepted: BlockNumber         ## Processed OK up to this block number

  BlockQueueWorkerRef* = ref object
    ## Per-peer (local) descriptor
    global: BlockQueueCtxRef         ## Shared data
    bestNumber: Option[BlockNumber]  ## Largest block number reported
    ctrl: BuddyCtrlRef               ## Control and state settings
    peer: Peer                       ## Network peer

  BlockQueueStats* = object
    ## Statistics record, filled in by `blockQueueStats()`
    topAccepted*: BlockNumber
    nextUnprocessed*: Option[BlockNumber]
    nextStaged*: Option[BlockNumber]
    nStagedQueue*: int
    reOrg*: bool
|
|
|
|
let
  highBlockNumber = BlockNumber.high
    ## Largest possible block number, used as an open upper bound
  highBlockRange = BlockRange.new(highBlockNumber, highBlockNumber)
    ## Singleton range `[high,high]`, used as a dummy/fallback interval

static:
  # The overflow trigger must kick in before the staged list is cut
  doAssert maxStagedWorkItems > stagedWorkItemsTrigger
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private helpers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc `+`(n: BlockNumber; delta: static[int]): BlockNumber =
  ## Add a compile time integer constant to a block number, so one can
  ## write `blockNumber + 1` without an explicit conversion.
  let step = delta.toBlockNumber
  n + step
|
|
|
|
proc `-`(n: BlockNumber; delta: static[int]): BlockNumber =
  ## Subtract a compile time integer constant from a block number, so one
  ## can write `blockNumber - 1` without an explicit conversion.
  let step = delta.toBlockNumber
  n - step
|
|
|
|
proc merge(ivSet: BlockRangeSetRef; wi: BlockItemRef): Uint256 =
  ## Shortcut: merge the block range of the work item `wi` into `ivSet` and
  ## pass on the result of the underlying `merge()`.
  result = merge(ivSet, wi.blocks)
|
|
|
|
proc reduce(ivSet: BlockRangeSetRef; wi: BlockItemRef): Uint256 =
  ## Shortcut: remove the block range of the work item `wi` from `ivSet` and
  ## pass on the result of the underlying `reduce()`.
  result = reduce(ivSet, wi.blocks)
|
|
|
|
# ---------------
|
|
|
|
proc pp(n: BlockNumber): string =
  ## Pretty printer for block numbers: `#<number>`, or `high` for the
  ## largest possible block number.
  if n != highBlockNumber:
    return "#" & $n
  "high"
|
|
|
|
proc `$`(iv: BlockRange): string =
  ## Pretty printer for a block range: `[min,max]`, or just `[min]` when
  ## the range consists of a single block number.
  if iv.minPt == iv.maxPt:
    "[" & iv.minPt.pp & "]"
  else:
    "[" & iv.minPt.pp & "," & iv.maxPt.pp & "]"
|
|
|
|
proc `$`(n: Option[BlockRange]): string =
  ## Pretty printer for an optional block range, `n/a` if unset.
  if n.isSome:
    return $n.unsafeGet
  "n/a"
|
|
|
|
proc `$`(n: Option[BlockNumber]): string =
  ## Pretty printer for an optional block number, `n/a` if unset.
  if n.isSome:
    return n.unsafeGet.pp
  "n/a"
|
|
|
|
proc `$`(brs: BlockRangeSetRef): string =
  ## Pretty printer for a set of block ranges, listed in increasing order
  ## as `{[a,b],[c],...}`.
  var items: seq[string]
  for iv in brs.increasing:
    items.add $iv
  "{" & items.join(",") & "}"
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private helpers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc nextUnprocessed(ctx: BlockQueueCtxRef): Option[BlockNumber] =
  ## Pseudo getter: least block number of the `unprocessed` ranges set, or
  ## `none` if that set is empty.
  let lowest = ctx.unprocessed.ge()
  if lowest.isErr:
    return none(BlockNumber)
  some(lowest.value.minPt)
|
|
|
|
proc nextStaged(ctx: BlockQueueCtxRef): Option[BlockRange] =
  ## Pseudo getter: block range of the first item on the `staged` queue
  ## (the one with the least key), or `none` if the queue is empty.
  let first = ctx.staged.ge(low(BlockNumber))
  if first.isErr:
    return none(BlockRange)
  some(first.value.data.blocks)
|
|
|
|
template safeTransport(
    qd: BlockQueueWorkerRef;
    info: static[string];
    code: untyped) =
  ## Run `code` and trap `TransportError` exceptions. When trapped, the
  ## error is logged with the `info` text and the worker `qd` is flagged
  ## as stopped. Execution then continues *after* the template, so the
  ## caller has to check `qd.ctrl.stopped` (or its own result state.)
  try:
    code
  except TransportError as exc:
    error info & ", stop", error=($exc.name), msg=exc.msg
    qd.ctrl.stopped = true
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private functions
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc newWorkItem(qd: BlockQueueWorkerRef): Result[BlockItemRef,BlockQueueRC] =
  ## Create a work item for the next block range to fetch.
  ##
  ## The function takes the least interval from the `unprocessed` ranges
  ## set, clips it to at most `maxHeadersFetch` block numbers not exceeding
  ## the `bestNumber` of this worker, removes the clipped range from the
  ## `unprocessed` set and returns it wrapped into a new `BlockItemRef`.
  ##
  ## Error codes:
  ## * `NoMoreUnprocessed` -- the `unprocessed` set is empty
  ## * `NoMorePeerBlocks` -- `bestNumber` is unset or below the least
  ##   unprocessed block number
  let rc = qd.global.unprocessed.ge()
  if rc.isErr:
    return err(NoMoreUnprocessed) # no more data for this peer

  let lowest = rc.value

  # Check whether there is something to do at all
  if qd.bestNumber.isNone or
     qd.bestNumber.unsafeGet < lowest.minPt:
    return err(NoMorePeerBlocks) # no more data for this peer

  # Compute the interval to fetch
  let
    peerTop = qd.bestNumber.unsafeGet
    fetchTop = lowest.minPt + maxHeadersFetch - 1
    iv = BlockRange.new(
      lowest.minPt,
      min(lowest.maxPt, min(fetchTop, peerTop)))

  discard qd.global.unprocessed.reduce(iv)
  ok(BlockItemRef(blocks: iv))
|
|
|
|
|
|
proc stageItem(
    qd: BlockQueueWorkerRef;
    wi: BlockItemRef;
      ): Result[void,BlockQueueRC] =
  ## Add the work item `wi` to the list of staged items, keyed by its least
  ## block number.
  ##
  ## Typically, the function succeeds unless there is a queue overflow
  ## (error code `StagedQueueOverflow`) which needs to be handled in *pool
  ## mode* by running `blockQueueGrout()`. Note that the item is staged
  ## even if the overflow code is returned.
  let
    peer = qd.peer
    key = wi.blocks.minPt
    ins = qd.global.staged.insert(key)

  if ins.isErr:
    # Ooops, duplicates should not exist (but anyway ...) So store `wi` in
    # place of the offending entry `wj`.
    let slot = qd.global.staged.eq(key)
    doAssert slot.isOk
    let wj = slot.value.data
    slot.value.data = wi

    # Update `unprocessed` ranges
    debug "Replacing dup item in staged list", peer,
      range=($wi.blocks), discarded=($wj.blocks)
    let diff = wi.blocks - wj.blocks
    if diff.isOk:
      discard qd.global.unprocessed.merge(diff.value)
    return ok()

  ins.value.data = wi

  # Flag an overflow if the staged queue gets too long (unless backtracking.)
  var overflow = false
  if stagedWorkItemsTrigger < qd.global.staged.len and
     qd.global.backtrack.isNone and
     wi.topHash.isNone:
    debug "Staged queue too long", peer,
      staged=qd.global.staged.len, max=stagedWorkItemsTrigger
    overflow = true

  # The list size is limited. So cut if necessary and recycle back the block
  # range of the discarded item (tough luck if the current work item is the
  # one removed from top.)
  while maxStagedWorkItems < qd.global.staged.len:
    let topValue = qd.global.staged.le(highBlockNumber).value
    discard qd.global.unprocessed.merge(topValue.data)
    discard qd.global.staged.delete(topValue.key)

  if overflow:
    return err(StagedQueueOverflow)
  ok()
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private functions, asynchroneous data network activity
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc fetchHeaders(
    qd: BlockQueueWorkerRef;
    wi: BlockItemRef;
      ): Future[bool]
      {.async.} =
  ## Fetch the block headers for the work item `wi` from the peer and store
  ## them in `wi.headers`, with the corresponding hashes in `wi.hashes`. The
  ## function returns `true` if headers were fetched and there were no
  ## inconsistencies. It returns `true` right away if `wi.hashes` is already
  ## populated.
  ##
  ## If `wi.topHash` is set, the headers are requested in reverse order
  ## starting at that hash and `wi.blocks` is replaced by the range actually
  ## received. Otherwise the headers for `wi.blocks` are requested by number.
  ##
  ## The response may be shorter than requested, or be cut short at a
  ## dangling parent link. In that case `wi.blocks` is reduced and the
  ## unused tail is handed back to the `unprocessed` ranges set.
  ##
  ## On protocol violations (wrong start block, non-consecutive block
  ## numbers, more headers than requested) the worker is flagged `zombie`.
  ## On a transport error or an empty response it is flagged `stopped`.
  if 0 < wi.hashes.len:
    return true

  let peer = qd.peer
  var hdrReq: BlocksRequest
  if wi.topHash.isNone:
    hdrReq = BlocksRequest(
      startBlock: HashOrNum(
        isHash:   false,
        number:   wi.blocks.minPt),
      maxResults: wi.blocks.len.truncate(uint),
      skip:       0,
      reverse:    false)
    trace trEthSendSendingGetBlockHeaders, peer,
      blocks=($wi.blocks)

  else:
    hdrReq = BlocksRequest(
      startBlock: HashOrNum(
        isHash:   true,
        hash:     wi.topHash.get),
      maxResults: maxHeadersFetch,
      skip:       0,
      reverse:    true)
    trace trEthSendSendingGetBlockHeaders & " reverse", peer,
      topHash=hdrReq.startBlock.hash, reqLen=hdrReq.maxResults

  # Fetch headers from peer
  var hdrResp: Option[blockHeadersObj]
  block:
    let reqLen = hdrReq.maxResults
    qd.safeTransport("Error fetching block headers"):
      hdrResp = await peer.getBlockHeaders(hdrReq)
    # Beware of peer terminating the session (also set by `safeTransport()`
    # after a transport error)
    if qd.ctrl.stopped:
      return false

    if hdrResp.isNone:
      trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a"
      return false

    let hdrRespLen = hdrResp.get.headers.len
    trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen

    if hdrRespLen == 0:
      qd.ctrl.stopped = true
      return false

    # A peer must not send more headers than asked for. Accepting them
    # would leave `wi.headers` extending beyond `wi.blocks` (same policy
    # as for oversized responses in `fetchBodies()`.)
    if reqLen.int < hdrRespLen:
      qd.ctrl.zombie = true
      return false

  # Update block range for reverse search
  if wi.topHash.isSome:
    # Headers are in reversed order
    wi.headers = hdrResp.get.headers.reversed
    wi.blocks = BlockRange.new(
      wi.headers[0].blockNumber, wi.headers[^1].blockNumber)
    discard qd.global.unprocessed.reduce(wi)
    trace "Updated reverse header range", peer, range=($wi.blocks)

  # Verify start block number
  elif hdrResp.get.headers[0].blockNumber != wi.blocks.minPt:
    trace "Header range starts with wrong block number", peer,
      startBlock=hdrResp.get.headers[0].blockNumber,
      requestedBlock=wi.blocks.minPt
    qd.ctrl.zombie = true
    return false

  # Import into `wi.headers`
  else:
    wi.headers.shallowCopy(hdrResp.get.headers)

  # Calculate block header hashes and verify it against parent links. If
  # necessary, cut off some offending block headers tail.
  wi.hashes.setLen(wi.headers.len)
  wi.hashes[0] = wi.headers[0].hash
  for n in 1 ..< wi.headers.len:
    if wi.headers[n-1].blockNumber + 1 != wi.headers[n].blockNumber:
      trace "Non-consecutive block numbers in header list response", peer
      qd.ctrl.zombie = true
      return false
    if wi.hashes[n-1] != wi.headers[n].parentHash:
      # Oops, cul-de-sac after block chain re-org?
      trace "Dangling parent link in header list response. Re-org?", peer
      wi.headers.setLen(n)
      wi.hashes.setLen(n)
      break
    wi.hashes[n] = wi.headers[n].hash

  # Adjust range length if necessary: hand back the part of the range that
  # is not covered by the headers received
  if wi.headers[^1].blockNumber < wi.blocks.maxPt:
    let redRng = BlockRange.new(
      wi.headers[0].blockNumber, wi.headers[^1].blockNumber)
    trace "Adjusting block range", peer, range=($wi.blocks), reduced=($redRng)
    discard qd.global.unprocessed.merge(redRng.maxPt + 1, wi.blocks.maxPt)
    wi.blocks = redRng

  return true
|
|
|
|
|
|
proc fetchBodies(
    qd: BlockQueueWorkerRef;
    wi: BlockItemRef
      ): Future[bool]
      {.async.} =
  ## Fetch the block bodies for all header hashes in `wi.hashes` from the
  ## peer and append them to `wi.bodies`, at most `maxBodiesFetch` per
  ## request. The function returns `true` only if there is a body for each
  ## hash and there were no inconsistencies.
  ##
  ## A missing, empty or oversized response flags the worker as `zombie`.
  ## A transport error flags it as `stopped` (see `safeTransport()`.) In
  ## both cases the function returns `false` and `wi.bodies` may be left
  ## partially filled.
  let peer = qd.peer

  # Complete group of bodies
  qd.safeTransport("Error fetching block bodies"):
    while wi.bodies.len < wi.hashes.len:
      let
        start = wi.bodies.len
        reqLen = min(wi.hashes.len - wi.bodies.len, maxBodiesFetch)
        top = start + reqLen
        hashes = wi.hashes[start ..< top]

      trace trEthSendSendingGetBlockBodies, peer, reqLen

      # Append bodies from peer to `wi.bodies`
      block:
        let bdyResp = await peer.getBlockBodies(hashes)
        # Beware of peer terminating the session
        if qd.ctrl.stopped:
          return false

        if bdyResp.isNone:
          trace trEthRecvReceivedBlockBodies, peer, reqLen, respose="n/a"
          qd.ctrl.zombie = true
          return false

        let bdyRespLen = bdyResp.get.blocks.len
        trace trEthRecvReceivedBlockBodies, peer, reqLen, bdyRespLen

        if bdyRespLen == 0 or reqLen < bdyRespLen:
          qd.ctrl.zombie = true
          return false

        wi.bodies.add bdyResp.get.blocks

  # `safeTransport()` traps transport errors and continues here with the
  # loop above aborted. So success must not be reported unless the list of
  # bodies is really complete.
  if wi.bodies.len < wi.hashes.len:
    return false

  return true
|
|
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions, constructor
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc init*(
    T: type BlockQueueCtxRef;           ## Global data descriptor type
    firstBlockNumber = 0.toBlockNumber; ## Of first block to fetch from network
      ): T =
  ## Global constructor, shared data.
  ##
  ## The block number before the first one to fetch is recorded as accepted
  ## already. As block number zero is never fetched, the first block to
  ## fetch is `max(firstBlockNumber,1)`. All block numbers from there up to
  ## the largest possible one are registered as unprocessed.
  let accepted = max(firstBlockNumber, 1.toBlockNumber) - 1
  result = T(
    unprocessed: BlockRangeSetRef.init(),
    topAccepted: accepted)
  result.staged.init()
  discard result.unprocessed.merge(accepted + 1, highBlockNumber)
|
|
|
|
proc init*(
    T: type BlockQueueWorkerRef;        ## Local data descriptor type
    ctx: BlockQueueCtxRef;              ## Global data descriptor
    ctrl: BuddyCtrlRef;                 ## Control and state settings
    peer: Peer;                         ## For fetching data from network
      ): T =
  ## Buddy/local constructor, creates a worker descriptor for `peer` which
  ## is linked to the shared descriptor `ctx`.
  result = T()
  result.global = ctx
  result.ctrl = ctrl
  result.peer = peer
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions -- synchronous
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc blockQueueFetchStaged*(
    qd: BlockQueueWorkerRef;
      ): Result[BlockItemRef,BlockQueueRC]=
  ## Fetch the next item from the staged block queue. This item will be
  ## removed from the staged queue and must be recycled if it cannot be
  ## processed.
  ##
  ## On error, the function returns `EmptyQueue` if the queue was empty and
  ## `BlockNumberGap` if processing this item would result in a gap between
  ## the last accepted block number and the first block number of the next
  ## queue item. In the latter case, the item stays on the queue.
  ##
  ## This gap might appear if another function processes the in-between
  ## blocks in parallel or if something went wrong, see `blockQueueGrout()`.
  let first = qd.global.staged.ge(low(BlockNumber))
  if first.isErr:
    # No more items in the database
    return err(EmptyQueue)

  let
    peer = qd.peer
    wi = first.value.data
    topAccepted = qd.global.topAccepted

  # Check whether this record of blocks can be stored, at all
  if wi.headers[0].blockNumber <= topAccepted + 1:
    # Ok, store into the block chain database
    trace "Staged work item", peer,
      topAccepted, range=($wi.blocks)

    # Remove from staged DB
    discard qd.global.staged.delete(wi.blocks.minPt)
    return ok(wi)

  trace "Staged work item postponed", peer, topAccepted,
    range=($wi.blocks), staged=qd.global.staged.len
  err(BlockNumberGap)
|
|
|
|
|
|
proc blockQueueAccept*(qd: BlockQueueWorkerRef; wi: BlockItemRef) =
  ## Register the top block number of the argument item `wi` as the topmost
  ## block number accepted. This statement comes typically after the
  ## successful processing and storage of the work item fetched by
  ## `blockQueueFetchStaged()`.
  let ctx = qd.global
  ctx.topAccepted = wi.blocks.maxPt
|
|
|
|
|
|
proc blockQueueGrout*(qd: BlockQueueWorkerRef) =
  ## Fill the gap between the top accepted block number and the least block
  ## number that is either unprocessed or staged. If there is such a gap
  ## (which should not happen at all), `blockQueueFetchStaged()` keeps
  ## failing with the `BlockNumberGap` error code because the next work item
  ## is never fetched.
  ##
  ## To close the gap and avoid double processing, all other workers should
  ## have finished their tasks while this function is run. A way to achieve
  ## that is to run this function in *pool mode* once.
  let
    ctx = qd.global
    unprocMin = ctx.nextUnprocessed.get(otherwise = highBlockNumber)
    stagedMin = ctx.nextStaged.get(otherwise = highBlockRange).minPt
    covered = min(unprocMin, stagedMin)
    gapStart = ctx.topAccepted + 1

  # Mind the gap, fill in if necessary
  if gapStart < covered:
    discard ctx.unprocessed.merge(gapStart, covered - 1)
|
|
|
|
|
|
proc blockQueueRecycle*(qd: BlockQueueWorkerRef; wi: BlockItemRef) =
  ## Hand the block range of the argument item `wi` back to the set of
  ## unprocessed ranges so that it will be re-fetched from the network.
  ## This statement is typically used instead of `blockQueueAccept()` after
  ## a failure to process and store the work item fetched by
  ## `blockQueueFetchStaged()`.
  discard qd.global.unprocessed.merge(wi)
|
|
|
|
|
|
proc blockQueueRecycleStaged*(qd: BlockQueueWorkerRef) =
  ## Similar to `blockQueueRecycle()`, but for all items of the staged
  ## queue: their block ranges are handed back to the set of unprocessed
  ## ranges and the staged queue is emptied.
  let ctx = qd.global

  # Using fast traversal
  let walk = BlockItemWalkRef.init(ctx.staged)
  var item = walk.first()
  while item.isOk:
    # Store back into `unprocessed` ranges set
    discard ctx.unprocessed.merge(item.value.data.blocks)
    item = walk.next()

  # Optional clean up, see comments on the destroy() directive
  walk.destroy()

  ctx.staged.clear()
|
|
|
|
|
|
proc blockQueueBacktrackFrom*(qd: BlockQueueWorkerRef; wi: BlockItemRef) =
  ## Enable backtrack mode, starting with the parent of the first block
  ## header of the argument work item `wi` (which must hold at least one
  ## header.)
  let parent = wi.headers[0].parentHash
  qd.global.backtrack = some(parent)
|
|
|
|
|
|
proc blockQueueBacktrackOk*(qd: BlockQueueWorkerRef): bool =
  ## Returns `true` if the queue is in backtrack mode, i.e. a parent hash
  ## to backtrack from is registered.
  not qd.global.backtrack.isNone
|
|
|
|
|
|
proc blockQueueStats*(ctx: BlockQueueCtxRef; stats: var BlockQueueStats) =
  ## Collect global statistics and store them in the `stats` argument. All
  ## fields of `stats` are overwritten.
  let staged = ctx.nextStaged

  stats.topAccepted = ctx.topAccepted
  stats.nextUnprocessed = ctx.nextUnprocessed
  if staged.isNone:
    stats.nextStaged = none(BlockNumber)
  else:
    stats.nextStaged = some(staged.unsafeGet.minPt)
  stats.nStagedQueue = ctx.staged.len
  stats.reOrg = ctx.backtrack.isSome
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions -- asynchronous
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc blockQueueBacktrackWorker*(
    qd: BlockQueueWorkerRef;
      ): Future[Result[void,BlockQueueRC]]
      {.async.} =
  ## This function does some backtrack processing on the queue. Backtracking
  ## is single threaded due to the fact that the next block is identified by
  ## the hash of the parent header. So this function needs to run in *single
  ## mode*.
  ##
  ## If backtracking is enabled, this function fetches the next parent work
  ## item from the network and makes it available on the staged queue to be
  ## retrieved with `blockQueueFetchStaged()`. In that case, the function
  ## succeeds and `blockQueueBacktrackOk()` will return `false` afterwards.
  ##
  ## In all other cases, the function returns an error code:
  ## `BacktrackDisabled` if not in backtrack mode, otherwise
  ## `FetchHeadersError` or `FetchBodiesError`. After a fetch error, the
  ## queue stays in backtrack mode.
  if qd.global.backtrack.isNone:
    return err(BacktrackDisabled)

  let
    peer = qd.peer
    wi = BlockItemRef(
      # This dummy interval can safely be merged back without any effect
      blocks:  highBlockRange,
      # Enable backtrack
      topHash: some(qd.global.backtrack.unsafeGet))

  # Fetch headers and bodies for the current work item
  trace "Single mode worker, re-org backtracking", peer
  var failure = FetchHeadersError
  if await qd.fetchHeaders(wi):
    failure = FetchBodiesError
    if await qd.fetchBodies(wi):
      qd.global.backtrack = none(Hash256)
      discard qd.stageItem(wi)
      return ok()

  # This work item failed, nothing to do anymore.
  discard qd.global.unprocessed.merge(wi)
  return err(failure)
|
|
|
|
|
|
proc blockQueueWorker*(
    qd: BlockQueueWorkerRef;
      ): Future[Result[void,BlockQueueRC]]
      {.async.} =
  ## Normal worker function used to stage another work item to be retrieved
  ## by `blockQueueFetchStaged()`. This function may run in *multi mode*.
  ## Staged items are only synchronised on retrieval: an item is held back
  ## by a *gap* until the preceding one is committed by `blockQueueAccept()`.
  ##
  ## On error, with most error codes there is not much that can be done. If
  ## no work item could be created, the `bestNumber` of this worker is reset
  ## so that it will be re-calibrated. If fetching headers or bodies fails,
  ## the block range of the work item is handed back as unprocessed.
  ##
  ## The one remarkable error code is `StagedQueueOverflow` which pops up if
  ## the staged queue gets too long, typically caused by a gap between
  ## unprocessed and staged block numbers. One of the actions to be
  ## considered here is to run `blockQueueGrout()` in *pool mode*. Otherwise,
  ## the `StagedQueueOverflow` can be treated as a success would be.
  ##
  # Fetch work item
  let rc = qd.newWorkItem()
  if rc.isErr:
    # No way, end of capacity for this peer => re-calibrate
    qd.bestNumber = none(BlockNumber)
    return err(rc.error)
  let wi = rc.value

  # Fetch headers and bodies for the current work item
  var failure = FetchHeadersError
  if await qd.fetchHeaders(wi):
    if await qd.fetchBodies(wi):
      return qd.stageItem(wi)
    failure = FetchBodiesError

  # This work item failed
  discard qd.global.unprocessed.merge(wi)
  return err(failure)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# End
|
|
# ------------------------------------------------------------------------------
|