nimbus-eth1/nimbus/sync/full/worker.nim

302 lines
9.8 KiB
Nim

# Nimbus
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/[options],
chronicles,
chronos,
eth/[common, p2p],
".."/[protocol, sync_desc],
../misc/[best_pivot, block_queue],
./ticker
{.push raises:[Defect].}
logScope:
topics = "full-sync"
type
BuddyData* = object
## Local descriptor data extension
pivot: BestPivotWorkerRef ## Local pivot worker descriptor
bQueue: BlockQueueWorkerRef ## Block queue worker
bestNumber: Option[BlockNumber] ## Largest block number reported
CtxData* = object
## Globally shared data extension
rng*: ref HmacDrbgContext ## Random generator, pre-initialised
pivot: BestPivotCtxRef ## Global pivot descriptor
bCtx: BlockQueueCtxRef ## Global block queue descriptor
ticker: TickerRef ## Logger ticker
FullBuddyRef* = ##\
## Extended worker peer descriptor
BuddyRef[CtxData,BuddyData]
FullCtxRef* = ##\
## Extended global descriptor
CtxRef[CtxData]
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc pp(n: BlockNumber): string =
## Dedicated pretty printer (`$` is defined elsewhere using `UInt256`)
if n == high(BlockNumber): "high" else:"#" & $n
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc topUsedNumber(
ctx: FullCtxRef;
backBlocks = maxHeadersFetch;
): Result[BlockNumber,void] =
var
top = 0.toBlockNumber
try:
let
bestNumber = ctx.chain.db.getCanonicalHead().blockNumber
nBackBlocks = backBlocks.toBlockNumber
# Initialise before best block number
if nBackBlocks < bestNumber:
top = bestNumber - nBackBlocks
except CatchableError as e:
error "Best block header problem", backBlocks, error=($e.name), msg=e.msg
return err()
ok(top)
proc tickerUpdater(ctx: FullCtxRef): TickerStatsUpdater =
result = proc: TickerStats =
var stats: BlockQueueStats
ctx.data.bCtx.blockQueueStats(stats)
TickerStats(
topPersistent: stats.topAccepted,
nextStaged: stats.nextStaged,
nextUnprocessed: stats.nextUnprocessed,
nStagedQueue: stats.nStagedQueue,
reOrg: stats.reOrg)
proc processStaged(buddy: FullBuddyRef): bool =
## Fetch a work item from the `staged` queue an process it to be
## stored on the persistent block chain.
let
ctx = buddy.ctx
peer = buddy.peer
chainDb = buddy.ctx.chain.db
chain = buddy.ctx.chain
bq = buddy.data.bQueue
# Get a work item, a list of headers + bodies
wi = block:
let rc = bq.blockQueueFetchStaged()
if rc.isErr:
return false
rc.value
startNumber = wi.headers[0].blockNumber
# Store in persistent database
try:
if chain.persistBlocks(wi.headers, wi.bodies) == ValidationResult.OK:
bq.blockQueueAccept(wi)
return true
except CatchableError as e:
error "Storing persistent blocks failed", peer, range=($wi.blocks),
error = $e.name, msg = e.msg
except Defect as e:
# Pass through
raise e
except Exception as e:
# Notorious case where the `Chain` reference applied to
# `persistBlocks()` has the compiler traced a possible `Exception`
# (i.e. `ctx.chain` could be uninitialised.)
error "Exception while storing persistent blocks", peer,
range=($wi.blocks), error=($e.name), msg=e.msg
raise (ref Defect)(msg: $e.name & ": " & e.msg)
# Something went wrong. Recycle work item (needs to be re-fetched, anyway)
let
parentHash = wi.headers[0].parentHash
try:
# Check whether hash of the first block is consistent
var parent: BlockHeader
if chainDb.getBlockHeader(parentHash, parent):
# First block parent is ok, so there might be other problems. Re-fetch
# the blocks from another peer.
trace "Storing persistent blocks failed", peer, range=($wi.blocks)
bq.blockQueueRecycle(wi)
buddy.ctrl.zombie = true
return false
except CatchableError as e:
error "Failed to access parent blocks", peer,
blockNumber=wi.headers[0].blockNumber.pp, error=($e.name), msg=e.msg
# Parent block header problem, so we might be in the middle of a re-org.
# Set single mode backtrack following the offending parent hash.
bq.blockQueueBacktrackFrom(wi)
buddy.ctrl.multiOk = false
if wi.topHash.isNone:
# Assuming that currently staged entries are on the wrong branch
bq.blockQueueRecycleStaged()
notice "Starting chain re-org backtrack work item", peer, range=($wi.blocks)
else:
# Leave that block range in the staged list
trace "Resuming chain re-org backtrack work item", peer, range=($wi.blocks)
discard
return false
# ------------------------------------------------------------------------------
# Public start/stop and admin functions
# ------------------------------------------------------------------------------
proc setup*(ctx: FullCtxRef; tickerOK: bool): bool =
## Global set up
ctx.data.pivot = BestPivotCtxRef.init(ctx.data.rng)
if tickerOK:
ctx.data.ticker = TickerRef.init(ctx.tickerUpdater)
else:
debug "Ticker is disabled"
let rc = ctx.topUsedNumber(backBlocks = 0)
if rc.isErr:
ctx.data.bCtx = BlockQueueCtxRef.init()
return false
ctx.data.bCtx = BlockQueueCtxRef.init(rc.value + 1)
true
proc release*(ctx: FullCtxRef) =
## Global clean up
ctx.data.pivot = nil
if not ctx.data.ticker.isNil:
ctx.data.ticker.stop()
proc start*(buddy: FullBuddyRef): bool =
## Initialise worker peer
let
ctx = buddy.ctx
peer = buddy.peer
if peer.supports(protocol.eth) and
peer.state(protocol.eth).initialized:
if not ctx.data.ticker.isNil:
ctx.data.ticker.startBuddy()
buddy.data.pivot =
BestPivotWorkerRef.init(ctx.data.pivot, buddy.ctrl, buddy.peer)
buddy.data.bQueue = BlockQueueWorkerRef.init(
ctx.data.bCtx, buddy.ctrl, peer)
return true
proc stop*(buddy: FullBuddyRef) =
## Clean up this peer
buddy.ctrl.stopped = true
buddy.data.pivot.clear()
if not buddy.ctx.data.ticker.isNil:
buddy.ctx.data.ticker.stopBuddy()
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc runSingle*(buddy: FullBuddyRef) {.async.} =
## This peer worker is invoked if the peer-local flag `buddy.ctrl.multiOk`
## is set `false` which is the default mode. This flag is updated by the
## worker when deemed appropriate.
## * For all workers, there can be only one `runSingle()` function active
## simultaneously for all worker peers.
## * There will be no `runMulti()` function active for the same worker peer
## simultaneously
## * There will be no `runPool()` iterator active simultaneously.
##
## Note that this function runs in `async` mode.
##
let
ctx = buddy.ctx
peer = buddy.peer
bq = buddy.data.bQueue
if bq.blockQueueBacktrackOk:
let rc = await bq.blockQueueBacktrackWorker()
if rc.isOk:
# Update persistent database (may reset `multiOk`)
buddy.ctrl.multiOk = true
while buddy.processStaged() and not buddy.ctrl.stopped:
# Allow thread switch as `persistBlocks()` might be slow
await sleepAsync(10.milliseconds)
return
buddy.ctrl.zombie = true
# Initialise/re-initialise this worker
elif await buddy.data.pivot.pivotNegotiate(buddy.data.bestNumber):
buddy.ctrl.multiOk = true
# Update/activate `bestNumber` for local use
buddy.data.bestNumber =
some(buddy.data.pivot.pivotHeader.value.blockNumber)
elif not buddy.ctrl.stopped:
await sleepAsync(2.seconds)
proc runPool*(buddy: FullBuddyRef; last: bool) =
## Ocne started, the function `runPool()` is called for all worker peers in
## a row (as the body of an iteration.) There will be no other worker peer
## functions activated simultaneously.
##
## This procedure is started if the global flag `buddy.ctx.poolMode` is set
## `true` (default is `false`.) It is the responsibility of the `runPool()`
## instance to reset the flag `buddy.ctx.poolMode`, typically at the first
## peer instance.
##
## The argument `last` is set `true` if the last entry is reached.
##
## Note that this function does not run in `async` mode.
##
let
ctx = buddy.ctx
bq = buddy.data.bQueue
if ctx.poolMode:
# Mind the gap, fill in if necessary
bq.blockQueueGrout()
ctx.poolMode = false
proc runMulti*(buddy: FullBuddyRef) {.async.} =
## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set
## `true` which is typically done after finishing `runSingle()`. This
## instance can be simultaneously active for all peer workers.
##
# Fetch work item
let
ctx = buddy.ctx
bq = buddy.data.bQueue
rc = await bq.blockQueueWorker()
if rc.isErr:
if rc.error == StagedQueueOverflow:
# Mind the gap: Turn on pool mode if there are too may staged items.
buddy.ctx.poolMode = true
else:
return
# Update persistent database
while buddy.processStaged() and not buddy.ctrl.stopped:
# Allow thread switch as `persistBlocks()` might be slow
await sleepAsync(10.milliseconds)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------