Beacon sync activation control update (#2782)

* Clarify and comment the FCU setup condition; small fixes, comments, etc.

* Update some logging

* Reorg metrics updater and activation
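The relocated updater (see the metrics hunk further down) throttles itself so
that frequent calls refresh the gauges at most once per interval. A standalone
sketch of that pattern, assuming only `chronos` (not project code; the interval
value is assumed):

```nim
# Standalone sketch (not project code): throttle a metrics refresh so that
# repeated calls only update the gauges once per interval.
import pkg/chronos

const metricsUpdateInterval = chronos.seconds(10)  # assumed interval

var nextUpdate: Moment                             # zero-initialised

proc updateMetrics() =
  let now = Moment.now()
  if nextUpdate < now:
    echo "refreshing metrics gauges"               # placeholder for gauge updates
    nextUpdate = now + metricsUpdateInterval

updateMetrics()   # refreshes
updateMetrics()   # skipped, still within the interval
```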

* Better `async` responsiveness

why:
  Block import does not allow `async` task activation while it is
  executing. So allow a potential task switch after each imported
  block (rather than after a group of 32 blocks).
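A minimal standalone sketch of this kind of cooperative yielding with
`chronos` (not project code; the actual loop is in `blocksStagedImport()`,
see the diff further down):

```nim
# Standalone sketch (not project code): yield to the async scheduler after
# every imported block so that pending task activation can take place.
import pkg/chronos

proc importMany(nBlocks: int) {.async.} =
  for _ in 0 ..< nBlocks:
    # ... import one block synchronously here ...
    # A nano-sleep forces a pseudo/async task switch between blocks.
    await sleepAsync chronos.nanoseconds(1)

waitFor importMany(32)
```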

* Handle resuming after previous sync followed by import

why:
  In this case the ledger state is more recent than the saved
  sync state. So this is considered a pristine sync where any
  previous sync state is forgotten.

  This fixes an assert that was thrown because of inconsistent internal
  state at some point.

* Provide option for clearing saved beacon sync state before starting syncer

why:
  Otherwise the syncer would resume with the last saved state, which is
  sometimes undesirable.

  Without RPC available, the syncer typically stops and terminates with
  the canonical head larger than the base/finalised head. The latter is
  saved as the database/ledger state and the canonical head as the syncer
  target. Resuming the sync here would just repeat itself.

  So clearing the syncer state prevents the syncer from being started
  unnecessarily, avoiding useless actions.

* Allow workers to request syncer shutdown from within

why:
  In one-trick-pony mode (after resuming without RPC support) the
  syncer can be stopped from within, avoiding unnecessary polling.
  In that case, the syncer can (theoretically) be restarted externally
  with `startSync()`.
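The scheduler implements this with a shutdown flag that the workers poll,
plus a drain loop in the new `terminate()` helper (see the `sync_sched`
hunks further down). A standalone sketch of the flag-then-drain pattern,
assuming only `chronos` (not project code):

```nim
# Standalone sketch (not project code) of the flag-then-drain shutdown used
# by the scheduler: set a control flag, then poll until async workers exit.
import pkg/chronos

type RunCtrl = enum
  terminated, shutdown, running

var
  runCtrl = running
  activeWorkers = 2

proc workerLoop() {.async.} =
  while runCtrl == running:
    await sleepAsync chronos.milliseconds(5)   # pretend to do some work
  dec activeWorkers                            # drain after the flag flips

proc terminate() =
  if runCtrl == running:
    runCtrl = shutdown
    while 0 < activeWorkers:
      waitFor sleepAsync chronos.milliseconds(10)  # let the workers finish
    runCtrl = terminated                           # release resources here

asyncSpawn workerLoop()
asyncSpawn workerLoop()
terminate()
```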

* Terminate beacon sync after a single run target is reached

why:
  This stops useless polling (typically when there is no RPC available).

* Remove crufty comments

* Tighten state reload condition when resuming

why:
  A pathological case can arise if the syncer is stopped while the
  distance between the finalised block and the head is very large and the
  FCU base has grown beyond the locked finalised state.
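Roughly, the tightened guard compares the saved layout against the current
ledger state before reusing it (a simplified sketch with hypothetical
standalone types; the real check is in `dbLoadSyncStateLayout()`, see the
diff further down):

```nim
# Simplified sketch with hypothetical types; the real check lives in
# `dbLoadSyncStateLayout()` further down in this diff.
type
  SavedLayout = object
    final, head: uint64    # finalised and head block numbers of the saved state

proc reuseSavedState(base, latest: uint64; saved: SavedLayout): bool =
  ## Reuse the saved sync state only if the FCU base has not overtaken the
  ## locked finalised block and the ledger head is still behind the sync head.
  base <= saved.final and latest < saved.head

# Manually imported blocks pushed the base beyond the saved finalised state,
# so the saved state is dropped and syncing starts pristine.
assert not reuseSavedState(base = 1000, latest = 1000,
                           saved = SavedLayout(final: 900, head: 1200))
```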

* Verify that finalised number from CL is at least FCU base number

why:
  The FCU base number is determined by the database and is non-zero if
  blocks were manually imported. The finalised number is passed via RPC
  by the CL node and will increase over time. Unless fully synced, this
  number will be pretty low.

  On the other hand, the FCU call `forkChoice()` will eventually fail
  if the `finalizedHash` argument refers to something outside the
  internal chain starting at the FCU base block.
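A trivial standalone sketch of that sanity check (hypothetical helper name;
the real test sits in `headerStagedUpdateTarget()`, see the diff further
down):

```nim
# Hypothetical helper name; the real check sits in `headerStagedUpdateTarget()`.
proc acceptFinalised(fcuBase, finalisedFromCL: uint64): bool =
  ## `forkChoice()` eventually fails if the finalised hash points outside the
  ## chain starting at the FCU base, so finalised numbers below the base are
  ## ignored until the CL catches up.
  fcuBase <= finalisedFromCL

assert not acceptFinalised(fcuBase = 2_000_000, finalisedFromCL = 1_500)
assert acceptFinalised(fcuBase = 2_000_000, finalisedFromCL = 2_100_000)
```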

* Remove support for completing an interrupted sync without RPC

why:
  Simplifies start/stop logic

* Remove unused import
Jordan Hrycaj 2024-10-28 16:22:04 +00:00 committed by GitHub
parent ba1cbed14f
commit ea268e81ff
19 changed files with 382 additions and 241 deletions

@ -224,11 +224,10 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
setupP2P(nimbus, conf, com, protocols)
setupRpc(nimbus, conf, com, protocols)
if conf.maxPeers > 0:
if conf.maxPeers > 0 and conf.engineApiServerEnabled():
# Not starting syncer if there is definitely no way to run it. This
# avoids polling (i.e. waiting for instructions) and some logging.
let resumeOnly = not conf.engineApiServerEnabled()
if not nimbus.beaconSyncRef.start(resumeOnly):
if not nimbus.beaconSyncRef.start():
nimbus.beaconSyncRef = BeaconSyncRef(nil)
if nimbus.state == NimbusState.Starting:

@ -18,7 +18,7 @@ import
"."/[sync_desc, sync_sched, protocol]
logScope:
topics = "beacon"
topics = "beacon sync"
type
BeaconSyncRef* = RunnerSyncRef[BeaconCtxData,BeaconBuddyData]
@ -28,25 +28,25 @@ type
# ------------------------------------------------------------------------------
proc runSetup(ctx: BeaconCtxRef): bool =
worker.setup(ctx)
worker.setup(ctx, "RunSetup")
proc runRelease(ctx: BeaconCtxRef) =
worker.release(ctx)
worker.release(ctx, "RunRelease")
proc runDaemon(ctx: BeaconCtxRef) {.async.} =
await worker.runDaemon(ctx)
await worker.runDaemon(ctx, "RunDaemon")
proc runStart(buddy: BeaconBuddyRef): bool =
worker.start(buddy)
worker.start(buddy, "RunStart")
proc runStop(buddy: BeaconBuddyRef) =
worker.stop(buddy)
worker.stop(buddy, "RunStop")
proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
worker.runPool(buddy, last, laps)
worker.runPool(buddy, last, laps, "RunPool")
proc runPeer(buddy: BeaconBuddyRef) {.async.} =
await worker.runPeer(buddy)
await worker.runPeer(buddy, "RunPeer")
# ------------------------------------------------------------------------------
# Public functions
@ -57,7 +57,7 @@ proc init*(
ethNode: EthereumNode;
chain: ForkedChainRef;
maxPeers: int;
chunkSize: int;
chunkSize = 0;
): T =
var desc = T()
desc.initSync(ethNode, maxPeers)
@ -65,13 +65,7 @@ proc init*(
desc.ctx.pool.chain = chain
desc
proc start*(desc: BeaconSyncRef; resumeOnly = false): bool =
## Start beacon sync. If `resumeOnly` is set `true` the syncer will only
## start up if it can resume work, e.g. after being previously interrupted.
if resumeOnly:
desc.ctx.dbLoadSyncStateLayout()
if not desc.ctx.layout.headLocked:
return false
proc start*(desc: BeaconSyncRef): bool =
desc.startSync()
proc stop*(desc: BeaconSyncRef) =

@ -15,13 +15,9 @@ import
pkg/eth/[common, p2p],
pkg/stew/[interval_set, sorted_set],
../../common,
./worker/[blocks_staged, db, headers_staged, headers_unproc,
start_stop, update],
./worker/[blocks_staged, headers_staged, headers_unproc, start_stop, update],
./worker_desc
logScope:
topics = "beacon"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
@ -36,15 +32,11 @@ proc bodiesToFetchOk(buddy: BeaconBuddyRef): bool =
buddy.ctrl.running and
not buddy.ctx.poolMode
proc napUnlessSomethingToFetch(
buddy: BeaconBuddyRef;
info: static[string];
): Future[bool] {.async.} =
proc napUnlessSomethingToFetch(buddy: BeaconBuddyRef): Future[bool] {.async.} =
## When idle, save cpu cycles waiting for something to do.
if buddy.ctx.pool.importRunningOk or
not (buddy.headersToFetchOk() or
buddy.bodiesToFetchOk()):
debug info & ": idly wasting time", peer=buddy.peer
await sleepAsync workerIdleWaitInterval
return true
return false
@ -53,13 +45,12 @@ proc napUnlessSomethingToFetch(
# Public start/stop and admin functions
# ------------------------------------------------------------------------------
proc setup*(ctx: BeaconCtxRef): bool =
proc setup*(ctx: BeaconCtxRef; info: static[string]): bool =
## Global set up
debug "RUNSETUP"
ctx.setupRpcMagic()
# Load initial state from database if there is any
ctx.setupDatabase()
ctx.setupDatabase info
# Debugging stuff, might be an empty template
ctx.setupTicker()
@ -68,31 +59,31 @@ proc setup*(ctx: BeaconCtxRef): bool =
ctx.daemon = true
true
proc release*(ctx: BeaconCtxRef) =
proc release*(ctx: BeaconCtxRef; info: static[string]) =
## Global clean up
debug "RUNRELEASE"
ctx.destroyRpcMagic()
ctx.destroyTicker()
proc start*(buddy: BeaconBuddyRef): bool =
proc start*(buddy: BeaconBuddyRef; info: static[string]): bool =
## Initialise worker peer
const info = "RUNSTART"
let peer = buddy.peer
if runsThisManyPeersOnly <= buddy.ctx.pool.nBuddies:
debug info & " peer limit reached", peer=buddy.peer
debug info & ": peers limit reached", peer
return false
if not buddy.startBuddy():
debug info & " failed", peer=buddy.peer
debug info & ": failed", peer
return false
debug info, peer=buddy.peer
debug info & ": new peer", peer
true
proc stop*(buddy: BeaconBuddyRef) =
proc stop*(buddy: BeaconBuddyRef; info: static[string]) =
## Clean up this peer
debug "RUNSTOP", peer=buddy.peer, nInvocations=buddy.only.nMultiLoop,
debug info & ": release peer", peer=buddy.peer,
nInvocations=buddy.only.nMultiLoop,
lastIdleGap=buddy.only.multiRunIdle.toStr
buddy.stopBuddy()
@ -100,16 +91,13 @@ proc stop*(buddy: BeaconBuddyRef) =
# Public functions
# ------------------------------------------------------------------------------
proc runDaemon*(ctx: BeaconCtxRef) {.async.} =
proc runDaemon*(ctx: BeaconCtxRef; info: static[string]) {.async.} =
## Global background job that will be re-started as long as the variable
## `ctx.daemon` is set `true`. If that job was stopped due to re-setting
## `ctx.daemon` to `false`, it will be restarted next after it was reset
## as `true` not before there is some activity on the `runPool()`,
## `runSingle()`, or `runMulti()` functions.
##
const info = "RUNDAEMON"
debug info
# Check for a possible header layout and body request changes
ctx.updateSyncStateLayout info
ctx.updateBlockRequests info
@ -127,15 +115,20 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} =
# Import from staged queue.
while await ctx.blocksStagedImport(info):
ctx.updateMetrics()
if not ctx.daemon:
# Implied by external sync shutdown?
return
# At the end of the cycle, leave time to trigger refill headers/blocks
await sleepAsync daemonWaitInterval
ctx.updateMetrics()
proc runPool*(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
proc runPool*(
buddy: BeaconBuddyRef;
last: bool;
laps: int;
info: static[string];
): bool =
## Once started, the function `runPool()` is called for all worker peers in
## sequence as long as this function returns `false`. There will be no other
## `runPeer()` functions activated while `runPool()` is active.
@ -150,31 +143,25 @@ proc runPool*(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
##
## Note that this function does not run in `async` mode.
##
const info = "RUNPOOL"
#debug info, peer=buddy.peer, laps
buddy.ctx.headersStagedReorg info # reorg
buddy.ctx.headersStagedReorg info
true # stop
proc runPeer*(buddy: BeaconBuddyRef) {.async.} =
proc runPeer*(buddy: BeaconBuddyRef; info: static[string]) {.async.} =
## This peer worker method is repeatedly invoked (exactly one per peer) while
## the `buddy.ctrl.poolMode` flag is set `false`.
##
const info = "RUNPEER"
let peer = buddy.peer
if 0 < buddy.only.nMultiLoop: # statistics/debugging
buddy.only.multiRunIdle = Moment.now() - buddy.only.stoppedMultiRun
buddy.only.nMultiLoop.inc # statistics/debugging
trace info, peer, nInvocations=buddy.only.nMultiLoop,
lastIdleGap=buddy.only.multiRunIdle.toStr
# Update consensus header target when needed. It comes with a finalised
# header hash where we need to complete the block number.
await buddy.headerStagedUpdateTarget info
if not await buddy.napUnlessSomethingToFetch info:
if not await buddy.napUnlessSomethingToFetch():
#
# Layout of a triple of linked header chains (see `README.md`)
# ::

@ -17,11 +17,9 @@ import
../../../core/chain,
../worker_desc,
./blocks_staged/bodies,
./update/metrics,
"."/[blocks_unproc, db]
logScope:
topics = "beacon blocks"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
@ -116,6 +114,7 @@ func blocksStagedCanImportOk*(ctx: BeaconCtxRef): bool =
false
func blocksStagedFetchOk*(ctx: BeaconCtxRef): bool =
## Check whether body records can be fetched and stored on the `staged` queue.
##
@ -182,7 +181,7 @@ proc blocksStagedCollect*(
if 0 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped:
# Make sure that this peer does not immediately reconnect
buddy.ctrl.zombie = true
trace info & ": completely failed", peer, iv, ivReq,
trace info & ": list completely failed", peer, iv, ivReq,
ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nBdyRespErrors
ctx.blocksUnprocCommit(iv.len, iv)
# At this stage allow a task switch so that some other peer might try
@ -192,7 +191,7 @@ proc blocksStagedCollect*(
# So there were some bodies downloaded already. Turn back unused data
# and proceed with staging.
trace info & ": partially failed", peer, iv, ivReq,
trace info & ": list partially failed", peer, iv, ivReq,
unused=BnRange.new(ivBottom,iv.maxPt)
# There is some left over to store back
ctx.blocksUnprocCommit(iv.len, ivBottom, iv.maxPt)
@ -240,6 +239,7 @@ proc blocksStagedImport*(
if qItem.key != imported + 1:
trace info & ": there is a gap L vs. staged",
B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr
doAssert imported < qItem.key
return false
# Remove from queue
@ -249,35 +249,49 @@ proc blocksStagedImport*(
nBlocks = qItem.data.blocks.len
iv = BnRange.new(qItem.key, qItem.key + nBlocks.uint64 - 1)
trace info & ": import blocks ..", iv, nBlocks,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr
var maxImport = iv.maxPt
for n in 0 ..< nBlocks:
let nBn = qItem.data.blocks[n].header.number
ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr:
warn info & ": import block error", iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
nBn=nBn.bnStr, txLevel=ctx.chain.db.level, `error`=error
warn info & ": import block error", iv, B=ctx.chain.baseNumber.bnStr,
L=ctx.chain.latestNumber.bnStr, nBn=nBn.bnStr, `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break
# Allow pseudo/async thread switch.
await sleepAsync asyncThreadSwitchTimeSlot
if not ctx.daemon:
# Shutdown?
maxImport = ctx.chain.latestNumber()
break
# Update, so it can be followed nicely
ctx.updateMetrics()
# Occasionally mark the chain finalized
if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks:
let
nHash = qItem.data.getNthHash(n)
finHash = if nBn < ctx.layout.final: nHash else: ctx.layout.finalHash
nthHash = qItem.data.getNthHash(n)
finHash = if nBn < ctx.layout.final: nthHash else: ctx.layout.finalHash
doAssert nBn == ctx.chain.latestNumber()
ctx.pool.chain.forkChoice(headHash=nHash, finalizedHash=finHash).isOkOr:
warn info & ": fork choice error", iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
F=ctx.layout.final.bnStr, txLevel=ctx.chain.db.level, nHash,
finHash=(if finHash == nHash: "nHash" else: "F"), `error`=error
ctx.pool.chain.forkChoice(headHash=nthHash, finalizedHash=finHash).isOkOr:
warn info & ": fork choice error", n, iv, B=ctx.chain.baseNumber.bnStr,
L=ctx.chain.latestNumber.bnStr, F=ctx.layout.final.bnStr, nthHash,
finHash=(if finHash == nthHash: "nHash" else: "F"), `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break
# Allow pseudo/async thread switch.
await sleepAsync asyncThreadSwitchTimeSlot
if not ctx.daemon:
maxImport = ctx.chain.latestNumber()
break
# Import probably incomplete, so a partial roll back may be needed
if maxImport < iv.maxPt:
@ -287,32 +301,13 @@ proc blocksStagedImport*(
for bn in iv.minPt .. maxImport:
ctx.dbUnstashHeader bn
trace info & ": import done", iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
F=ctx.layout.final.bnStr, txLevel=ctx.chain.db.level
# Update, so it can be followed nicely
ctx.updateMetrics()
trace info & ": import done", iv, nBlocks, B=ctx.chain.baseNumber.bnStr,
L=ctx.chain.latestNumber.bnStr, F=ctx.layout.final.bnStr
return true
func blocksStagedBottomKey*(ctx: BeaconCtxRef): BlockNumber =
## Retrieve to staged block number
let qItem = ctx.blk.staged.ge(0).valueOr:
return high(BlockNumber)
qItem.key
func blocksStagedQueueLen*(ctx: BeaconCtxRef): int =
## Number of staged records
ctx.blk.staged.len
func blocksStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool =
## `true` iff no data are on the queue.
ctx.blk.staged.len == 0
# ----------------
func blocksStagedInit*(ctx: BeaconCtxRef) =
## Constructor
ctx.blk.staged = StagedBlocksQueue.init()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

@ -18,9 +18,6 @@ import
../../../protocol,
../../worker_desc
logScope:
topics = "beacon bodies"
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------

@ -0,0 +1,38 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
pkg/eth/common,
pkg/stew/[interval_set, sorted_set],
../../worker_desc
func blocksStagedQueueBottomKey*(ctx: BeaconCtxRef): BlockNumber =
## Retrieve the bottom staged block number
let qItem = ctx.blk.staged.ge(0).valueOr:
return high(BlockNumber)
qItem.key
func blocksStagedQueueLen*(ctx: BeaconCtxRef): int =
## Number of staged records
ctx.blk.staged.len
func blocksStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool =
## `true` iff no data are on the queue.
ctx.blk.staged.len == 0
# ----------------
func blocksStagedQueueInit*(ctx: BeaconCtxRef) =
## Constructor
ctx.blk.staged = StagedBlocksQueue.init()
# End

@ -19,9 +19,6 @@ import
../worker_desc,
"."/[blocks_unproc, headers_unproc]
logScope:
topics = "beacon db"
const
LhcStateKey = 1.beaconStateKey
@ -49,9 +46,8 @@ proc fetchSyncStateLayout(ctx: BeaconCtxRef): Opt[SyncStateLayout] =
# Public functions
# ------------------------------------------------------------------------------
proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef) =
proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
## Save chain layout to persistent db
const info = "dbStoreSyncStateLayout"
if ctx.layout == ctx.sst.lastLayout:
return
@ -65,24 +61,25 @@ proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef) =
if txLevel == 0:
let number = ctx.db.getSavedStateBlockNumber()
ctx.db.persistent(number).isOkOr:
debug info & ": failed to save persistently", error=($$error)
debug info & ": failed to save sync state persistently", error=($$error)
return
else:
trace info & ": not saved, tx pending", txLevel
trace info & ": sync state not saved, tx pending", txLevel
return
trace info & ": saved pesistently on DB"
trace info & ": saved sync state persistently"
proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef) =
proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
## Restore chain layout from persistent db
const info = "dbLoadLinkedHChainsLayout"
let
rc = ctx.fetchSyncStateLayout()
latest = ctx.chain.latestNumber()
if rc.isOk:
# See `dbLoadSyncStateAvailable()` for comments
if rc.isOk and
ctx.chain.baseNumber() <= rc.value.final and
latest < rc.value.head:
ctx.sst.layout = rc.value
# Add interval of unprocessed block range `(L,C]` from `README.md`
@ -92,7 +89,7 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef) =
# Add interval of unprocessed header range `(C,D)` from `README.md`
ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1)
trace info & ": restored layout", L=latest.bnStr,
trace info & ": restored sync state", L=latest.bnStr,
C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr,
F=ctx.layout.final.bnStr, H=ctx.layout.head.bnStr
@ -106,12 +103,19 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef) =
couplerHash: latestHash,
dangling: latest,
danglingParent: latestParent,
# There is no need to record a separate finalised head `F` as its only
# use is to serve as second argument in `forkChoice()` when committing
# a batch of imported blocks. Currently, there are no blocks to fetch
# and import. The system must wait for instructions and update the fields
# `final` and `head` while the latter will be increased so that import
# can start.
final: latest,
finalHash: latestHash,
head: latest,
headHash: latestHash)
headHash: latestHash,
headLocked: false)
trace info & ": new layout", L="C", C="D", D="F", F="H", H=latest.bnStr
trace info & ": new sync state", L="C", C="D", D="F", F="H", H=latest.bnStr
ctx.sst.lastLayout = ctx.layout
@ -121,6 +125,7 @@ proc dbStashHeaders*(
ctx: BeaconCtxRef;
first: BlockNumber;
revBlobs: openArray[seq[byte]];
info: static[string];
) =
## Temporarily store header chain to persistent db (oblivious of the chain
## layout.) The headers should not be stashed if they are interpreted and
@ -133,7 +138,6 @@ proc dbStashHeaders*(
## #(first+1) -- revBlobs[^2]
## ..
##
const info = "dbStashHeaders"
let
kvt = ctx.db.ctx.getKvt()
last = first + revBlobs.len.uint64 - 1

@ -17,12 +17,10 @@ import
pkg/stew/[interval_set, sorted_set],
../../../common,
../worker_desc,
./update/metrics,
./headers_staged/[headers, linked_hchain],
./headers_unproc
logScope:
topics = "beacon headers"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
@ -59,7 +57,9 @@ proc headerStagedUpdateTarget*(
info: static[string];
) {.async.} =
## Fetch finalised beacon header if there is an update available
let ctx = buddy.ctx
let
ctx = buddy.ctx
peer = buddy.peer
if not ctx.layout.headLocked and
ctx.target.final == 0 and
ctx.target.finalHash != zeroHash32 and
@ -75,10 +75,18 @@ proc headerStagedUpdateTarget*(
if hash != ctx.target.finalHash:
# Oops
buddy.ctrl.zombie = true
trace info & ": finalised header hash mismatch", peer=buddy.peer, hash,
trace info & ": finalised header hash mismatch", peer, hash,
expected=ctx.target.finalHash
else:
ctx.target.final = rc.value[0].number
let final = rc.value[0].number
if final < ctx.chain.baseNumber():
trace info & ": finalised number too low", peer,
B=ctx.chain.baseNumber.bnStr, finalised=rc.value[0].number.bnStr
else:
ctx.target.final = final
# Update, so it can be followed nicely
ctx.updateMetrics()
proc headersStagedCollect*(
@ -221,6 +229,9 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =
# anymore.
discard ctx.hdr.staged.delete(iv.maxPt)
# Update, so it can be followed nicely
ctx.updateMetrics()
if qItem.data.hash != ctx.layout.danglingParent:
# Discard wrong chain and merge back the range into the `unproc` list.
ctx.headersUnprocCommit(0,iv)
@ -229,21 +240,24 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int =
break
# Store headers on database
ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs)
ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs, info)
ctx.layout.dangling = iv.minPt
ctx.layout.danglingParent = qItem.data.parentHash
ctx.dbStoreSyncStateLayout()
ctx.dbStoreSyncStateLayout info
result.inc # count records
trace info & ": staged records saved",
trace info & ": staged header lists saved",
nStaged=ctx.hdr.staged.len, nSaved=result
if headersStagedQueueLengthLwm < ctx.hdr.staged.len:
ctx.poolMode = true
# Update, so it can be followed nicely
ctx.updateMetrics()
func headersStagedReorg*(ctx: BeaconCtxRef; info: static[string]) =
proc headersStagedReorg*(ctx: BeaconCtxRef; info: static[string]) =
## Some pool mode intervention. The effect is that all concurrent peers
## finish up their current work and run this function here (which might
## do nothing.) This stopping should be enough in most cases to re-organise
@ -277,26 +291,8 @@ func headersStagedReorg*(ctx: BeaconCtxRef; info: static[string]) =
ctx.headersUnprocCommit(0, key - nHeaders + 1, key)
discard ctx.hdr.staged.delete key
func headersStagedTopKey*(ctx: BeaconCtxRef): BlockNumber =
## Retrieve to staged block number
let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr:
return BlockNumber(0)
qItem.key
func headersStagedQueueLen*(ctx: BeaconCtxRef): int =
## Number of staged records
ctx.hdr.staged.len
func headersStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool =
## `true` iff no data are on the queue.
ctx.hdr.staged.len == 0
# ----------------
func headersStagedInit*(ctx: BeaconCtxRef) =
## Constructor
ctx.hdr.staged = LinkedHChainQueue.init()
# Update, so it can be followed nicely
ctx.updateMetrics()
# ------------------------------------------------------------------------------
# End

@ -19,9 +19,6 @@ import
../../../protocol/eth/eth_types,
../../worker_desc
logScope:
topics = "beacon headers"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------

@ -0,0 +1,38 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
pkg/eth/common,
pkg/stew/[interval_set, sorted_set],
../../worker_desc
func headersStagedQueueTopKey*(ctx: BeaconCtxRef): BlockNumber =
## Retrieve the top staged block number
let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr:
return BlockNumber(0)
qItem.key
func headersStagedQueueLen*(ctx: BeaconCtxRef): int =
## Number of staged records
ctx.hdr.staged.len
func headersStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool =
## `true` iff no data are on the queue.
ctx.hdr.staged.len == 0
# ----------------
func headersStagedQueueInit*(ctx: BeaconCtxRef) =
## Constructor
ctx.hdr.staged = LinkedHChainQueue.init()
# End

@ -30,7 +30,9 @@ func bnStr*(w: Interval[BlockNumber,uint64]): string =
if w.len == 1: w.minPt.bnStr else: w.minPt.bnStr & ".." & w.maxPt.bnStr
func toStr*(a: chronos.Duration): string =
a.toString 2
var s = a.toString 2
if s.len == 0: s="0"
s
proc `$`*(w: Interval[BlockNumber,uint64]): string =

@ -15,7 +15,9 @@ import
../../../core/chain,
../../protocol,
../worker_desc,
"."/[blocks_staged, blocks_unproc, db, headers_staged, headers_unproc]
./blocks_staged/staged_queue,
./headers_staged/staged_queue,
"."/[blocks_unproc, db, headers_unproc]
when enableTicker:
import ./start_stop/ticker
@ -36,17 +38,18 @@ when enableTicker:
dangling: ctx.layout.dangling,
final: ctx.layout.final,
head: ctx.layout.head,
headOk: ctx.layout.headLocked,
target: ctx.target.consHead.number,
targetOk: ctx.target.final != 0,
nHdrStaged: ctx.headersStagedQueueLen(),
hdrStagedTop: ctx.headersStagedTopKey(),
hdrStagedTop: ctx.headersStagedQueueTopKey(),
hdrUnprocTop: ctx.headersUnprocTop(),
nHdrUnprocessed: ctx.headersUnprocTotal() + ctx.headersUnprocBorrowed(),
nHdrUnprocFragm: ctx.headersUnprocChunks(),
nBlkStaged: ctx.blocksStagedQueueLen(),
blkStagedBottom: ctx.blocksStagedBottomKey(),
blkStagedBottom: ctx.blocksStagedQueueBottomKey(),
blkUnprocTop: ctx.blk.topRequest,
nBlkUnprocessed: ctx.blocksUnprocTotal() + ctx.blocksUnprocBorrowed(),
nBlkUnprocFragm: ctx.blocksUnprocChunks(),
@ -58,9 +61,12 @@ proc updateBeaconHeaderCB(ctx: BeaconCtxRef): ReqBeaconSyncTargetCB =
## Update beacon header. This function is intended as a call back function
## for the RPC module.
return proc(h: Header; f: Hash32) {.gcsafe, raises: [].} =
# Rpc checks empty header against a zero hash rather than `emptyRoot`
# Check whether there is an update running (otherwise take next update)
if not ctx.target.locked:
if f != zeroHash32 and ctx.target.consHead.number < h.number:
# Rpc checks empty header against a zero hash rather than `emptyRoot`
if f != zeroHash32 and
ctx.layout.head < h.number and
ctx.target.consHead.number < h.number:
ctx.target.consHead = h
ctx.target.final = BlockNumber(0)
ctx.target.finalHash = f
@ -86,17 +92,17 @@ else:
# ---------
proc setupDatabase*(ctx: BeaconCtxRef) =
proc setupDatabase*(ctx: BeaconCtxRef; info: static[string]) =
## Initialise database related stuff
# Initialise up queues and lists
ctx.headersStagedInit()
ctx.blocksStagedInit()
ctx.headersStagedQueueInit()
ctx.blocksStagedQueueInit()
ctx.headersUnprocInit()
ctx.blocksUnprocInit()
# Load initial state from database if there is any
ctx.dbLoadSyncStateLayout()
ctx.dbLoadSyncStateLayout info
# Set blocks batch import value for block import
if ctx.pool.nBodiesBatch < nFetchBodiesRequest:

@ -33,6 +33,7 @@ type
dangling*: BlockNumber
final*: BlockNumber
head*: BlockNumber
headOk*: bool
target*: BlockNumber
targetOk*: bool
@ -79,20 +80,25 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
B = if data.base == data.latest: "L" else: data.base.bnStr
L = if data.latest == data.coupler: "C" else: data.latest.bnStr
C = if data.coupler == data.dangling: "D" else: data.coupler.bnStr
D = if data.dangling == data.final: "F" else: data.dangling.bnStr
D = if data.dangling == data.final: "F"
elif data.dangling == data.head: "H"
else: data.dangling.bnStr
F = if data.final == data.head: "H" else: data.final.bnStr
H = if data.head == data.target: "T" else: data.head.bnStr
H = if data.headOk:
if data.head == data.target: "T" else: data.head.bnStr
else:
if data.head == data.target: "?T" else: "?" & $data.head
T = if data.targetOk: data.target.bnStr else: "?" & $data.target
hS = if data.nHdrStaged == 0: "n/a"
else: data.hdrStagedTop.bnStr & "(" & $data.nHdrStaged & ")"
hU = if data.nHdrUnprocFragm == 0: "n/a"
hU = if data.nHdrUnprocFragm == 0 and data.nHdrUnprocessed == 0: "n/a"
else: data.hdrUnprocTop.bnStr & "(" &
data.nHdrUnprocessed.toSI & "," & $data.nHdrUnprocFragm & ")"
bS = if data.nBlkStaged == 0: "n/a"
else: data.blkStagedBottom.bnStr & "(" & $data.nBlkStaged & ")"
bU = if data.nBlkUnprocFragm == 0: "n/a"
bU = if data.nBlkUnprocFragm == 0 and data.nBlkUnprocessed == 0: "n/a"
else: data.blkUnprocTop.bnStr & "(" &
data.nBlkUnprocessed.toSI & "," & $data.nBlkUnprocFragm & ")"
@ -121,12 +127,13 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} =
proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.}
proc runLogTicker(t: TickerRef) {.gcsafe.} =
t.prettyPrint(t)
t.setLogTicker(Moment.fromNow(tickerLogInterval))
if not t.statsCb.isNil:
t.prettyPrint(t)
t.setLogTicker(Moment.fromNow(tickerLogInterval))
proc setLogTicker(t: TickerRef; at: Moment) =
if t.statsCb.isNil:
debug "Stopped", nBuddies=t.lastStats.nBuddies
debug "Ticker stopped"
else:
# Store the `runLogTicker()` in a closure to avoid some garbage collection
# memory corruption issues that might occur otherwise.

@ -17,10 +17,8 @@ import
../../../core/chain,
../worker_desc,
./update/metrics,
"."/[blocks_unproc, db, headers_staged, headers_unproc]
logScope:
topics = "beacon update"
./headers_staged/staged_queue,
"."/[blocks_unproc, db, headers_unproc]
# ------------------------------------------------------------------------------
# Private functions
@ -51,12 +49,13 @@ proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) =
var target = ctx.target.consHead.number
# Need: `H < T` and `C == D`
if target != 0 and target <= ctx.layout.head: # violates `H < T`
trace info & ": not applicable", H=ctx.layout.head.bnStr, T=target.bnStr
if target != 0 and target <= ctx.layout.head: # violates `H < T`
trace info & ": update not applicable",
H=ctx.layout.head.bnStr, T=target.bnStr
return
if ctx.layout.coupler != ctx.layout.dangling: # violates `C == D`
trace info & ": not applicable",
trace info & ": update not applicable",
C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr
return
@ -78,10 +77,10 @@ proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) =
# Save this header on the database so it needs not be fetched again from
# somewhere else.
ctx.dbStashHeaders(target, @[rlpHeader])
ctx.dbStashHeaders(target, @[rlpHeader], info)
# Save state
ctx.dbStoreSyncStateLayout()
ctx.dbStoreSyncStateLayout info
# Update range
doAssert ctx.headersUnprocTotal() == 0
@ -89,10 +88,13 @@ proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) =
doAssert ctx.headersStagedQueueIsEmpty()
ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1)
trace info & ": updated", C=ctx.layout.coupler.bnStr,
trace info & ": updated sync state", C=ctx.layout.coupler.bnStr,
uTop=ctx.headersUnprocTop(),
D=ctx.layout.dangling.bnStr, H=ctx.layout.head.bnStr, T=target.bnStr
# Update, so it can be followed nicely
ctx.updateMetrics()
proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) =
## Merge if `C+1` == `D`
@ -110,7 +112,7 @@ proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) =
raiseAssert info & ": hashes do not match" &
" C=" & ctx.layout.coupler.bnStr & " D=" & $ctx.layout.dangling.bnStr
trace info & ": merging", C=ctx.layout.coupler.bnStr,
trace info & ": merging adjacent chains", C=ctx.layout.coupler.bnStr,
D=ctx.layout.dangling.bnStr
# Merge adjacent linked chains
@ -126,7 +128,10 @@ proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) =
headLocked: ctx.layout.headLocked)
# Save state
ctx.dbStoreSyncStateLayout()
ctx.dbStoreSyncStateLayout info
# Update, so it can be followed nicely
ctx.updateMetrics()
# ------------------------------------------------------------------------------
# Public functions
@ -145,8 +150,11 @@ proc updateSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
doAssert ctx.layout.head == latest
ctx.layout.headLocked = false
# Check whether there is something to do regarding beacon node change
if not ctx.layout.headLocked and ctx.target.changed and ctx.target.final != 0:
if not ctx.layout.headLocked and # there was an active import request
ctx.target.changed and # and there is a new target from CL
ctx.target.final != 0: # .. ditto
ctx.target.changed = false
ctx.updateTargetChange info
@ -162,20 +170,13 @@ proc updateBlockRequests*(ctx: BeaconCtxRef; info: static[string]) =
# One can fill/import/execute blocks by number from `(L,C]`
if ctx.blk.topRequest < ctx.layout.coupler:
# So there is some space
trace info & ": updating", L=latest.bnStr,
trace info & ": updating block requests", L=latest.bnStr,
topReq=ctx.blk.topRequest.bnStr, C=ctx.layout.coupler.bnStr
ctx.blocksUnprocCommit(
0, max(latest, ctx.blk.topRequest) + 1, ctx.layout.coupler)
ctx.blk.topRequest = ctx.layout.coupler
proc updateMetrics*(ctx: BeaconCtxRef) =
let now = Moment.now()
if ctx.pool.nextUpdate < now:
ctx.updateMetricsImpl()
ctx.pool.nextUpdate = now + metricsUpdateInterval
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

@ -11,10 +11,12 @@
{.push raises:[].}
import
pkg/metrics,
pkg/[chronos, metrics],
../../../../core/chain,
../../worker_desc,
".."/[blocks_staged, headers_staged]
../blocks_staged/staged_queue,
../headers_staged/staged_queue,
".."/[blocks_unproc, headers_unproc]
declareGauge beacon_base, "" &
"Max block number of imported finalised blocks"
@ -55,7 +57,7 @@ declareGauge beacon_buddies, "" &
"Number of currently active worker instances"
template updateMetricsImpl*(ctx: BeaconCtxRef) =
template updateMetricsImpl(ctx: BeaconCtxRef) =
metrics.set(beacon_base, ctx.chain.baseNumber().int64)
metrics.set(beacon_latest, ctx.chain.latestNumber().int64)
metrics.set(beacon_coupler, ctx.layout.coupler.int64)
@ -74,4 +76,12 @@ template updateMetricsImpl*(ctx: BeaconCtxRef) =
metrics.set(beacon_buddies, ctx.pool.nBuddies)
# ---------------
proc updateMetrics*(ctx: BeaconCtxRef) =
let now = Moment.now()
if ctx.pool.nextUpdate < now:
ctx.updateMetricsImpl()
ctx.pool.nextUpdate = now + metricsUpdateInterval
# End

@ -14,7 +14,7 @@ import
pkg/chronos
const
enableTicker* = true
enableTicker* = false
## Log regular status updates similar to metrics. Great for debugging.
runsThisManyPeersOnly* = 8
@ -43,7 +43,7 @@ const
workerIdleWaitInterval* = chronos.seconds(10)
## Sleep some time in multi-mode if there is nothing to do
asyncThreadSwitchTimeSlot* = chronos.nanoseconds(10)
asyncThreadSwitchTimeSlot* = chronos.nanoseconds(1)
## Nano-sleep to allow a pseudo/async thread switch
# ----------------------

@ -138,7 +138,7 @@ type
nBodiesBatch*: int ## Default `nFetchBodiesBatchDefault`
blocksStagedQuLenMax*: int ## Default `blocksStagedQueueLenMaxDefault`
# Info stuff, no functional contribution
# Info & debugging stuff, no functional contribution
nReorg*: int ## Number of reorg invocations (info only)
# Debugging stuff

@ -38,7 +38,6 @@ type
CtxRef*[S] = ref object
## Shared state among all syncing peer workers (aka buddies.)
buddiesMax*: int ## Max number of buddies
poolMode*: bool ## Activate `runPool()` workers if set `true`
daemon*: bool ## Enable global background job
pool*: S ## Shared context for all worker peers

@ -63,6 +63,16 @@
## These peer worker methods run concurrently in `async` mode.
##
##
## These are the control variables that can be set from within the above
## listed method/interface functions.
##
## *buddy.ctx.poolMode*
## Activate `runPool()` workers loop if set `true` (default is `false`.)
##
## *buddy.ctx.daemon*
## Activate `runDaemon()` background job if set `true`(default is `false`.)
##
##
## Additional import files needed when using this template:
## * eth/[common, p2p]
## * chronicles
@ -84,21 +94,28 @@ type
## List of active workers, using `Hash(Peer)` rather than `Peer`
KeyedQueue[ENode,RunnerBuddyRef[S,W]]
RunCtrl = enum
terminated = 0
shutdown
running
RunnerSyncRef*[S,W] = ref object
## Module descriptor
ctx*: CtxRef[S] ## Shared data
pool: PeerPool ## For starting the system
buddiesMax: int ## Max number of buddies
buddies: ActiveBuddies[S,W] ## LRU cache with worker descriptors
daemonRunning: bool ## Run global background job
monitorLock: bool ## Monitor mode is activated
activeMulti: int ## Number of activated runners in multi-mode
shutdown: bool ## Internal shut down flag
daemonRunning: bool ## Running background job (in async mode)
monitorLock: bool ## Monitor mode is activated (non-async mode)
activeMulti: int ## Number of async workers active/running
runCtrl: RunCtrl ## Start/stop control
RunnerBuddyRef[S,W] = ref object
## Per worker peer descriptor
dsc: RunnerSyncRef[S,W] ## Scheduler descriptor
worker: BuddyRef[S,W] ## Worker peer data
zombified: Moment ## When it became undead (if any)
zombified: Moment ## Time when it became undead (if any)
isRunning: bool ## Peer worker is active (in async mode)
const
zombieTimeToLinger = 20.seconds
@ -119,6 +136,9 @@ const
execPoolModeLoopMax = 100
## Avoids continuous looping
termWaitPollingTime = 10.milliseconds
## Wait for instance to have terminated for shutdown
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@ -140,10 +160,48 @@ proc key(peer: Peer): ENode =
# Private functions
# ------------------------------------------------------------------------------
proc terminate[S,W](dsc: RunnerSyncRef[S,W]) =
## Request termination and wait
mixin runRelease
if dsc.runCtrl == running:
# Gracefully shut down async services
dsc.runCtrl = shutdown
dsc.ctx.daemon = false
# Wait for workers and daemon to have terminated
while 0 < dsc.buddies.len:
for w in dsc.buddies.nextPairs:
if w.data.isRunning:
w.data.worker.ctrl.stopped = true
# Activate async job so it can finish
try: waitFor sleepAsync termWaitPollingTime
except CancelledError: discard
else:
dsc.buddies.del w.key # this is OK to delete
while dsc.daemonRunning:
# Activate async job so it can finish
try: waitFor sleepAsync termWaitPollingTime
except CancelledError: discard
# Final shutdown
dsc.ctx.runRelease()
# Remove call back from pool manager. This comes last as it will
# potentially unlink references which are used in the worker instances
# (e.g. peer for logging.)
dsc.pool.delObserver(dsc)
# Clean up, free memory from sub-objects
dsc.ctx = CtxRef[S]()
dsc.runCtrl = terminated
proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async.} =
mixin runDaemon
if dsc.ctx.daemon and not dsc.shutdown:
if dsc.ctx.daemon and dsc.runCtrl == running:
dsc.daemonRunning = true
# Continue until stopped
@ -178,7 +236,15 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
# Continue until stopped
block taskExecLoop:
while worker.ctrl.running and not dsc.shutdown:
buddy.isRunning = true
proc isShutdown(): bool =
dsc.runCtrl != running
proc isActive(): bool =
worker.ctrl.running and not isShutdown()
while isActive():
# Enforce minimum time spend on this loop
let startMoment = Moment.now()
@ -192,7 +258,7 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
dsc.monitorLock = true
while 0 < dsc.activeMulti:
await sleepAsync execLoopPollingTime
if worker.ctrl.stopped:
if not isActive():
dsc.monitorLock = false
break taskExecLoop
@ -209,6 +275,10 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
else:
delayed = nil # not executing any final item
break # `true` => stop
# Shutdown in progress?
if isShutdown():
dsc.monitorLock = false
break taskExecLoop
if not delayed.isNil:
discard delayed.runPool(last=true, laps=count) # final item
if not ctx.poolMode:
@ -221,17 +291,22 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
# end. So zombies will end up leftish.
discard dsc.buddies.lruFetch peer.key
# Peer mode
# Peer worker in async mode
dsc.activeMulti.inc
# Continue doing something, work a bit
await worker.runPeer()
dsc.activeMulti.dec
# Check for shutdown
if isShutdown():
worker.ctrl.stopped = true
break taskExecLoop
# Dispatch daemon service if needed
if not dsc.daemonRunning and dsc.ctx.daemon:
asyncSpawn dsc.daemonLoop()
# Check for termination
# Check for worker termination
if worker.ctrl.stopped:
break taskExecLoop
@ -245,17 +320,20 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
# End while
# Note that `runStart()` was dispatched in `onPeerConnected()`
if worker.ctrl.running:
# So shutdown was called
worker.ctrl.stopped = true
worker.runStop()
buddy.isRunning = false
proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
mixin runStart, runStop
# Ignore if shutdown is processing
if dsc.runCtrl != running:
return
# Check for known entry (which should not exist.)
let
maxWorkers {.used.} = dsc.ctx.buddiesMax
maxWorkers {.used.} = dsc.buddiesMax
nPeers {.used.} = dsc.pool.len
zombie = dsc.buddies.eq peer.key
if zombie.isOk:
@ -290,7 +368,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
#
# In the past, one could not rely on the peer pool for having the number of
# connections limited.
if dsc.ctx.buddiesMax <= dsc.buddies.len:
if dsc.buddiesMax <= dsc.buddies.len:
let
leastVal = dsc.buddies.shift.value # unqueue first/least item
oldest = leastVal.data.worker
@ -310,10 +388,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
oldest.ctrl.zombie = true
# Add peer entry
discard dsc.buddies.lruAppend(peer.key, buddy, dsc.ctx.buddiesMax)
trace "Running peer worker", peer, nPeers,
nWorkers=dsc.buddies.len, maxWorkers
discard dsc.buddies.lruAppend(peer.key, buddy, dsc.buddiesMax)
asyncSpawn buddy.workerLoop()
@ -321,7 +396,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) =
let
nPeers = dsc.pool.len
maxWorkers = dsc.ctx.buddiesMax
maxWorkers = dsc.buddiesMax
nWorkers = dsc.buddies.len
rc = dsc.buddies.eq peer.key
if rc.isErr:
@ -343,8 +418,6 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) =
else:
rc.value.worker.ctrl.stopped = true # in case it is hanging somewhere
dsc.buddies.del peer.key
trace "Disconnected buddy", peer, nPeers,
nWorkers=dsc.buddies.len, maxWorkers
# ------------------------------------------------------------------------------
# Public functions
@ -356,47 +429,45 @@ proc initSync*[S,W](
slots: int;
) =
## Constructor
# Leave one extra slot so that it can holds a *zombie* even if all slots
# are full. The effect is that a re-connect on the latest zombie will be
# rejected as long as its worker descriptor is registered.
dsc.ctx = CtxRef[S](buddiesMax: max(1, slots + 1))
dsc.buddiesMax = max(1, slots + 1)
dsc.pool = node.peerPool
dsc.buddies.init(dsc.ctx.buddiesMax)
dsc.buddies.init(dsc.buddiesMax)
dsc.ctx = CtxRef[S]()
proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool =
## Set up `PeerObserver` handlers and start syncing.
mixin runSetup
# Initialise sub-systems
if dsc.ctx.runSetup():
var po = PeerObserver(
onPeerConnected:
proc(p: Peer) {.gcsafe.} =
if dsc.runCtrl == terminated:
# Initialise sub-systems
if dsc.ctx.runSetup():
dsc.runCtrl = running
var po = PeerObserver(
onPeerConnected: proc(p: Peer) {.gcsafe.} =
dsc.onPeerConnected(p),
onPeerDisconnected:
proc(p: Peer) {.gcsafe.} =
onPeerDisconnected: proc(p: Peer) {.gcsafe.} =
dsc.onPeerDisconnected(p))
po.setProtocol eth
dsc.pool.addObserver(dsc, po)
if dsc.ctx.daemon:
asyncSpawn dsc.daemonLoop()
return true
po.setProtocol eth
dsc.pool.addObserver(dsc, po)
if dsc.ctx.daemon:
asyncSpawn dsc.daemonLoop()
return true
proc stopSync*[S,W](dsc: RunnerSyncRef[S,W]) =
## Stop syncing and free peer handlers.
mixin runRelease
dsc.pool.delObserver(dsc)
dsc.terminate()
# Gracefully shut down async services
dsc.shutdown = true
for buddy in dsc.buddies.nextValues:
buddy.worker.ctrl.stopped = true
dsc.ctx.daemon = false
# Final shutdown (note that some workers might still linger on)
dsc.ctx.runRelease()
proc isRunning*[S,W](dsc: RunnerSyncRef[S,W]): bool =
## Check start/stop state
dsc.runCtrl == running
# ------------------------------------------------------------------------------
# End