Update sync scheduler (#1297)

* Add `stop()` methods to shutdown procedure

why:
  Otherwise there is nasty behaviour when hitting Ctrl-C.

* Add background service to sync scheduler

why:
  The background service will be used for sync data import and recovery
  after restart.

  It is controlled by the sync scheduler for an easy turn on/off API (see the
  usage sketch below).

also:
  Simplified snap ticker time calc.

* Fix typo
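
A rough usage sketch of the resulting on/off API (illustrative only, pieced
together from the diff hunks below rather than quoted from the commit):

  # nimbus.nim setup, as in the diff:
  nimbus.snapSyncRef = SnapSyncRef.init(
    nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
    nimbus.dbBackend, tickerOK)
  nimbus.snapSyncRef.start      # startSync(daemon = true) spawns the daemon loop

  # inside the sync worker, the daemon parks itself when there is nothing to do:
  ctx.daemon = false            # daemonLoop() terminates after the current lap
  # ... and the scheduler re-spawns it once some worker sets the flag again:
  ctx.daemon = true

  # shutdown path (e.g. after Ctrl-C):
  nimbus.snapSyncRef.stop()     # stopSync() clears ctx.daemon and stops workers
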
Jordan Hrycaj 2022-11-14 14:13:00 +00:00 committed by GitHub
parent 43f4b99a1b
commit 9aa925cf36
7 changed files with 102 additions and 43 deletions

View File

@ -58,6 +58,8 @@ type
networkLoop: Future[void]
dbBackend: ChainDB
peerManager: PeerManagerRef
snapSyncRef: SnapSyncRef
fullSyncRef: FullSyncRef
merger: MergerRef
proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) =
@ -167,11 +169,15 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
conf.logLevel in {LogLevel.INFO, LogLevel.DEBUG, LogLevel.TRACE}
case conf.syncMode:
of SyncMode.Full:
FullSyncRef.init(nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng,
conf.maxPeers, tickerOK).start
nimbus.fullSyncRef = FullSyncRef.init(
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
tickerOK)
nimbus.fullSyncRef.start
of SyncMode.Snap:
SnapSyncRef.init(nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng,
conf.maxPeers, nimbus.dbBackend, tickerOK).start
nimbus.snapSyncRef = SnapSyncRef.init(
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
nimbus.dbBackend, tickerOK)
nimbus.snapSyncRef.start
of SyncMode.Default:
discard
@ -438,6 +444,10 @@ proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} =
await nimbus.networkLoop.cancelAndWait()
if nimbus.peerManager.isNil.not:
await nimbus.peerManager.stop()
if nimbus.snapSyncRef.isNil.not:
nimbus.snapSyncRef.stop()
if nimbus.fullSyncRef.isNil.not:
nimbus.fullSyncRef.stop()
proc process*(nimbus: NimbusNode, conf: NimbusConf) =
# Main event loop

View File

@ -33,6 +33,9 @@ proc runSetup(ctx: FullCtxRef; ticker: bool): bool =
proc runRelease(ctx: FullCtxRef) =
worker.release(ctx)
proc runDaemon(ctx: FullCtxRef) {.async.} =
discard
proc runStart(buddy: FullBuddyRef): bool =
worker.start(buddy)

View File

@ -35,6 +35,9 @@ proc runSetup(ctx: SnapCtxRef; ticker: bool): bool =
proc runRelease(ctx: SnapCtxRef) =
worker.release(ctx)
proc runDaemon(ctx: SnapCtxRef) {.async.} =
await worker.runDaemon(ctx)
proc runStart(buddy: SnapBuddyRef): bool =
worker.start(buddy)
@ -71,7 +74,7 @@ proc init*(
doAssert not result.ctx.ethWireCtx.isNil
proc start*(ctx: SnapSyncRef) =
doAssert ctx.startSync()
doAssert ctx.startSync(daemon = true)
proc stop*(ctx: SnapSyncRef) =
ctx.stopSync()

View File

@ -273,17 +273,19 @@ proc stop*(buddy: SnapBuddyRef) =
# Public functions
# ------------------------------------------------------------------------------
proc runSingle*(buddy: SnapBuddyRef) {.async.} =
## This peer worker is invoked if the peer-local flag `buddy.ctrl.multiOk`
## is set `false` which is the default mode. This flag is updated by the
## worker when deemed appropriate.
## * For all worker peers, there can be only one `runSingle()` function
## active simultaneously.
## * There will be no `runMulti()` function active for the very same worker
## peer that runs the `runSingle()` function.
## * There will be no `runPool()` iterator active simultaneously.
proc runDaemon*(ctx: SnapCtxRef) {.async.} =
## Enabled while `ctx.daemon` is `true`
##
## Note that this function runs in `async` mode.
let nPivots = ctx.data.pivotTable.len
trace "I am the mighty recovery daemon ... stopped for now", nPivots
# To be populated ...
ctx.daemon = false
proc runSingle*(buddy: SnapBuddyRef) {.async.} =
## Enabled while
## * `buddy.ctrl.multiOk` is `false`
## * `buddy.ctrl.poolMode` is `false`
##
let peer = buddy.peer
# This pivot finder one harmonises assigned difficulties of at least two
@ -299,18 +301,7 @@ proc runSingle*(buddy: SnapBuddyRef) {.async.} =
proc runPool*(buddy: SnapBuddyRef, last: bool) =
## Once started, the function `runPool()` is called for all worker peers in
## a row (as the body of an iteration.) There will be no other worker peer
## functions activated simultaneously.
##
## This procedure is started if the global flag `buddy.ctx.poolMode` is set
## `true` (default is `false`.) It is the responsibility of the `runPool()`
## instance to reset the flag `buddy.ctx.poolMode`, typically at the first
## peer instance.
##
## The argument `last` is set `true` if the last entry is reached.
##
## Note that this function does not run in `async` mode.
## Enabled when `buddy.ctrl.poolMode` is `true`
##
let ctx = buddy.ctx
if ctx.poolMode:
@ -349,9 +340,9 @@ proc runPool*(buddy: SnapBuddyRef, last: bool) =
proc runMulti*(buddy: SnapBuddyRef) {.async.} =
## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set
## `true` which is typically done after finishing `runSingle()`. This
## instance can be simultaneously active for all peer workers.
## Enabled while
## * `buddy.ctrl.multiOk` is `true`
## * `buddy.ctrl.poolMode` is `false`
##
let
ctx = buddy.ctx

View File

@ -10,7 +10,7 @@
# except according to those terms.
import
std/[strformat, strutils, times],
std/[strformat, strutils],
chronos,
chronicles,
eth/[common, p2p],
@ -41,13 +41,13 @@ type
lastStats: TickerStats
statsCb: TickerStatsUpdater
logTicker: TimerCallback
started: Time
visited: Time
started: Moment
visited: Moment
const
tickerStartDelay = chronos.milliseconds(100)
tickerLogInterval = chronos.seconds(1)
tickerLogSuppressMax = initDuration(seconds = 100)
tickerLogSuppressMax = chronos.seconds(100)
# ------------------------------------------------------------------------------
# Private functions: pretty printing
@ -112,7 +112,7 @@ proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.}
proc runLogTicker(t: TickerRef) {.gcsafe.} =
let
data = t.statsCb()
now = getTime().utc.toTime
now = Moment.now()
if data != t.lastStats or tickerLogSuppressMax < (now - t.visited):
t.lastStats = data
@ -128,7 +128,7 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
buddies = t.nBuddies
# With `int64`, there are more than 29*10^10 years range for seconds
up = (now - t.started).inSeconds.uint64.toSI
up = (now - t.started).seconds.uint64.toSI
mem = getTotalMem().uint.toSI
noFmtError("runLogTicker"):
@ -164,8 +164,8 @@ proc start*(t: TickerRef) =
## Re/start ticker unconditionally
#debug "Started ticker"
t.logTicker = safeSetTimer(Moment.fromNow(tickerStartDelay), runLogTicker, t)
if t.started == Time.default:
t.started = getTime().utc.toTime
if t.started == Moment.default:
t.started = Moment.now()
proc stop*(t: TickerRef) =
## Stop ticker unconditionally

View File

@ -50,6 +50,7 @@ type
ethWireCtx*: EthWireRef ## Eth protocol wire context (if available)
chain*: Chain ## Block chain database (no need for `Peer`)
poolMode*: bool ## Activate `runPool()` workers if set `true`
daemon*: bool ## Enable global background job
data*: S ## Shared context for all worker peers
# ------------------------------------------------------------------------------

View File

@ -20,6 +20,13 @@
## *runRelease(ctx: CtxRef[S])*
## Global clean up, done with all the worker peers.
##
## *runDaemon(ctx: CtxRef[S]) {.async.}*
## Global background job that will be re-started as long as the variable
## `ctx.daemon` is set `true`. If the job stopped because `ctx.daemon` was
## set `false`, it will be restarted the next time the flag is set `true`
## again, provided there is some activity on the `runPool()`, `runSingle()`,
## or `runMulti()` functions.
##
##
## *runStart(buddy: BuddyRef[S,W]): bool*
## Initialise a new worker peer.
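For illustration only (not part of this commit), a minimal `runDaemon()`
handler honouring the contract above could look like the following sketch,
where `recoveryStepLeft` and `recoverOneStep` are hypothetical placeholders
for real work:

  proc runDaemon(ctx: SnapCtxRef) {.async.} =
    ## One lap of background work; the scheduler keeps re-invoking this
    ## while `ctx.daemon` is `true`.
    if ctx.recoveryStepLeft():       # hypothetical: is there pending work?
      await ctx.recoverOneStep()     # hypothetical async unit of work
    else:
      ctx.daemon = false             # park until a worker re-enables the flag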
@ -99,6 +106,7 @@ type
pool: PeerPool ## For starting the system
buddies: ActiveBuddies[S,W] ## LRU cache with worker descriptors
tickerOk: bool ## Ticker logger
daemonRunning: bool ## Run global background job
singleRunLock: bool ## Some single mode runner is activated
monitorLock: bool ## Monitor mode is activated
activeMulti: int ## Number of activated runners in multi-mode
@ -110,12 +118,12 @@ type
const
execLoopTimeElapsedMin = 50.milliseconds
## Minimum elapsed time the event loop needs for a single lap. If it
## is faster, asynchronous sleep seconds are added in order to avoid
## Minimum elapsed time an exec loop needs for a single lap. If it is
## faster, asynchronous sleep seconds are added in order to avoid
## cpu overload.
execLoopTaskSwitcher = 1.nanoseconds
## Asynchronous waiting time at the end of the exec loop unless some sleep
## Asynchronous waiting time at the end of an exec loop unless some sleep
## seconds were added as described by `execLoopTimeElapsedMin`, above.
execLoopPollingTime = 50.milliseconds
@ -133,6 +141,34 @@ proc hash(peer: Peer): Hash =
# Private functions
# ------------------------------------------------------------------------------
proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async.} =
mixin runDaemon
if dsc.ctx.daemon:
dsc.daemonRunning = true
# Continue until stopped
while true:
# Note when this lap started; a minimum lap time is enforced further below
let startMoment = Moment.now()
await dsc.ctx.runDaemon()
if not dsc.ctx.daemon:
break
# Enforce a minimum time spent on this lap so we never reach 100% CPU load
# caused by empty sub-tasks which are outside of this scheduler's control.
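# Example with the constants above: a lap that returned after 20ms gets a
# 30ms suspend (topping it up to execLoopTimeElapsedMin), while a lap that
# already took 50ms or more only gets the 1ns execLoopTaskSwitcher suspend.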
let
elapsed = Moment.now() - startMoment
suspend = if execLoopTimeElapsedMin <= elapsed: execLoopTaskSwitcher
else: execLoopTimeElapsedMin - elapsed
await sleepAsync suspend
# End while
dsc.daemonRunning = false
proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
mixin runMulti, runSingle, runPool, runStop
let
@ -190,6 +226,11 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
await worker.runSingle()
dsc.singleRunLock = false
# Dispatch daemon service if needed
if not dsc.daemonRunning and dsc.ctx.daemon:
asyncSpawn dsc.daemonLoop()
# Check for termination
if worker.ctrl.stopped:
break taskExecLoop
@ -294,7 +335,7 @@ proc initSync*[S,W](
dsc.tickerOk = noisy
dsc.buddies.init(dsc.ctx.buddiesMax)
proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool =
proc startSync*[S,W](dsc: RunnerSyncRef[S,W]; daemon = false): bool =
## Set up `PeerObserver` handlers and start syncing.
mixin runSetup
# Initialise sub-systems
@ -309,14 +350,24 @@ proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool =
po.setProtocol eth
dsc.pool.addObserver(dsc, po)
if daemon:
dsc.ctx.daemon = true
asyncSpawn dsc.daemonLoop()
return true
proc stopSync*[S,W](dsc: RunnerSyncRef[S,W]) =
## Stop syncing and free peer handlers.
mixin runRelease
dsc.ctx.runRelease()
dsc.pool.delObserver(dsc)
# Shut down async services
for buddy in dsc.buddies.nextValues:
buddy.worker.ctrl.stopped = true
dsc.ctx.daemon = false
# Final shutdown (note that some workers might still linger on)
dsc.ctx.runRelease()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------