
# Nimbus
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# * Apache v2 license (license terms in the root directory or at
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.

## Fetch and queue blocks
## ======================
##
## Worker items state diagram and sketch of sync algorithm:
## ::
##
##   unprocessed    |                          | ready for   | store into
##   block ranges   | peer workers             | persistent  | database
##                  |                          | database    |
##   =====================================================================
##
##      +--------------------------------------------+
##      |                                            |
##      |    +----------------------------+          |
##      |    |                            |          |
##      V    v                            |          |
##   <unprocessed> ---+---> <worker-0> ---+-----> <staged> -------> OUTPUT
##                    |                   |
##                    +---> <worker-1> ---+
##                    |                   |
##                    +---> <worker-2> ---+
##                    :                   :
##
## A work item is created from a range of block numbers extracted from the
## `<unprocessed>` set of block ranges.
##
## A work item consists of
## * a current state `<worker-#>` or `<staged>`
## * a given range of consecutive block numbers `[]`
## * a sequence of block headers relating to `[]` (to be completed)
## * a sequence of block bodies relating to `[]` (to be completed)
##
## Block ranges *may* be recycled back into the `<unprocessed>` set when a
## work item is destroyed. This is supposed to be an exceptional case.
## Typically, a `<staged>` work item is added to the persistent block chain
## database and destroyed without block range recycling.
##
## Beware of `<staged>` overflow
## -----------------------------
##
## When the `<staged>` queue gets too long in non-backtrack/re-org mode, this
## may be caused by a gap between the least `<unprocessed>` block number and
## the least `<staged>` block number. Then a mechanism is invoked where the
## `<unprocessed>` block ranges are updated to close that gap (see
## `blockQueueGrout()`).
##
## For backtrack/re-org the system runs in single instance mode tracing
## backwards parent hash references. So updating `<unprocessed>` block numbers
## would have no effect. In that case, the records with the largest block
## numbers are deleted from the `<staged>` list.
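
# The `<unprocessed>` bookkeeping described above can be pictured with a
# minimal, hypothetical sketch that is not part of this module: a sorted seq
# of closed `uint64` ranges stands in for `BlockRangeSetRef`, `sketchTake()`
# cuts a work item off the least range and `sketchRecycle()` merges a range
# back. All `Sketch*`/`sketch*` names are illustrative assumptions.
when isMainModule:
  import std/algorithm

  type SketchRange = tuple[lo, hi: uint64]

  proc sketchTake(ranges: var seq[SketchRange]; maxLen: uint64): SketchRange =
    ## Hypothetical helper: remove up to `maxLen` block numbers from the front
    ## of the least range (assumes `ranges` is non-empty and `maxLen > 0`).
    let first = ranges[0]
    result = (first.lo, min(first.hi, first.lo + maxLen - 1))
    if result.hi == first.hi:
      ranges.delete(0)
    else:
      ranges[0].lo = result.hi + 1

  proc sketchRecycle(ranges: var seq[SketchRange]; r: SketchRange) =
    ## Hypothetical helper: put `r` back and coalesce overlapping or adjacent
    ## ranges, as recycling a destroyed work item would.
    ranges.add r
    ranges.sort(proc (a, b: SketchRange): int = cmp(a.lo, b.lo))
    var merged: seq[SketchRange]
    for x in ranges:
      if merged.len > 0 and
         (x.lo <= merged[^1].hi or x.lo - merged[^1].hi == 1):
        merged[^1].hi = max(merged[^1].hi, x.hi)
      else:
        merged.add x
    ranges = merged
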
{.push raises:[].}

import
  std/[algorithm, options, sequtils, strutils],
  chronicles,
  chronos,
  eth/[common, p2p],
  stew/[byteutils, interval_set, sorted_set],
  ".."/[protocol, sync_desc, types]

logScope:
  topics = "block-queue"

const
  maxStagedWorkItems = 70
    ## Maximal items in the `staged` list.

  stagedWorkItemsTrigger = 50
    ## Turn on the global `poolMode` if there are more than this many items
    ## staged.

type
  BlockQueueRC* = enum
    ## Return & error codes
    AllSmileOk
    EmptyQueue
    StagedQueueOverflow
    BlockNumberGap
    BacktrackDisabled
    FetchHeadersError
    FetchBodiesError
    NoMoreUnprocessed
    NoMorePeerBlocks

  BlockRangeSetRef = IntervalSetRef[BlockNumber,UInt256]
    ## Disjunct sets of block number intervals

  BlockRange = Interval[BlockNumber,UInt256]
    ## Block number interval

  BlockItemQueue = SortedSet[BlockNumber,BlockItemRef]
    ## Block intervals sorted by least block number

  BlockItemWalkRef = SortedSetWalkRef[BlockNumber,BlockItemRef]
    ## Fast traversal descriptor for `BlockItemQueue`

  BlockItemRef* = ref object
    ## Public block items, OUTPUT
    blocks*: BlockRange              ## Block numbers range covered
    topHash*: Option[Hash256]        ## Fetched by top hash rather than block
    headers*: seq[BlockHeader]       ## Block headers received
    hashes*: seq[Hash256]            ## Hashes of `headers[]` for convenience
    bodies*: seq[BlockBody]          ## Block bodies received

  BlockQueueCtxRef* = ref object
    ## Globally shared data among `block` instances
    backtrack: Option[Hash256]       ## Find reverse block after re-org
    unprocessed: BlockRangeSetRef    ## Block ranges to fetch
    staged: BlockItemQueue           ## Blocks fetched but not stored yet
    topAccepted: BlockNumber         ## Up to this block number processed OK

  BlockQueueWorkerRef* = ref object
    ## Local descriptor data extension
    global: BlockQueueCtxRef         ## Common data
    bestNumber: Option[BlockNumber]  ## Largest block number reported
    ctrl: BuddyCtrlRef               ## Control and state settings
    peer: Peer                       ## Network peer

  BlockQueueStats* = object
    ## Statistics
    topAccepted*: BlockNumber
    nextUnprocessed*: Option[BlockNumber]
    nextStaged*: Option[BlockNumber]
    nStagedQueue*: int
    reOrg*: bool

const
  extraTraceMessages = false or true
    ## Enabled additional logging noise

  highBlockNumber = high(BlockNumber)

  highBlockRange = BlockRange.new(highBlockNumber,highBlockNumber)

doAssert stagedWorkItemsTrigger < maxStagedWorkItems

# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------

proc `+`(n: BlockNumber; delta: static[int]): BlockNumber =
  ## Syntactic sugar for expressions like `xxx.toBlockNumber + 1`
  n + delta.toBlockNumber

proc `-`(n: BlockNumber; delta: static[int]): BlockNumber =
  ## Syntactic sugar for expressions like `xxx.toBlockNumber - 1`
  n - delta.toBlockNumber

proc merge(ivSet: BlockRangeSetRef; wi: BlockItemRef): Uint256 =
  ## Syntactic sugar
  ivSet.merge(wi.blocks)

proc reduce(ivSet: BlockRangeSetRef; wi: BlockItemRef): Uint256 =
  ## Syntactic sugar
  ivSet.reduce(wi.blocks)

# ---------------

proc `$`(iv: BlockRange): string =
  ## Needed for macro generated DSL files like `snap.nim` because the
  ## `distinct` flavour of `NodeTag` is discarded there.
  result = "[" & iv.minPt.toStr
  if iv.minPt != iv.maxPt:
    result &= "," & iv.maxPt.toStr
  result &= "]"

proc `$`(n: Option[BlockRange]): string =
  if n.isNone: "n/a" else: $n.get

proc `$`(n: Option[BlockNumber]): string =
  n.toStr

proc `$`(brs: BlockRangeSetRef): string =
  "{" & toSeq(brs.increasing).mapIt($it).join(",") & "}"

# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------

proc nextUnprocessed(ctx: BlockQueueCtxRef): Option[BlockNumber] =
  ## Pseudo getter
  let rc = ctx.unprocessed.ge()
  if rc.isOK:
    result = some(rc.value.minPt)

proc nextStaged(ctx: BlockQueueCtxRef): Option[BlockRange] =
  ## Pseudo getter
  let rc = ctx.staged.ge(low(BlockNumber))
  if rc.isOK:
    result = some(rc.value.data.blocks)

template safeTransport(
    qd: BlockQueueWorkerRef;
    info: static[string];
    code: untyped) =
  try:
    code
  except TransportError as e:
    error info & ", stop", error=($e.name), msg=e.msg
    qd.ctrl.stopped = true

# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------

proc newWorkItem(qd: BlockQueueWorkerRef): Result[BlockItemRef,BlockQueueRC] =
  ## Fetch the next unprocessed block range and register it as work item.
  ##
  ## This function will grab a block range from the `unprocessed` range set,
  ## remove it from that set and return it as a `BlockItemRef`. The returned
  ## range is registered in the `pending` list.
  let rc = qd.global.unprocessed.ge()
  if rc.isErr:
    return err(NoMoreUnprocessed) # no more data for this peer

  # Check whether there is something to do at all
  if qd.bestNumber.isNone or
     qd.bestNumber.unsafeGet < rc.value.minPt:
    when extraTraceMessages:
      trace "no new work item", bestNumer=qd.bestNumber.toStr, range=rc.value
    return err(NoMorePeerBlocks) # no more data for this peer

  # Compute interval
  let iv = BlockRange.new(
    rc.value.minPt,
    min(rc.value.maxPt,
        min(rc.value.minPt + maxHeadersFetch - 1, qd.bestNumber.unsafeGet)))

  discard qd.global.unprocessed.reduce(iv)
  ok(BlockItemRef(blocks: iv))
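
# The interval arithmetic of `newWorkItem()` above, restated as a minimal,
# hypothetical sketch with plain `uint64` block numbers (`SketchSpan` and
# `sketchWorkRange` are illustrative names only): a work item starts at the
# least unprocessed block number and is capped by the end of that unprocessed
# range, by the header batch size and by the best block number the peer
# reported.
when isMainModule:
  type SketchSpan = tuple[lo, hi: uint64]

  proc sketchWorkRange(minPt, maxPt, peerBest, maxFetch: uint64): SketchSpan =
    ## Hypothetical helper; assumes `minPt <= peerBest` (checked beforehand by
    ## `newWorkItem()`) and `maxFetch > 0`.
    (minPt, min(maxPt, min(minPt + maxFetch - 1, peerBest)))

# With an arbitrary batch size of 192 and a peer at block 100, a work item
# starting at block 1 would be capped at block 100 by the peer, not the batch.
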

proc stageItem(
    qd: BlockQueueWorkerRef;
    wi: BlockItemRef;
      ): Result[void,BlockQueueRC] =
  ## Add work item to the list of staged items.
  ##
  ## Typically, the function returns `AllSmileOk` unless there is a queue
  ## overflow (with return code `StagedQueueOverflow`) which needs to be
  ## handled in *pool mode* by running `blockQueueGrout()`.
  var
    error = AllSmileOk
  let
    peer = qd.peer
    rc = qd.global.staged.insert(wi.blocks.minPt)
  if rc.isOk:
    rc.value.data = wi

    # Report `StagedQueueOverflow` if the staged queue gets too long (unless
    # backtracking.)
    if stagedWorkItemsTrigger < qd.global.staged.len and
       qd.global.backtrack.isNone and
       wi.topHash.isNone:
      debug "Staged queue too long", peer,
        staged=qd.global.staged.len, max=stagedWorkItemsTrigger
      error = StagedQueueOverflow

    # The list size is limited. So cut if necessary and recycle back the block
    # range of the discarded item (tough luck if the current work item is the
    # one removed from top.)
    while maxStagedWorkItems < qd.global.staged.len:
      let topValue = qd.global.staged.le(highBlockNumber).value
      discard qd.global.unprocessed.merge(topValue.data)
      discard qd.global.staged.delete(topValue.key)

  else:
    # Ooops, duplicates should not exist (but anyway ...)
    let wj = block:
      let rc = qd.global.staged.eq(wi.blocks.minPt)
      doAssert rc.isOk
      # Store `wi` and return offending entry
      let rcData = rc.value.data
      rc.value.data = wi
      rcData

    # Update `staged` list and `unprocessed` ranges
    debug "Replacing dup item in staged list", peer,
      range=($wi.blocks), discarded=($wj.blocks)
    let rc = wi.blocks - wj.blocks
    if rc.isOk:
      discard qd.global.unprocessed.merge(rc.value)

  if error != AllSmileOk:
    return err(error)
  ok()
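
# The size limit enforced by `stageItem()` above as a minimal, hypothetical
# sketch: plain seqs of `uint64` ranges stand in for `BlockItemQueue` and for
# the `unprocessed` interval set, and `SketchStagedItem`/`sketchStage` are
# illustrative names. Unlike the real code, duplicates are not detected.
when isMainModule:
  import std/algorithm

  type SketchStagedItem = tuple[lo, hi: uint64]

  proc sketchStage(staged, recycled: var seq[SketchStagedItem];
                   item: SketchStagedItem; trigger, maxItems: int): bool =
    ## Hypothetical helper: returns `true` when more than `trigger` items are
    ## staged, the analogue of `StagedQueueOverflow`.
    staged.add item
    staged.sort(proc (a, b: SketchStagedItem): int = cmp(a.lo, b.lo))
    result = trigger < staged.len
    # Drop the items with the largest block numbers and hand their ranges
    # back so that they get fetched again later.
    while maxItems < staged.len:
      recycled.add staged.pop()
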
# ------------------------------------------------------------------------------
# Private functions, asynchronous data network activity
# ------------------------------------------------------------------------------

proc fetchHeaders(
    qd: BlockQueueWorkerRef;
    wi: BlockItemRef;
      ): Future[bool]
      {.async.} =
  ## Get the work item with the least interval and complete it. The function
  ## returns `true` if headers were fetched and there were no inconsistencies.
  if 0 < wi.hashes.len:
    return true

  let peer = qd.peer
  var hdrReq: BlocksRequest
  if wi.topHash.isNone:
    hdrReq = BlocksRequest(
      startBlock: HashOrNum(
        isHash:    false,
        number:    wi.blocks.minPt),
      maxResults:  wi.blocks.len.truncate(uint),
      skip:        0,
      reverse:     false)
    trace trEthSendSendingGetBlockHeaders, peer

  else:
    hdrReq = BlocksRequest(
      startBlock: HashOrNum(
        isHash:    true,
        hash:      wi.topHash.get),
      maxResults:  maxHeadersFetch,
      skip:        0,
      reverse:     true)
    trace trEthSendSendingGetBlockHeaders & " reverse", peer,
      topHash=hdrReq.startBlock.hash, reqLen=hdrReq.maxResults

  # Fetch headers from peer
  var hdrResp: Option[blockHeadersObj]
  let reqLen {.used.} = hdrReq.maxResults
  qd.safeTransport("Error fetching block headers"):
    hdrResp = await peer.getBlockHeaders(hdrReq)
  # Beware of peer terminating the session
  if qd.ctrl.stopped:
    return false

  if hdrResp.isNone:
    trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a"
    return false

  let hdrRespLen = hdrResp.get.headers.len
  trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen

  if hdrRespLen == 0:
    qd.ctrl.stopped = true
    return false

  # Update block range for reverse search
  if wi.topHash.isSome:
    # Headers are in reversed order
    wi.headers = hdrResp.get.headers.reversed
    wi.blocks = BlockRange.new(
      wi.headers[0].blockNumber, wi.headers[^1].blockNumber)
    trace "Updated reverse header range", peer, range=($wi.blocks)

  # Verify start block number
  elif hdrResp.get.headers[0].blockNumber != wi.blocks.minPt:
    trace "Header range starts with wrong block number", peer
    qd.ctrl.zombie = true
    return false

  # Import into `wi.headers`
  else:
    wi.headers = system.move(hdrResp.get.headers)

  # Calculate block header hashes and verify them against parent links. If
  # necessary, cut off the offending tail of the header list.
  wi.hashes.setLen(wi.headers.len)
  wi.hashes[0] = wi.headers[0].hash
  for n in 1 ..< wi.headers.len:
    if wi.headers[n-1].blockNumber + 1 != wi.headers[n].blockNumber:
      trace "Non-consecutive block numbers in header list response", peer
      qd.ctrl.zombie = true
      return false
    if wi.hashes[n-1] != wi.headers[n].parentHash:
      # Oops, cul-de-sac after block chain re-org?
      trace "Dangling parent link in header list response. Re-org?", peer
      wi.headers.setLen(n)
      wi.hashes.setLen(n)
      break
    wi.hashes[n] = wi.headers[n].hash

  # Adjust range length if necessary
  if wi.headers[^1].blockNumber < wi.blocks.maxPt:
    let redRng = BlockRange.new(
      wi.headers[0].blockNumber, wi.headers[^1].blockNumber)
    trace "Adjusting block range", peer, range=($wi.blocks), reduced=($redRng)
    discard qd.global.unprocessed.merge(redRng.maxPt + 1, wi.blocks.maxPt)
    wi.blocks = redRng

  return true
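
# The header checks of `fetchHeaders()` above as a minimal, hypothetical
# sketch: `SketchHeader` uses plain integers where the real code uses
# `BlockHeader` and `Hash256`, and `sketchCheckChain` is an illustrative name.
# Non-consecutive block numbers reject the whole response, while a dangling
# parent link only cuts off the tail of the list.
when isMainModule:
  type SketchHeader = object
    number, hash, parentHash: uint64

  proc sketchCheckChain(hdrs: var seq[SketchHeader]): bool =
    ## Hypothetical helper: returns `false` if block numbers are not
    ## consecutive; otherwise truncates `hdrs` before the first header whose
    ## `parentHash` does not match its predecessor and returns `true`.
    result = true
    for n in 1 ..< hdrs.len:
      if hdrs[n-1].number + 1 != hdrs[n].number:
        return false
      if hdrs[n-1].hash != hdrs[n].parentHash:
        hdrs.setLen(n)
        break
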

proc fetchBodies(
    qd: BlockQueueWorkerRef;
    wi: BlockItemRef;
      ): Future[bool]
      {.async.} =
  ## Get the work item with the least interval and complete it. The function
  ## returns `true` if bodies were fetched and there were no inconsistencies.
  let peer = qd.peer

  # Complete group of bodies
  qd.safeTransport("Error fetching block bodies"):
    while wi.bodies.len < wi.hashes.len:
      let
        start = wi.bodies.len
        reqLen = min(wi.hashes.len - wi.bodies.len, maxBodiesFetch)
        top = start + reqLen
        hashes = wi.hashes[start ..< top]

      trace trEthSendSendingGetBlockBodies, peer, reqLen

      # Append bodies from peer to `wi.bodies`
      let bdyResp = await peer.getBlockBodies(hashes)

      # Beware of peer terminating the session
      if qd.ctrl.stopped:
        return false

      if bdyResp.isNone:
        trace trEthRecvReceivedBlockBodies, peer, reqLen, respose="n/a"
        qd.ctrl.zombie = true
        return false

      let bdyRespLen = bdyResp.get.blocks.len
      trace trEthRecvReceivedBlockBodies, peer, reqLen, bdyRespLen

      if bdyRespLen == 0 or reqLen < bdyRespLen:
        qd.ctrl.zombie = true
        return false

      wi.bodies.add bdyResp.get.blocks

  return true
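
# The request batching of `fetchBodies()` above as a minimal, hypothetical
# sketch (`sketchBodyBatches` is an illustrative name, and the batch size is a
# parameter rather than the real `maxBodiesFetch` value). It assumes every
# request is answered in full; the real loop restarts from `wi.bodies.len`,
# so a short reply simply leads to one more request.
when isMainModule:
  proc sketchBodyBatches(nHashes, maxFetch: int): seq[Slice[int]] =
    ## Hypothetical helper: index slices into the hash list, one per request
    ## (assumes `maxFetch > 0`).
    var start = 0
    while start < nHashes:
      let reqLen = min(nHashes - start, maxFetch)
      result.add(start ..< start + reqLen)
      start += reqLen
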
# ------------------------------------------------------------------------------
# Public functions, constructor
# ------------------------------------------------------------------------------

proc init*(
    T: type BlockQueueCtxRef;           ## Global data descriptor type
    firstBlockNumber = 0.toBlockNumber; ## Of first block to fetch from network
      ): T =
  ## Global constructor, shared data
  result = T(
    unprocessed: BlockRangeSetRef.init())
  result.topAccepted = max(firstBlockNumber,1.toBlockNumber) - 1
  discard result.unprocessed.merge(result.topAccepted + 1, highBlockNumber)

proc init*(
    T: type BlockQueueWorkerRef;        ## Worker data descriptor type
    ctx: BlockQueueCtxRef;              ## Global data descriptor
    ctrl: BuddyCtrlRef;                 ## Control and state settings
    peer: Peer;                         ## For fetching data from network
      ): T =
  ## Buddy/local constructor
  T(global: ctx,
    peer:   peer,
    ctrl:   ctrl)

# ------------------------------------------------------------------------------
# Public functions -- getter/setter
# ------------------------------------------------------------------------------

proc bestNumber*(qd: BlockQueueWorkerRef): Option[BlockNumber] =
  ## Getter
  qd.bestNumber

proc `bestNumber=`*(qd: BlockQueueWorkerRef; val: Option[BlockNumber]) =
  ## Setter, needs to be set to something valid so that `blockQueueWorker()`
  ## does something useful.
  qd.bestNumber = val

proc topAccepted*(qd: BlockQueueWorkerRef): BlockNumber =
  ## Getter
  qd.global.topAccepted

# ------------------------------------------------------------------------------
# Public functions -- synchronous
# ------------------------------------------------------------------------------

proc blockQueueFetchStaged*(
    qd: BlockQueueWorkerRef;
      ): Result[BlockItemRef,BlockQueueRC]=
  ## Fetch the next item from the staged block queue. This item will be removed
  ## from the staged queue and must be recycled if it cannot be processed.
  ##
  ## On error, the function returns `EmptyQueue` if the queue was empty and
  ## `BlockNumberGap` if processing this item would result in a gap between the
  ## last accepted block number and the first block number of the next queue
  ## item.
  ##
  ## This gap might appear if another function processes the in-between block
  ## in parallel or if something went wrong, see `blockQueueGrout()`, below.
  let rc = qd.global.staged.ge(low(BlockNumber))
  if rc.isErr:
    # No more items in the database
    return err(EmptyQueue)

  let
    peer {.used.} = qd.peer
    wi = rc.value.data
    topAccepted = qd.global.topAccepted
    startNumber = wi.headers[0].blockNumber

  # Check whether this record of blocks can be stored, at all
  if topAccepted + 1 < startNumber:
    trace "Staged work item postponed", peer, topAccepted
    return err(BlockNumberGap)

  # Ok, store into the block chain database
  trace "Staged work item", peer,
    topAccepted, range=($wi.blocks)

  # Remove from staged DB
  discard qd.global.staged.delete(wi.blocks.minPt)

  ok(wi)

proc blockQueueAccept*(qd: BlockQueueWorkerRef; wi: BlockItemRef) =
  ## Mark this argument item `wi` to be the item with the topmost block number
  ## accepted. This statement typically comes after the successful processing
  ## and storage of the work item fetched by `blockQueueFetchStaged()`.
  qd.global.topAccepted = wi.blocks.maxPt
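
# How `blockQueueFetchStaged()` and `blockQueueAccept()` interact, as a
# minimal, hypothetical sketch: `uint64` ranges stand in for staged work items
# and `SketchQueued`/`sketchDrain` are illustrative names. The least staged
# item is only taken when it leaves no gap after `topAccepted`; accepting it
# advances `topAccepted` to its last block number.
when isMainModule:
  import std/algorithm

  type SketchQueued = tuple[lo, hi: uint64]

  proc sketchDrain(staged: var seq[SketchQueued];
                   topAccepted: var uint64): int =
    ## Hypothetical helper: accept as many staged ranges as possible and
    ## return how many were taken.
    staged.sort(proc (a, b: SketchQueued): int = cmp(a.lo, b.lo))
    while 0 < staged.len and not (topAccepted + 1 < staged[0].lo):
      topAccepted = staged[0].hi
      staged.delete(0)
      inc result
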

proc blockQueueGrout*(qd: BlockQueueWorkerRef) =
  ## Fill the gap between unprocessed and staged block numbers. If there is
  ## such a gap (which should not exist at all), `blockQueueFetchStaged()`
  ## will always fail with the `BlockNumberGap` error code because there is
  ## no next work item.
  ##
  ## To close the gap and avoid double processing, all other workers should
  ## have finished their tasks while this function is run. A way to achieve
  ## that is to run this function in *pool mode* once.
  # Mind the gap, fill in if necessary
  let covered = min(
    qd.global.nextUnprocessed.get(otherwise = highBlockNumber),
    qd.global.nextStaged.get(otherwise = highBlockRange).minPt)
  if qd.global.topAccepted + 1 < covered:
    discard qd.global.unprocessed.merge(qd.global.topAccepted + 1, covered - 1)
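
# The gap computation of `blockQueueGrout()` above as a minimal, hypothetical
# sketch with `uint64` block numbers (`sketchGroutGap` is an illustrative
# name). Missing values default to the highest block number, as in the code
# above, and the returned range is what would be merged back into
# `unprocessed`.
when isMainModule:
  import std/options

  proc sketchGroutGap(topAccepted: uint64;
                      nextUnprocessed, nextStaged: Option[uint64]
                     ): Option[tuple[lo, hi: uint64]] =
    let covered = min(nextUnprocessed.get(high(uint64)),
                      nextStaged.get(high(uint64)))
    if topAccepted + 1 < covered:
      result = some((lo: topAccepted + 1, hi: covered - 1))
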

proc blockQueueRecycle*(qd: BlockQueueWorkerRef; wi: BlockItemRef) =
  ## Put back and destroy the `wi` argument item. The covered block range needs
  ## to be re-fetched from the network. This statement is typically used instead
  ## of `blockQueueAccept()` after a failure to process and store the work item
  ## fetched by `blockQueueFetchStaged()`.
  discard qd.global.unprocessed.merge(wi)

proc blockQueueRecycleStaged*(qd: BlockQueueWorkerRef) =
  ## Similar to `blockQueueRecycle()`, recycle all items from the staged queue.
  # using fast traversal
  let
    walk = BlockItemWalkRef.init(qd.global.staged)
  var
    rc = walk.first()
  while rc.isOk:
    # Store back into `unprocessed` ranges set
    discard qd.global.unprocessed.merge(rc.value.data)
    rc = walk.next()
  # optional clean up, see comments on the destroy() directive
  walk.destroy()
  qd.global.staged.clear()

proc blockQueueBacktrackFrom*(qd: BlockQueueWorkerRef; wi: BlockItemRef) =
  ## Set backtrack mode starting with the blocks before the argument work
  ## item `wi`.
  qd.global.backtrack = some(wi.headers[0].parentHash)

proc blockQueueBacktrackOk*(qd: BlockQueueWorkerRef): bool =
  ## Returns `true` if the queue is in backtrack mode.
  qd.global.backtrack.isSome

proc blockQueueStats*(ctx: BlockQueueCtxRef; stats: var BlockQueueStats) =
  ## Collect global statistics
  stats.topAccepted = ctx.topAccepted
  stats.nextUnprocessed = ctx.nextUnprocessed
  stats.nStagedQueue = ctx.staged.len
  stats.reOrg = ctx.backtrack.isSome
  stats.nextStaged =
    if ctx.nextStaged.isSome: some(ctx.nextStaged.unsafeGet.minPt)
    else: none(BlockNumber)

# ------------------------------------------------------------------------------
# Public functions -- asynchronous
# ------------------------------------------------------------------------------

proc blockQueueBacktrackWorker*(
    qd: BlockQueueWorkerRef;
      ): Future[Result[void,BlockQueueRC]]
      {.async.} =
  ## This function does some backtrack processing on the queue. Backtracking
  ## is single threaded due to the fact that the next block is identified by
  ## the hash of the parent header. So this function needs to run in *single
  ## mode*.
  ##
  ## If backtracking is enabled, this function fetches the next parent work
  ## item from the network and makes it available on the staged queue to be
  ## retrieved with `blockQueueFetchStaged()`. In that case, the function
  ## succeeds and `blockQueueBacktrackOk()` will return `false`.
  ##
  ## In all other cases, the function returns an error code.
  var error = BacktrackDisabled
  if qd.global.backtrack.isSome:
    let
      peer {.used.} = qd.peer
      wi = BlockItemRef(
        # This dummy interval can safely be merged back without any effect
        blocks:  highBlockRange,
        # Enable backtrack
        topHash: some(qd.global.backtrack.unsafeGet))

    # Fetch headers and bodies for the current work item
    trace "Single mode worker, re-org backtracking", peer
    if not await qd.fetchHeaders(wi):
      error = FetchHeadersError
    elif not await qd.fetchBodies(wi):
      error = FetchBodiesError
    else:
      qd.global.backtrack = none(Hash256)
      discard qd.stageItem(wi)
      return ok()

  # This work item failed, nothing to do anymore.
  return err(error)
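
# The reverse fetch used for re-org backtracking as a minimal, hypothetical
# sketch: a `Table` keyed by block hash stands in for the peer, and integers
# stand in for hashes (`SketchHdr` and `sketchBacktrack` are illustrative
# names). Starting at a known hash, the walk follows `parent` links for at
# most `maxCount` headers and then reverses the list into ascending order,
# much like `fetchHeaders()` does with `reversed` in reverse mode.
when isMainModule:
  import std/[algorithm, tables]

  type SketchHdr = tuple[number, hsh, parent: int]

  proc sketchBacktrack(peerData: Table[int, SketchHdr];
                       topHash, maxCount: int): seq[SketchHdr] =
    ## Hypothetical helper: collect up to `maxCount` ancestors starting at
    ## `topHash`, returned in ascending block number order.
    var h = topHash
    while result.len < maxCount and h in peerData:
      let hdr = peerData.getOrDefault(h)
      result.add hdr
      h = hdr.parent
    result.reverse()
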

proc blockQueueWorker*(
    qd: BlockQueueWorkerRef;
      ): Future[Result[void,BlockQueueRC]]
      {.async.} =
  ## Normal worker function used to stage another work item to be retrieved
  ## by `blockQueueFetchStaged()`. This function may run in *multi mode*. The
  ## queue is only synchronised when work items are retrieved: once the next
  ## item has been retrieved, the queue is blocked by a *gap* until that item
  ## is committed by `blockQueueAccept()`.
  ##
  ## On error, with most error codes there is not much that can be done. The
  ## one notable error code is `StagedQueueOverflow` which pops up if there
  ## is a gap between unprocessed and staged block numbers. One of the actions
  ## to be considered here is to run `blockQueueGrout()` in *pool mode*.
  ## Otherwise, `StagedQueueOverflow` can be treated like a success.
  # Fetch work item
  let wi = block:
    let rc = qd.newWorkItem()
    if rc.isErr:
      # No way, end of capacity for this peer => re-calibrate
      qd.bestNumber = none(BlockNumber)
      return err(rc.error)
    rc.value

  # Fetch headers and bodies for the current work item
  var error = AllSmileOk
  if not await qd.fetchHeaders(wi):
    error = FetchHeadersError
  elif not await qd.fetchBodies(wi):
    error = FetchBodiesError
  else:
    return qd.stageItem(wi)

  # This work item failed, recycle its block range
  discard qd.global.unprocessed.merge(wi)
  return err(error)

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------