diff --git a/nimbus/nimbus.nim b/nimbus/nimbus.nim index 2f9eb2771..fb1361b56 100644 --- a/nimbus/nimbus.nim +++ b/nimbus/nimbus.nim @@ -58,6 +58,8 @@ type networkLoop: Future[void] dbBackend: ChainDB peerManager: PeerManagerRef + snapSyncRef: SnapSyncRef + fullSyncRef: FullSyncRef merger: MergerRef proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) = @@ -167,11 +169,15 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, conf.logLevel in {LogLevel.INFO, LogLevel.DEBUG, LogLevel.TRACE} case conf.syncMode: of SyncMode.Full: - FullSyncRef.init(nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, - conf.maxPeers, tickerOK).start + nimbus.fullSyncRef = FullSyncRef.init( + nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers, + tickerOK) + nimbus.fullSyncRef.start of SyncMode.Snap: - SnapSyncRef.init(nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, - conf.maxPeers, nimbus.dbBackend, tickerOK).start + nimbus.snapSyncRef = SnapSyncRef.init( + nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers, + nimbus.dbBackend, tickerOK) + nimbus.snapSyncRef.start of SyncMode.Default: discard @@ -438,6 +444,10 @@ proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} = await nimbus.networkLoop.cancelAndWait() if nimbus.peerManager.isNil.not: await nimbus.peerManager.stop() + if nimbus.snapSyncRef.isNil.not: + nimbus.snapSyncRef.stop() + if nimbus.fullSyncRef.isNil.not: + nimbus.fullSyncRef.stop() proc process*(nimbus: NimbusNode, conf: NimbusConf) = # Main event loop diff --git a/nimbus/sync/full.nim b/nimbus/sync/full.nim index ad9e4f875..3099588a4 100644 --- a/nimbus/sync/full.nim +++ b/nimbus/sync/full.nim @@ -33,6 +33,9 @@ proc runSetup(ctx: FullCtxRef; ticker: bool): bool = proc runRelease(ctx: FullCtxRef) = worker.release(ctx) +proc runDaemon(ctx: FullCtxRef) {.async.} = + discard + proc runStart(buddy: FullBuddyRef): bool = worker.start(buddy) diff --git a/nimbus/sync/snap.nim b/nimbus/sync/snap.nim index a535f9152..80a71abc4 100644 --- a/nimbus/sync/snap.nim +++ b/nimbus/sync/snap.nim @@ -35,6 +35,9 @@ proc runSetup(ctx: SnapCtxRef; ticker: bool): bool = proc runRelease(ctx: SnapCtxRef) = worker.release(ctx) +proc runDaemon(ctx: SnapCtxRef) {.async.} = + await worker.runDaemon(ctx) + proc runStart(buddy: SnapBuddyRef): bool = worker.start(buddy) @@ -71,7 +74,7 @@ proc init*( doAssert not result.ctx.ethWireCtx.isNil proc start*(ctx: SnapSyncRef) = - doAssert ctx.startSync() + doAssert ctx.startSync(daemon = true) proc stop*(ctx: SnapSyncRef) = ctx.stopSync() diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index e1f2a160a..483a1db67 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -273,17 +273,19 @@ proc stop*(buddy: SnapBuddyRef) = # Public functions # ------------------------------------------------------------------------------ -proc runSingle*(buddy: SnapBuddyRef) {.async.} = - ## This peer worker is invoked if the peer-local flag `buddy.ctrl.multiOk` - ## is set `false` which is the default mode. This flag is updated by the - ## worker when deemed appropriate. - ## * For all worker peerss, there can be only one `runSingle()` function - ## active simultaneously. - ## * There will be no `runMulti()` function active for the very same worker - ## peer that runs the `runSingle()` function. - ## * There will be no `runPool()` iterator active simultaneously. +proc runDaemon*(ctx: SnapCtxRef) {.async.} = + ## Enabled while `ctx.daemon` is `true` ## - ## Note that this function runs in `async` mode. + let nPivots = ctx.data.pivotTable.len + trace "I am the mighty recovery daemon ... stopped for now", nPivots + # To be populated ... + ctx.daemon = false + + +proc runSingle*(buddy: SnapBuddyRef) {.async.} = + ## Enabled while + ## * `buddy.ctrl.multiOk` is `false` + ## * `buddy.ctrl.poolMode` is `false` ## let peer = buddy.peer # This pivot finder one harmonises assigned difficulties of at least two @@ -299,18 +301,7 @@ proc runSingle*(buddy: SnapBuddyRef) {.async.} = proc runPool*(buddy: SnapBuddyRef, last: bool) = - ## Ocne started, the function `runPool()` is called for all worker peers in - ## a row (as the body of an iteration.) There will be no other worker peer - ## functions activated simultaneously. - ## - ## This procedure is started if the global flag `buddy.ctx.poolMode` is set - ## `true` (default is `false`.) It is the responsibility of the `runPool()` - ## instance to reset the flag `buddy.ctx.poolMode`, typically at the first - ## peer instance. - ## - ## The argument `last` is set `true` if the last entry is reached. - ## - ## Note that this function does not run in `async` mode. + ## Enabled when `buddy.ctrl.poolMode` is `true` ## let ctx = buddy.ctx if ctx.poolMode: @@ -349,9 +340,9 @@ proc runPool*(buddy: SnapBuddyRef, last: bool) = proc runMulti*(buddy: SnapBuddyRef) {.async.} = - ## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set - ## `true` which is typically done after finishing `runSingle()`. This - ## instance can be simultaneously active for all peer workers. + ## Enabled while + ## * `buddy.ctrl.multiOk` is `true` + ## * `buddy.ctrl.poolMode` is `false` ## let ctx = buddy.ctx diff --git a/nimbus/sync/snap/worker/ticker.nim b/nimbus/sync/snap/worker/ticker.nim index 8d1924968..cfe277f08 100644 --- a/nimbus/sync/snap/worker/ticker.nim +++ b/nimbus/sync/snap/worker/ticker.nim @@ -10,7 +10,7 @@ # except according to those terms. import - std/[strformat, strutils, times], + std/[strformat, strutils], chronos, chronicles, eth/[common, p2p], @@ -41,13 +41,13 @@ type lastStats: TickerStats statsCb: TickerStatsUpdater logTicker: TimerCallback - started: Time - visited: Time + started: Moment + visited: Moment const tickerStartDelay = chronos.milliseconds(100) tickerLogInterval = chronos.seconds(1) - tickerLogSuppressMax = initDuration(seconds = 100) + tickerLogSuppressMax = chronos.seconds(100) # ------------------------------------------------------------------------------ # Private functions: pretty printing @@ -112,7 +112,7 @@ proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.} proc runLogTicker(t: TickerRef) {.gcsafe.} = let data = t.statsCb() - now = getTime().utc.toTime + now = Moment.now() if data != t.lastStats or tickerLogSuppressMax < (now - t.visited): t.lastStats = data @@ -128,7 +128,7 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} = buddies = t.nBuddies # With `int64`, there are more than 29*10^10 years range for seconds - up = (now - t.started).inSeconds.uint64.toSI + up = (now - t.started).seconds.uint64.toSI mem = getTotalMem().uint.toSI noFmtError("runLogTicker"): @@ -164,8 +164,8 @@ proc start*(t: TickerRef) = ## Re/start ticker unconditionally #debug "Started ticker" t.logTicker = safeSetTimer(Moment.fromNow(tickerStartDelay), runLogTicker, t) - if t.started == Time.default: - t.started = getTime().utc.toTime + if t.started == Moment.default: + t.started = Moment.now() proc stop*(t: TickerRef) = ## Stop ticker unconditionally diff --git a/nimbus/sync/sync_desc.nim b/nimbus/sync/sync_desc.nim index 9bdb48cc7..4d5239308 100644 --- a/nimbus/sync/sync_desc.nim +++ b/nimbus/sync/sync_desc.nim @@ -50,6 +50,7 @@ type ethWireCtx*: EthWireRef ## Eth protocol wire context (if available) chain*: Chain ## Block chain database (no need for `Peer`) poolMode*: bool ## Activate `runPool()` workers if set `true` + daemon*: bool ## Enable global background job data*: S ## Shared context for all worker peers # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim index 00de6701d..b0dd87b81 100644 --- a/nimbus/sync/sync_sched.nim +++ b/nimbus/sync/sync_sched.nim @@ -20,6 +20,13 @@ ## *runRelease(ctx: CtxRef[S])* ## Global clean up, done with all the worker peers. ## +## *runDaemon(ctx: CtxRef[S]) {.async.}* +## Global background job that will be re-started as long as the variable +## `ctx.daemon` is set `true`. If that job was stopped due to setting +## `ctx.daemon` to `false`, it will be restarted when reset to `true` when +## there is some activity on the `runPool()`, `runSingle()`, or `runMulti()` +## functions. +## ## ## *runStart(buddy: BuddyRef[S,W]): bool* ## Initialise a new worker peer. @@ -99,6 +106,7 @@ type pool: PeerPool ## For starting the system buddies: ActiveBuddies[S,W] ## LRU cache with worker descriptors tickerOk: bool ## Ticker logger + daemonRunning: bool ## Run global background job singleRunLock: bool ## Some single mode runner is activated monitorLock: bool ## Monitor mode is activated activeMulti: int ## Number of activated runners in multi-mode @@ -110,12 +118,12 @@ type const execLoopTimeElapsedMin = 50.milliseconds - ## Minimum elapsed time the event loop needs for a single lap. If it - ## is faster, asynchroneous sleep seconds are added. in order to avoid + ## Minimum elapsed time an exec loop needs for a single lap. If it is + ## faster, asynchroneous sleep seconds are added. in order to avoid ## cpu overload. execLoopTaskSwitcher = 1.nanoseconds - ## Asynchroneous waiting time at the end of the exec loop unless some sleep + ## Asynchroneous waiting time at the end of an exec loop unless some sleep ## seconds were added as decribed by `execLoopTimeElapsedMin`, above. execLoopPollingTime = 50.milliseconds @@ -133,6 +141,34 @@ proc hash(peer: Peer): Hash = # Private functions # ------------------------------------------------------------------------------ +proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async.} = + mixin runDaemon + + if dsc.ctx.daemon: + dsc.daemonRunning = true + + # Continue until stopped + while true: + # Enforce minimum time spend on this loop + let startMoment = Moment.now() + + await dsc.ctx.runDaemon() + + if not dsc.ctx.daemon: + break + + # Enforce minimum time spend on this loop so we never each 100% cpu load + # caused by some empty sub-tasks which are out of this scheduler control. + let + elapsed = Moment.now() - startMoment + suspend = if execLoopTimeElapsedMin <= elapsed: execLoopTaskSwitcher + else: execLoopTimeElapsedMin - elapsed + await sleepAsync suspend + # End while + + dsc.daemonRunning = false + + proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = mixin runMulti, runSingle, runPool, runStop let @@ -190,6 +226,11 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = await worker.runSingle() dsc.singleRunLock = false + # Dispatch daemon sevice if needed + if not dsc.daemonRunning and dsc.ctx.daemon: + asyncSpawn dsc.daemonLoop() + + # Check for termination if worker.ctrl.stopped: break taskExecLoop @@ -294,7 +335,7 @@ proc initSync*[S,W]( dsc.tickerOk = noisy dsc.buddies.init(dsc.ctx.buddiesMax) -proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool = +proc startSync*[S,W](dsc: RunnerSyncRef[S,W]; daemon = false): bool = ## Set up `PeerObserver` handlers and start syncing. mixin runSetup # Initialise sub-systems @@ -309,14 +350,24 @@ proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool = po.setProtocol eth dsc.pool.addObserver(dsc, po) + if daemon: + dsc.ctx.daemon = true + asyncSpawn dsc.daemonLoop() return true proc stopSync*[S,W](dsc: RunnerSyncRef[S,W]) = ## Stop syncing and free peer handlers . mixin runRelease - dsc.ctx.runRelease() dsc.pool.delObserver(dsc) + # Shut down async services + for buddy in dsc.buddies.nextValues: + buddy.worker.ctrl.stopped = true + dsc.ctx.daemon = false + + # Final shutdown (note that some workers might still linger on) + dsc.ctx.runRelease() + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------