diff --git a/nimbus/nimbus_execution_client.nim b/nimbus/nimbus_execution_client.nim index 289a3a24a..85e54642b 100644 --- a/nimbus/nimbus_execution_client.nim +++ b/nimbus/nimbus_execution_client.nim @@ -224,11 +224,10 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) = setupP2P(nimbus, conf, com, protocols) setupRpc(nimbus, conf, com, protocols) - if conf.maxPeers > 0: + if conf.maxPeers > 0 and conf.engineApiServerEnabled(): # Not starting syncer if there is definitely no way to run it. This # avoids polling (i.e. waiting for instructions) and some logging. - let resumeOnly = not conf.engineApiServerEnabled() - if not nimbus.beaconSyncRef.start(resumeOnly): + if not nimbus.beaconSyncRef.start(): nimbus.beaconSyncRef = BeaconSyncRef(nil) if nimbus.state == NimbusState.Starting: diff --git a/nimbus/sync/beacon.nim b/nimbus/sync/beacon.nim index 8de012520..5ce7ef384 100644 --- a/nimbus/sync/beacon.nim +++ b/nimbus/sync/beacon.nim @@ -18,7 +18,7 @@ import "."/[sync_desc, sync_sched, protocol] logScope: - topics = "beacon" + topics = "beacon sync" type BeaconSyncRef* = RunnerSyncRef[BeaconCtxData,BeaconBuddyData] @@ -28,25 +28,25 @@ type # ------------------------------------------------------------------------------ proc runSetup(ctx: BeaconCtxRef): bool = - worker.setup(ctx) + worker.setup(ctx, "RunSetup") proc runRelease(ctx: BeaconCtxRef) = - worker.release(ctx) + worker.release(ctx, "RunRelease") proc runDaemon(ctx: BeaconCtxRef) {.async.} = - await worker.runDaemon(ctx) + await worker.runDaemon(ctx, "RunDaemon") proc runStart(buddy: BeaconBuddyRef): bool = - worker.start(buddy) + worker.start(buddy, "RunStart") proc runStop(buddy: BeaconBuddyRef) = - worker.stop(buddy) + worker.stop(buddy, "RunStop") proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool = - worker.runPool(buddy, last, laps) + worker.runPool(buddy, last, laps, "RunPool") proc runPeer(buddy: BeaconBuddyRef) {.async.} = - await worker.runPeer(buddy) + await worker.runPeer(buddy, "RunPeer") # ------------------------------------------------------------------------------ # Public functions @@ -57,7 +57,7 @@ proc init*( ethNode: EthereumNode; chain: ForkedChainRef; maxPeers: int; - chunkSize: int; + chunkSize = 0; ): T = var desc = T() desc.initSync(ethNode, maxPeers) @@ -65,13 +65,7 @@ proc init*( desc.ctx.pool.chain = chain desc -proc start*(desc: BeaconSyncRef; resumeOnly = false): bool = - ## Start beacon sync. If `resumeOnly` is set `true` the syncer will only - ## start up if it can resume work, e.g. after being previously interrupted. - if resumeOnly: - desc.ctx.dbLoadSyncStateLayout() - if not desc.ctx.layout.headLocked: - return false +proc start*(desc: BeaconSyncRef): bool = desc.startSync() proc stop*(desc: BeaconSyncRef) = diff --git a/nimbus/sync/beacon/worker.nim b/nimbus/sync/beacon/worker.nim index a45882efc..9832363d3 100644 --- a/nimbus/sync/beacon/worker.nim +++ b/nimbus/sync/beacon/worker.nim @@ -15,13 +15,9 @@ import pkg/eth/[common, p2p], pkg/stew/[interval_set, sorted_set], ../../common, - ./worker/[blocks_staged, db, headers_staged, headers_unproc, - start_stop, update], + ./worker/[blocks_staged, headers_staged, headers_unproc, start_stop, update], ./worker_desc -logScope: - topics = "beacon" - # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ @@ -36,15 +32,11 @@ proc bodiesToFetchOk(buddy: BeaconBuddyRef): bool = buddy.ctrl.running and not buddy.ctx.poolMode -proc napUnlessSomethingToFetch( - buddy: BeaconBuddyRef; - info: static[string]; - ): Future[bool] {.async.} = +proc napUnlessSomethingToFetch(buddy: BeaconBuddyRef): Future[bool] {.async.} = ## When idle, save cpu cycles waiting for something to do. if buddy.ctx.pool.importRunningOk or not (buddy.headersToFetchOk() or buddy.bodiesToFetchOk()): - debug info & ": idly wasting time", peer=buddy.peer await sleepAsync workerIdleWaitInterval return true return false @@ -53,13 +45,12 @@ proc napUnlessSomethingToFetch( # Public start/stop and admin functions # ------------------------------------------------------------------------------ -proc setup*(ctx: BeaconCtxRef): bool = +proc setup*(ctx: BeaconCtxRef; info: static[string]): bool = ## Global set up - debug "RUNSETUP" ctx.setupRpcMagic() # Load initial state from database if there is any - ctx.setupDatabase() + ctx.setupDatabase info # Debugging stuff, might be an empty template ctx.setupTicker() @@ -68,31 +59,31 @@ proc setup*(ctx: BeaconCtxRef): bool = ctx.daemon = true true -proc release*(ctx: BeaconCtxRef) = +proc release*(ctx: BeaconCtxRef; info: static[string]) = ## Global clean up - debug "RUNRELEASE" ctx.destroyRpcMagic() ctx.destroyTicker() -proc start*(buddy: BeaconBuddyRef): bool = +proc start*(buddy: BeaconBuddyRef; info: static[string]): bool = ## Initialise worker peer - const info = "RUNSTART" + let peer = buddy.peer if runsThisManyPeersOnly <= buddy.ctx.pool.nBuddies: - debug info & " peer limit reached", peer=buddy.peer + debug info & ": peers limit reached", peer return false if not buddy.startBuddy(): - debug info & " failed", peer=buddy.peer + debug info & ": failed", peer return false - debug info, peer=buddy.peer + debug info & ": new peer", peer true -proc stop*(buddy: BeaconBuddyRef) = +proc stop*(buddy: BeaconBuddyRef; info: static[string]) = ## Clean up this peer - debug "RUNSTOP", peer=buddy.peer, nInvocations=buddy.only.nMultiLoop, + debug info & ": release peer", peer=buddy.peer, + nInvocations=buddy.only.nMultiLoop, lastIdleGap=buddy.only.multiRunIdle.toStr buddy.stopBuddy() @@ -100,16 +91,13 @@ proc stop*(buddy: BeaconBuddyRef) = # Public functions # ------------------------------------------------------------------------------ -proc runDaemon*(ctx: BeaconCtxRef) {.async.} = +proc runDaemon*(ctx: BeaconCtxRef; info: static[string]) {.async.} = ## Global background job that will be re-started as long as the variable ## `ctx.daemon` is set `true`. If that job was stopped due to re-setting ## `ctx.daemon` to `false`, it will be restarted next after it was reset ## as `true` not before there is some activity on the `runPool()`, ## `runSingle()`, or `runMulti()` functions. ## - const info = "RUNDAEMON" - debug info - # Check for a possible header layout and body request changes ctx.updateSyncStateLayout info ctx.updateBlockRequests info @@ -127,15 +115,20 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} = # Import from staged queue. while await ctx.blocksStagedImport(info): - ctx.updateMetrics() + if not ctx.daemon: + # Implied by external sync shutdown? + return # At the end of the cycle, leave time to trigger refill headers/blocks await sleepAsync daemonWaitInterval - ctx.updateMetrics() - -proc runPool*(buddy: BeaconBuddyRef; last: bool; laps: int): bool = +proc runPool*( + buddy: BeaconBuddyRef; + last: bool; + laps: int; + info: static[string]; + ): bool = ## Once started, the function `runPool()` is called for all worker peers in ## sequence as long as this function returns `false`. There will be no other ## `runPeer()` functions activated while `runPool()` is active. @@ -150,31 +143,25 @@ proc runPool*(buddy: BeaconBuddyRef; last: bool; laps: int): bool = ## ## Note that this function does not run in `async` mode. ## - const info = "RUNPOOL" - #debug info, peer=buddy.peer, laps - buddy.ctx.headersStagedReorg info # reorg + buddy.ctx.headersStagedReorg info true # stop -proc runPeer*(buddy: BeaconBuddyRef) {.async.} = +proc runPeer*(buddy: BeaconBuddyRef; info: static[string]) {.async.} = ## This peer worker method is repeatedly invoked (exactly one per peer) while ## the `buddy.ctrl.poolMode` flag is set `false`. ## - const info = "RUNPEER" let peer = buddy.peer if 0 < buddy.only.nMultiLoop: # statistics/debugging buddy.only.multiRunIdle = Moment.now() - buddy.only.stoppedMultiRun buddy.only.nMultiLoop.inc # statistics/debugging - trace info, peer, nInvocations=buddy.only.nMultiLoop, - lastIdleGap=buddy.only.multiRunIdle.toStr - # Update consensus header target when needed. It comes with a finalised # header hash where we need to complete the block number. await buddy.headerStagedUpdateTarget info - if not await buddy.napUnlessSomethingToFetch info: + if not await buddy.napUnlessSomethingToFetch(): # # Layout of a triple of linked header chains (see `README.md`) # :: diff --git a/nimbus/sync/beacon/worker/blocks_staged.nim b/nimbus/sync/beacon/worker/blocks_staged.nim index b322bac62..1fc8c01bb 100644 --- a/nimbus/sync/beacon/worker/blocks_staged.nim +++ b/nimbus/sync/beacon/worker/blocks_staged.nim @@ -17,11 +17,9 @@ import ../../../core/chain, ../worker_desc, ./blocks_staged/bodies, + ./update/metrics, "."/[blocks_unproc, db] -logScope: - topics = "beacon blocks" - # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ @@ -116,6 +114,7 @@ func blocksStagedCanImportOk*(ctx: BeaconCtxRef): bool = false + func blocksStagedFetchOk*(ctx: BeaconCtxRef): bool = ## Check whether body records can be fetched and stored on the `staged` queue. ## @@ -182,7 +181,7 @@ proc blocksStagedCollect*( if 0 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped: # Make sure that this peer does not immediately reconnect buddy.ctrl.zombie = true - trace info & ": completely failed", peer, iv, ivReq, + trace info & ": list completely failed", peer, iv, ivReq, ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nBdyRespErrors ctx.blocksUnprocCommit(iv.len, iv) # At this stage allow a task switch so that some other peer might try @@ -192,7 +191,7 @@ proc blocksStagedCollect*( # So there were some bodies downloaded already. Turn back unused data # and proceed with staging. - trace info & ": partially failed", peer, iv, ivReq, + trace info & ": list partially failed", peer, iv, ivReq, unused=BnRange.new(ivBottom,iv.maxPt) # There is some left over to store back ctx.blocksUnprocCommit(iv.len, ivBottom, iv.maxPt) @@ -240,6 +239,7 @@ proc blocksStagedImport*( if qItem.key != imported + 1: trace info & ": there is a gap L vs. staged", B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr + doAssert imported < qItem.key return false # Remove from queue @@ -249,35 +249,49 @@ proc blocksStagedImport*( nBlocks = qItem.data.blocks.len iv = BnRange.new(qItem.key, qItem.key + nBlocks.uint64 - 1) + trace info & ": import blocks ..", iv, nBlocks, + B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr + var maxImport = iv.maxPt for n in 0 ..< nBlocks: let nBn = qItem.data.blocks[n].header.number ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr: - warn info & ": import block error", iv, - B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, - nBn=nBn.bnStr, txLevel=ctx.chain.db.level, `error`=error + warn info & ": import block error", iv, B=ctx.chain.baseNumber.bnStr, + L=ctx.chain.latestNumber.bnStr, nBn=nBn.bnStr, `error`=error # Restore what is left over below maxImport = ctx.chain.latestNumber() break + # Allow pseudo/async thread switch. + await sleepAsync asyncThreadSwitchTimeSlot + if not ctx.daemon: + # Shutdown? + maxImport = ctx.chain.latestNumber() + break + + # Update, so it can be followed nicely + ctx.updateMetrics() + # Occasionally mark the chain finalized if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks: let - nHash = qItem.data.getNthHash(n) - finHash = if nBn < ctx.layout.final: nHash else: ctx.layout.finalHash + nthHash = qItem.data.getNthHash(n) + finHash = if nBn < ctx.layout.final: nthHash else: ctx.layout.finalHash doAssert nBn == ctx.chain.latestNumber() - ctx.pool.chain.forkChoice(headHash=nHash, finalizedHash=finHash).isOkOr: - warn info & ": fork choice error", iv, - B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, - F=ctx.layout.final.bnStr, txLevel=ctx.chain.db.level, nHash, - finHash=(if finHash == nHash: "nHash" else: "F"), `error`=error + ctx.pool.chain.forkChoice(headHash=nthHash, finalizedHash=finHash).isOkOr: + warn info & ": fork choice error", n, iv, B=ctx.chain.baseNumber.bnStr, + L=ctx.chain.latestNumber.bnStr, F=ctx.layout.final.bnStr, nthHash, + finHash=(if finHash == nthHash: "nHash" else: "F"), `error`=error # Restore what is left over below maxImport = ctx.chain.latestNumber() break # Allow pseudo/async thread switch. await sleepAsync asyncThreadSwitchTimeSlot + if not ctx.daemon: + maxImport = ctx.chain.latestNumber() + break # Import probably incomplete, so a partial roll back may be needed if maxImport < iv.maxPt: @@ -287,32 +301,13 @@ proc blocksStagedImport*( for bn in iv.minPt .. maxImport: ctx.dbUnstashHeader bn - trace info & ": import done", iv, - B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, - F=ctx.layout.final.bnStr, txLevel=ctx.chain.db.level + # Update, so it can be followed nicely + ctx.updateMetrics() + + trace info & ": import done", iv, nBlocks, B=ctx.chain.baseNumber.bnStr, + L=ctx.chain.latestNumber.bnStr, F=ctx.layout.final.bnStr return true - -func blocksStagedBottomKey*(ctx: BeaconCtxRef): BlockNumber = - ## Retrieve to staged block number - let qItem = ctx.blk.staged.ge(0).valueOr: - return high(BlockNumber) - qItem.key - -func blocksStagedQueueLen*(ctx: BeaconCtxRef): int = - ## Number of staged records - ctx.blk.staged.len - -func blocksStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool = - ## `true` iff no data are on the queue. - ctx.blk.staged.len == 0 - -# ---------------- - -func blocksStagedInit*(ctx: BeaconCtxRef) = - ## Constructor - ctx.blk.staged = StagedBlocksQueue.init() - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/beacon/worker/blocks_staged/bodies.nim b/nimbus/sync/beacon/worker/blocks_staged/bodies.nim index 957499851..9678024bd 100644 --- a/nimbus/sync/beacon/worker/blocks_staged/bodies.nim +++ b/nimbus/sync/beacon/worker/blocks_staged/bodies.nim @@ -18,9 +18,6 @@ import ../../../protocol, ../../worker_desc -logScope: - topics = "beacon bodies" - # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/beacon/worker/blocks_staged/staged_queue.nim b/nimbus/sync/beacon/worker/blocks_staged/staged_queue.nim new file mode 100644 index 000000000..7b90a6575 --- /dev/null +++ b/nimbus/sync/beacon/worker/blocks_staged/staged_queue.nim @@ -0,0 +1,38 @@ +# Nimbus +# Copyright (c) 2023-2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at +# https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at +# https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +{.push raises:[].} + +import + pkg/eth/common, + pkg/stew/[interval_set, sorted_set], + ../../worker_desc + +func blocksStagedQueueBottomKey*(ctx: BeaconCtxRef): BlockNumber = + ## Retrieve to staged block number + let qItem = ctx.blk.staged.ge(0).valueOr: + return high(BlockNumber) + qItem.key + +func blocksStagedQueueLen*(ctx: BeaconCtxRef): int = + ## Number of staged records + ctx.blk.staged.len + +func blocksStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool = + ## `true` iff no data are on the queue. + ctx.blk.staged.len == 0 + +# ---------------- + +func blocksStagedQueueInit*(ctx: BeaconCtxRef) = + ## Constructor + ctx.blk.staged = StagedBlocksQueue.init() + +# End diff --git a/nimbus/sync/beacon/worker/db.nim b/nimbus/sync/beacon/worker/db.nim index 05894bf81..143ca418b 100644 --- a/nimbus/sync/beacon/worker/db.nim +++ b/nimbus/sync/beacon/worker/db.nim @@ -19,9 +19,6 @@ import ../worker_desc, "."/[blocks_unproc, headers_unproc] -logScope: - topics = "beacon db" - const LhcStateKey = 1.beaconStateKey @@ -49,9 +46,8 @@ proc fetchSyncStateLayout(ctx: BeaconCtxRef): Opt[SyncStateLayout] = # Public functions # ------------------------------------------------------------------------------ -proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef) = +proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) = ## Save chain layout to persistent db - const info = "dbStoreSyncStateLayout" if ctx.layout == ctx.sst.lastLayout: return @@ -65,24 +61,25 @@ proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef) = if txLevel == 0: let number = ctx.db.getSavedStateBlockNumber() ctx.db.persistent(number).isOkOr: - debug info & ": failed to save persistently", error=($$error) + debug info & ": failed to save sync state persistently", error=($$error) return else: - trace info & ": not saved, tx pending", txLevel + trace info & ": sync state not saved, tx pending", txLevel return - trace info & ": saved pesistently on DB" + trace info & ": saved sync state persistently" -proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef) = +proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) = ## Restore chain layout from persistent db - const info = "dbLoadLinkedHChainsLayout" - let rc = ctx.fetchSyncStateLayout() latest = ctx.chain.latestNumber() - if rc.isOk: + # See `dbLoadSyncStateAvailable()` for comments + if rc.isOk and + ctx.chain.baseNumber() <= rc.value.final and + latest < rc.value.head: ctx.sst.layout = rc.value # Add interval of unprocessed block range `(L,C]` from `README.md` @@ -92,7 +89,7 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef) = # Add interval of unprocessed header range `(C,D)` from `README.md` ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1) - trace info & ": restored layout", L=latest.bnStr, + trace info & ": restored sync state", L=latest.bnStr, C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr, F=ctx.layout.final.bnStr, H=ctx.layout.head.bnStr @@ -106,12 +103,19 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef) = couplerHash: latestHash, dangling: latest, danglingParent: latestParent, + # There is no need to record a separate finalised head `F` as its only + # use is to serve as second argument in `forkChoice()` when committing + # a batch of imported blocks. Currently, there are no blocks to fetch + # and import. The system must wait for instructions and update the fields + # `final` and `head` while the latter will be increased so that import + # can start. final: latest, finalHash: latestHash, head: latest, - headHash: latestHash) + headHash: latestHash, + headLocked: false) - trace info & ": new layout", L="C", C="D", D="F", F="H", H=latest.bnStr + trace info & ": new sync state", L="C", C="D", D="F", F="H", H=latest.bnStr ctx.sst.lastLayout = ctx.layout @@ -121,6 +125,7 @@ proc dbStashHeaders*( ctx: BeaconCtxRef; first: BlockNumber; revBlobs: openArray[seq[byte]]; + info: static[string]; ) = ## Temporarily store header chain to persistent db (oblivious of the chain ## layout.) The headers should not be stashed if they are imepreted and @@ -133,7 +138,6 @@ proc dbStashHeaders*( ## #(first+1) -- revBlobs[^2] ## .. ## - const info = "dbStashHeaders" let kvt = ctx.db.ctx.getKvt() last = first + revBlobs.len.uint64 - 1 diff --git a/nimbus/sync/beacon/worker/headers_staged.nim b/nimbus/sync/beacon/worker/headers_staged.nim index f4f2434d6..7637d9ecc 100644 --- a/nimbus/sync/beacon/worker/headers_staged.nim +++ b/nimbus/sync/beacon/worker/headers_staged.nim @@ -17,12 +17,10 @@ import pkg/stew/[interval_set, sorted_set], ../../../common, ../worker_desc, + ./update/metrics, ./headers_staged/[headers, linked_hchain], ./headers_unproc -logScope: - topics = "beacon headers" - # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ @@ -59,7 +57,9 @@ proc headerStagedUpdateTarget*( info: static[string]; ) {.async.} = ## Fetch finalised beacon header if there is an update available - let ctx = buddy.ctx + let + ctx = buddy.ctx + peer = buddy.peer if not ctx.layout.headLocked and ctx.target.final == 0 and ctx.target.finalHash != zeroHash32 and @@ -75,10 +75,18 @@ proc headerStagedUpdateTarget*( if hash != ctx.target.finalHash: # Oops buddy.ctrl.zombie = true - trace info & ": finalised header hash mismatch", peer=buddy.peer, hash, + trace info & ": finalised header hash mismatch", peer, hash, expected=ctx.target.finalHash else: - ctx.target.final = rc.value[0].number + let final = rc.value[0].number + if final < ctx.chain.baseNumber(): + trace info & ": finalised number too low", peer, + B=ctx.chain.baseNumber.bnStr, finalised=rc.value[0].number.bnStr + else: + ctx.target.final = final + + # Update, so it can be followed nicely + ctx.updateMetrics() proc headersStagedCollect*( @@ -221,6 +229,9 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int = # anymore. discard ctx.hdr.staged.delete(iv.maxPt) + # Update, so it can be followed nicely + ctx.updateMetrics() + if qItem.data.hash != ctx.layout.danglingParent: # Discard wrong chain and merge back the range into the `unproc` list. ctx.headersUnprocCommit(0,iv) @@ -229,21 +240,24 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int = break # Store headers on database - ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs) + ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs, info) ctx.layout.dangling = iv.minPt ctx.layout.danglingParent = qItem.data.parentHash - ctx.dbStoreSyncStateLayout() + ctx.dbStoreSyncStateLayout info result.inc # count records - trace info & ": staged records saved", + trace info & ": staged header lists saved", nStaged=ctx.hdr.staged.len, nSaved=result if headersStagedQueueLengthLwm < ctx.hdr.staged.len: ctx.poolMode = true + # Update, so it can be followed nicely + ctx.updateMetrics() -func headersStagedReorg*(ctx: BeaconCtxRef; info: static[string]) = + +proc headersStagedReorg*(ctx: BeaconCtxRef; info: static[string]) = ## Some pool mode intervention. The effect is that all concurrent peers ## finish up their current work and run this function here (which might ## do nothing.) This stopping should be enough in most cases to re-organise @@ -277,26 +291,8 @@ func headersStagedReorg*(ctx: BeaconCtxRef; info: static[string]) = ctx.headersUnprocCommit(0, key - nHeaders + 1, key) discard ctx.hdr.staged.delete key - -func headersStagedTopKey*(ctx: BeaconCtxRef): BlockNumber = - ## Retrieve to staged block number - let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr: - return BlockNumber(0) - qItem.key - -func headersStagedQueueLen*(ctx: BeaconCtxRef): int = - ## Number of staged records - ctx.hdr.staged.len - -func headersStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool = - ## `true` iff no data are on the queue. - ctx.hdr.staged.len == 0 - -# ---------------- - -func headersStagedInit*(ctx: BeaconCtxRef) = - ## Constructor - ctx.hdr.staged = LinkedHChainQueue.init() + # Update, so it can be followed nicely + ctx.updateMetrics() # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/beacon/worker/headers_staged/headers.nim b/nimbus/sync/beacon/worker/headers_staged/headers.nim index b07da7c7c..347608c87 100644 --- a/nimbus/sync/beacon/worker/headers_staged/headers.nim +++ b/nimbus/sync/beacon/worker/headers_staged/headers.nim @@ -19,9 +19,6 @@ import ../../../protocol/eth/eth_types, ../../worker_desc -logScope: - topics = "beacon headers" - # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/beacon/worker/headers_staged/staged_queue.nim b/nimbus/sync/beacon/worker/headers_staged/staged_queue.nim new file mode 100644 index 000000000..725d948ab --- /dev/null +++ b/nimbus/sync/beacon/worker/headers_staged/staged_queue.nim @@ -0,0 +1,38 @@ +# Nimbus +# Copyright (c) 2023-2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at +# https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at +# https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +{.push raises:[].} + +import + pkg/eth/common, + pkg/stew/[interval_set, sorted_set], + ../../worker_desc + +func headersStagedQueueTopKey*(ctx: BeaconCtxRef): BlockNumber = + ## Retrieve to staged block number + let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr: + return BlockNumber(0) + qItem.key + +func headersStagedQueueLen*(ctx: BeaconCtxRef): int = + ## Number of staged records + ctx.hdr.staged.len + +func headersStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool = + ## `true` iff no data are on the queue. + ctx.hdr.staged.len == 0 + +# ---------------- + +func headersStagedQueueInit*(ctx: BeaconCtxRef) = + ## Constructor + ctx.hdr.staged = LinkedHChainQueue.init() + +# End diff --git a/nimbus/sync/beacon/worker/helpers.nim b/nimbus/sync/beacon/worker/helpers.nim index b55c25a71..2ed702866 100644 --- a/nimbus/sync/beacon/worker/helpers.nim +++ b/nimbus/sync/beacon/worker/helpers.nim @@ -30,7 +30,9 @@ func bnStr*(w: Interval[BlockNumber,uint64]): string = if w.len == 1: w.minPt.bnStr else: w.minPt.bnStr & ".." & w.maxPt.bnStr func toStr*(a: chronos.Duration): string = - a.toString 2 + var s = a.toString 2 + if s.len == 0: s="0" + s proc `$`*(w: Interval[BlockNumber,uint64]): string = diff --git a/nimbus/sync/beacon/worker/start_stop.nim b/nimbus/sync/beacon/worker/start_stop.nim index b616d3f0b..16dc878c8 100644 --- a/nimbus/sync/beacon/worker/start_stop.nim +++ b/nimbus/sync/beacon/worker/start_stop.nim @@ -15,7 +15,9 @@ import ../../../core/chain, ../../protocol, ../worker_desc, - "."/[blocks_staged, blocks_unproc, db, headers_staged, headers_unproc] + ./blocks_staged/staged_queue, + ./headers_staged/staged_queue, + "."/[blocks_unproc, db, headers_unproc] when enableTicker: import ./start_stop/ticker @@ -36,17 +38,18 @@ when enableTicker: dangling: ctx.layout.dangling, final: ctx.layout.final, head: ctx.layout.head, + headOk: ctx.layout.headLocked, target: ctx.target.consHead.number, targetOk: ctx.target.final != 0, nHdrStaged: ctx.headersStagedQueueLen(), - hdrStagedTop: ctx.headersStagedTopKey(), + hdrStagedTop: ctx.headersStagedQueueTopKey(), hdrUnprocTop: ctx.headersUnprocTop(), nHdrUnprocessed: ctx.headersUnprocTotal() + ctx.headersUnprocBorrowed(), nHdrUnprocFragm: ctx.headersUnprocChunks(), nBlkStaged: ctx.blocksStagedQueueLen(), - blkStagedBottom: ctx.blocksStagedBottomKey(), + blkStagedBottom: ctx.blocksStagedQueueBottomKey(), blkUnprocTop: ctx.blk.topRequest, nBlkUnprocessed: ctx.blocksUnprocTotal() + ctx.blocksUnprocBorrowed(), nBlkUnprocFragm: ctx.blocksUnprocChunks(), @@ -58,9 +61,12 @@ proc updateBeaconHeaderCB(ctx: BeaconCtxRef): ReqBeaconSyncTargetCB = ## Update beacon header. This function is intended as a call back function ## for the RPC module. return proc(h: Header; f: Hash32) {.gcsafe, raises: [].} = - # Rpc checks empty header against a zero hash rather than `emptyRoot` + # Check whether there is an update running (otherwise take next upate) if not ctx.target.locked: - if f != zeroHash32 and ctx.target.consHead.number < h.number: + # Rpc checks empty header against a zero hash rather than `emptyRoot` + if f != zeroHash32 and + ctx.layout.head < h.number and + ctx.target.consHead.number < h.number: ctx.target.consHead = h ctx.target.final = BlockNumber(0) ctx.target.finalHash = f @@ -86,17 +92,17 @@ else: # --------- -proc setupDatabase*(ctx: BeaconCtxRef) = +proc setupDatabase*(ctx: BeaconCtxRef; info: static[string]) = ## Initalise database related stuff # Initialise up queues and lists - ctx.headersStagedInit() - ctx.blocksStagedInit() + ctx.headersStagedQueueInit() + ctx.blocksStagedQueueInit() ctx.headersUnprocInit() ctx.blocksUnprocInit() # Load initial state from database if there is any - ctx.dbLoadSyncStateLayout() + ctx.dbLoadSyncStateLayout info # Set blocks batch import value for block import if ctx.pool.nBodiesBatch < nFetchBodiesRequest: diff --git a/nimbus/sync/beacon/worker/start_stop/ticker.nim b/nimbus/sync/beacon/worker/start_stop/ticker.nim index 9250d1137..7c7cf9010 100644 --- a/nimbus/sync/beacon/worker/start_stop/ticker.nim +++ b/nimbus/sync/beacon/worker/start_stop/ticker.nim @@ -33,6 +33,7 @@ type dangling*: BlockNumber final*: BlockNumber head*: BlockNumber + headOk*: bool target*: BlockNumber targetOk*: bool @@ -79,20 +80,25 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} = B = if data.base == data.latest: "L" else: data.base.bnStr L = if data.latest == data.coupler: "C" else: data.latest.bnStr C = if data.coupler == data.dangling: "D" else: data.coupler.bnStr - D = if data.dangling == data.final: "F" else: data.dangling.bnStr + D = if data.dangling == data.final: "F" + elif data.dangling == data.head: "H" + else: data.dangling.bnStr F = if data.final == data.head: "H" else: data.final.bnStr - H = if data.head == data.target: "T" else: data.head.bnStr + H = if data.headOk: + if data.head == data.target: "T" else: data.head.bnStr + else: + if data.head == data.target: "?T" else: "?" & $data.head T = if data.targetOk: data.target.bnStr else: "?" & $data.target hS = if data.nHdrStaged == 0: "n/a" else: data.hdrStagedTop.bnStr & "(" & $data.nHdrStaged & ")" - hU = if data.nHdrUnprocFragm == 0: "n/a" + hU = if data.nHdrUnprocFragm == 0 and data.nHdrUnprocessed == 0: "n/a" else: data.hdrUnprocTop.bnStr & "(" & data.nHdrUnprocessed.toSI & "," & $data.nHdrUnprocFragm & ")" bS = if data.nBlkStaged == 0: "n/a" else: data.blkStagedBottom.bnStr & "(" & $data.nBlkStaged & ")" - bU = if data.nBlkUnprocFragm == 0: "n/a" + bU = if data.nBlkUnprocFragm == 0 and data.nBlkUnprocessed == 0: "n/a" else: data.blkUnprocTop.bnStr & "(" & data.nBlkUnprocessed.toSI & "," & $data.nBlkUnprocFragm & ")" @@ -121,12 +127,13 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} = proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.} proc runLogTicker(t: TickerRef) {.gcsafe.} = - t.prettyPrint(t) - t.setLogTicker(Moment.fromNow(tickerLogInterval)) + if not t.statsCb.isNil: + t.prettyPrint(t) + t.setLogTicker(Moment.fromNow(tickerLogInterval)) proc setLogTicker(t: TickerRef; at: Moment) = if t.statsCb.isNil: - debug "Stopped", nBuddies=t.lastStats.nBuddies + debug "Ticker stopped" else: # Store the `runLogTicker()` in a closure to avoid some garbage collection # memory corruption issues that might occur otherwise. diff --git a/nimbus/sync/beacon/worker/update.nim b/nimbus/sync/beacon/worker/update.nim index aab532a6f..080509708 100644 --- a/nimbus/sync/beacon/worker/update.nim +++ b/nimbus/sync/beacon/worker/update.nim @@ -17,10 +17,8 @@ import ../../../core/chain, ../worker_desc, ./update/metrics, - "."/[blocks_unproc, db, headers_staged, headers_unproc] - -logScope: - topics = "beacon update" + ./headers_staged/staged_queue, + "."/[blocks_unproc, db, headers_unproc] # ------------------------------------------------------------------------------ # Private functions @@ -51,12 +49,13 @@ proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) = var target = ctx.target.consHead.number # Need: `H < T` and `C == D` - if target != 0 and target <= ctx.layout.head: # violates `H < T` - trace info & ": not applicable", H=ctx.layout.head.bnStr, T=target.bnStr + if target != 0 and target <= ctx.layout.head: # violates `H < T` + trace info & ": update not applicable", + H=ctx.layout.head.bnStr, T=target.bnStr return if ctx.layout.coupler != ctx.layout.dangling: # violates `C == D` - trace info & ": not applicable", + trace info & ": update not applicable", C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr return @@ -78,10 +77,10 @@ proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) = # Save this header on the database so it needs not be fetched again from # somewhere else. - ctx.dbStashHeaders(target, @[rlpHeader]) + ctx.dbStashHeaders(target, @[rlpHeader], info) # Save state - ctx.dbStoreSyncStateLayout() + ctx.dbStoreSyncStateLayout info # Update range doAssert ctx.headersUnprocTotal() == 0 @@ -89,10 +88,13 @@ proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) = doAssert ctx.headersStagedQueueIsEmpty() ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1) - trace info & ": updated", C=ctx.layout.coupler.bnStr, + trace info & ": updated sync state", C=ctx.layout.coupler.bnStr, uTop=ctx.headersUnprocTop(), D=ctx.layout.dangling.bnStr, H=ctx.layout.head.bnStr, T=target.bnStr + # Update, so it can be followed nicely + ctx.updateMetrics() + proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) = ## Merge if `C+1` == `D` @@ -110,7 +112,7 @@ proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) = raiseAssert info & ": hashes do not match" & " C=" & ctx.layout.coupler.bnStr & " D=" & $ctx.layout.dangling.bnStr - trace info & ": merging", C=ctx.layout.coupler.bnStr, + trace info & ": merging adjacent chains", C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr # Merge adjacent linked chains @@ -126,7 +128,10 @@ proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) = headLocked: ctx.layout.headLocked) # Save state - ctx.dbStoreSyncStateLayout() + ctx.dbStoreSyncStateLayout info + + # Update, so it can be followed nicely + ctx.updateMetrics() # ------------------------------------------------------------------------------ # Public functions @@ -145,8 +150,11 @@ proc updateSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) = doAssert ctx.layout.head == latest ctx.layout.headLocked = false + # Check whether there is something to do regarding beacon node change - if not ctx.layout.headLocked and ctx.target.changed and ctx.target.final != 0: + if not ctx.layout.headLocked and # there was an active import request + ctx.target.changed and # and there is a new target from CL + ctx.target.final != 0: # .. ditto ctx.target.changed = false ctx.updateTargetChange info @@ -162,20 +170,13 @@ proc updateBlockRequests*(ctx: BeaconCtxRef; info: static[string]) = # One can fill/import/execute blocks by number from `(L,C]` if ctx.blk.topRequest < ctx.layout.coupler: # So there is some space - trace info & ": updating", L=latest.bnStr, + trace info & ": updating block requests", L=latest.bnStr, topReq=ctx.blk.topRequest.bnStr, C=ctx.layout.coupler.bnStr ctx.blocksUnprocCommit( 0, max(latest, ctx.blk.topRequest) + 1, ctx.layout.coupler) ctx.blk.topRequest = ctx.layout.coupler - -proc updateMetrics*(ctx: BeaconCtxRef) = - let now = Moment.now() - if ctx.pool.nextUpdate < now: - ctx.updateMetricsImpl() - ctx.pool.nextUpdate = now + metricsUpdateInterval - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/beacon/worker/update/metrics.nim b/nimbus/sync/beacon/worker/update/metrics.nim index 5012defad..60d72274c 100644 --- a/nimbus/sync/beacon/worker/update/metrics.nim +++ b/nimbus/sync/beacon/worker/update/metrics.nim @@ -11,10 +11,12 @@ {.push raises:[].} import - pkg/metrics, + pkg/[chronos, metrics], ../../../../core/chain, ../../worker_desc, - ".."/[blocks_staged, headers_staged] + ../blocks_staged/staged_queue, + ../headers_staged/staged_queue, + ".."/[blocks_unproc, headers_unproc] declareGauge beacon_base, "" & "Max block number of imported finalised blocks" @@ -55,7 +57,7 @@ declareGauge beacon_buddies, "" & "Number of currently active worker instances" -template updateMetricsImpl*(ctx: BeaconCtxRef) = +template updateMetricsImpl(ctx: BeaconCtxRef) = metrics.set(beacon_base, ctx.chain.baseNumber().int64) metrics.set(beacon_latest, ctx.chain.latestNumber().int64) metrics.set(beacon_coupler, ctx.layout.coupler.int64) @@ -74,4 +76,12 @@ template updateMetricsImpl*(ctx: BeaconCtxRef) = metrics.set(beacon_buddies, ctx.pool.nBuddies) +# --------------- + +proc updateMetrics*(ctx: BeaconCtxRef) = + let now = Moment.now() + if ctx.pool.nextUpdate < now: + ctx.updateMetricsImpl() + ctx.pool.nextUpdate = now + metricsUpdateInterval + # End diff --git a/nimbus/sync/beacon/worker_config.nim b/nimbus/sync/beacon/worker_config.nim index 6a616332c..cd1aa80f7 100644 --- a/nimbus/sync/beacon/worker_config.nim +++ b/nimbus/sync/beacon/worker_config.nim @@ -14,7 +14,7 @@ import pkg/chronos const - enableTicker* = true + enableTicker* = false ## Log regular status updates similar to metrics. Great for debugging. runsThisManyPeersOnly* = 8 @@ -43,7 +43,7 @@ const workerIdleWaitInterval* = chronos.seconds(10) ## Sleep some time in multi-mode if there is nothing to do - asyncThreadSwitchTimeSlot* = chronos.nanoseconds(10) + asyncThreadSwitchTimeSlot* = chronos.nanoseconds(1) ## Nano-sleep to allows pseudo/async thread switch # ---------------------- diff --git a/nimbus/sync/beacon/worker_desc.nim b/nimbus/sync/beacon/worker_desc.nim index d462f107a..8251f3149 100644 --- a/nimbus/sync/beacon/worker_desc.nim +++ b/nimbus/sync/beacon/worker_desc.nim @@ -138,7 +138,7 @@ type nBodiesBatch*: int ## Default `nFetchBodiesBatchDefault` blocksStagedQuLenMax*: int ## Default `blocksStagedQueueLenMaxDefault` - # Info stuff, no functional contribution + # Info & debugging stuff, no functional contribution nReorg*: int ## Number of reorg invocations (info only) # Debugging stuff diff --git a/nimbus/sync/sync_desc.nim b/nimbus/sync/sync_desc.nim index 8cae2da1b..236d795f0 100644 --- a/nimbus/sync/sync_desc.nim +++ b/nimbus/sync/sync_desc.nim @@ -38,7 +38,6 @@ type CtxRef*[S] = ref object ## Shared state among all syncing peer workers (aka buddies.) - buddiesMax*: int ## Max number of buddies poolMode*: bool ## Activate `runPool()` workers if set `true` daemon*: bool ## Enable global background job pool*: S ## Shared context for all worker peers diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim index a51e733a4..d6a0011ea 100644 --- a/nimbus/sync/sync_sched.nim +++ b/nimbus/sync/sync_sched.nim @@ -63,6 +63,16 @@ ## These peer worker methods run concurrently in `async` mode. ## ## +## These are the control variables that can be set from within the above +## listed method/interface functions. +## +## *buddy.ctx.poolMode* +## Activate `runPool()` workers loop if set `true` (default is `false`.) +## +## *buddy.ctx.daemon* +## Activate `runDaemon()` background job if set `true`(default is `false`.) +## +## ## Additional import files needed when using this template: ## * eth/[common, p2p] ## * chronicles @@ -84,21 +94,28 @@ type ## List of active workers, using `Hash(Peer)` rather than `Peer` KeyedQueue[ENode,RunnerBuddyRef[S,W]] + RunCtrl = enum + terminated = 0 + shutdown + running + RunnerSyncRef*[S,W] = ref object ## Module descriptor ctx*: CtxRef[S] ## Shared data pool: PeerPool ## For starting the system + buddiesMax: int ## Max number of buddies buddies: ActiveBuddies[S,W] ## LRU cache with worker descriptors - daemonRunning: bool ## Run global background job - monitorLock: bool ## Monitor mode is activated - activeMulti: int ## Number of activated runners in multi-mode - shutdown: bool ## Internal shut down flag + daemonRunning: bool ## Running background job (in async mode) + monitorLock: bool ## Monitor mode is activated (non-async mode) + activeMulti: int ## Number of async workers active/running + runCtrl: RunCtrl ## Start/stop control RunnerBuddyRef[S,W] = ref object ## Per worker peer descriptor dsc: RunnerSyncRef[S,W] ## Scheduler descriptor worker: BuddyRef[S,W] ## Worker peer data - zombified: Moment ## When it became undead (if any) + zombified: Moment ## Time when it became undead (if any) + isRunning: bool ## Peer worker is active (in async mode) const zombieTimeToLinger = 20.seconds @@ -119,6 +136,9 @@ const execPoolModeLoopMax = 100 ## Avoids continuous looping + termWaitPollingTime = 10.milliseconds + ## Wait for instance to have terminated for shutdown + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -140,10 +160,48 @@ proc key(peer: Peer): ENode = # Private functions # ------------------------------------------------------------------------------ +proc terminate[S,W](dsc: RunnerSyncRef[S,W]) = + ## Reqest termination and wait + mixin runRelease + + if dsc.runCtrl == running: + # Gracefully shut down async services + dsc.runCtrl = shutdown + dsc.ctx.daemon = false + + # Wait for workers and daemon to have terminated + while 0 < dsc.buddies.len: + for w in dsc.buddies.nextPairs: + if w.data.isRunning: + w.data.worker.ctrl.stopped = true + # Activate async job so it can finish + try: waitFor sleepAsync termWaitPollingTime + except CancelledError: discard + else: + dsc.buddies.del w.key # this is OK to delete + + while dsc.daemonRunning: + # Activate async job so it can finish + try: waitFor sleepAsync termWaitPollingTime + except CancelledError: discard + + # Final shutdown + dsc.ctx.runRelease() + + # Remove call back from pool manager. This comes last as it will + # potentially unlink references which are used in the worker instances + # (e.g. peer for logging.) + dsc.pool.delObserver(dsc) + + # Clean up, free memory from sub-objects + dsc.ctx = CtxRef[S]() + dsc.runCtrl = terminated + + proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async.} = mixin runDaemon - if dsc.ctx.daemon and not dsc.shutdown: + if dsc.ctx.daemon and dsc.runCtrl == running: dsc.daemonRunning = true # Continue until stopped @@ -178,7 +236,15 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = # Continue until stopped block taskExecLoop: - while worker.ctrl.running and not dsc.shutdown: + buddy.isRunning = true + + proc isShutdown(): bool = + dsc.runCtrl != running + + proc isActive(): bool = + worker.ctrl.running and not isShutdown() + + while isActive(): # Enforce minimum time spend on this loop let startMoment = Moment.now() @@ -192,7 +258,7 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = dsc.monitorLock = true while 0 < dsc.activeMulti: await sleepAsync execLoopPollingTime - if worker.ctrl.stopped: + if not isActive(): dsc.monitorLock = false break taskExecLoop @@ -209,6 +275,10 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = else: delayed = nil # not executing any final item break # `true` => stop + # Shutdown in progress? + if isShutdown(): + dsc.monitorLock = false + break taskExecLoop if not delayed.isNil: discard delayed.runPool(last=true, laps=count) # final item if not ctx.poolMode: @@ -221,17 +291,22 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = # end. So zombies will end up leftish. discard dsc.buddies.lruFetch peer.key - # Peer mode + # Peer worker in async mode dsc.activeMulti.inc # Continue doing something, work a bit await worker.runPeer() dsc.activeMulti.dec + # Check for shutdown + if isShutdown(): + worker.ctrl.stopped = true + break taskExecLoop + # Dispatch daemon sevice if needed if not dsc.daemonRunning and dsc.ctx.daemon: asyncSpawn dsc.daemonLoop() - # Check for termination + # Check for worker termination if worker.ctrl.stopped: break taskExecLoop @@ -245,17 +320,20 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = # End while # Note that `runStart()` was dispatched in `onPeerConnected()` - if worker.ctrl.running: - # So shutdown was called - worker.ctrl.stopped = true worker.runStop() + buddy.isRunning = false proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = mixin runStart, runStop + + # Ignore if shutdown is processing + if dsc.runCtrl != running: + return + # Check for known entry (which should not exist.) let - maxWorkers {.used.} = dsc.ctx.buddiesMax + maxWorkers {.used.} = dsc.buddiesMax nPeers {.used.} = dsc.pool.len zombie = dsc.buddies.eq peer.key if zombie.isOk: @@ -290,7 +368,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = # # In the past, one could not rely on the peer pool for having the number of # connections limited. - if dsc.ctx.buddiesMax <= dsc.buddies.len: + if dsc.buddiesMax <= dsc.buddies.len: let leastVal = dsc.buddies.shift.value # unqueue first/least item oldest = leastVal.data.worker @@ -310,10 +388,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = oldest.ctrl.zombie = true # Add peer entry - discard dsc.buddies.lruAppend(peer.key, buddy, dsc.ctx.buddiesMax) - - trace "Running peer worker", peer, nPeers, - nWorkers=dsc.buddies.len, maxWorkers + discard dsc.buddies.lruAppend(peer.key, buddy, dsc.buddiesMax) asyncSpawn buddy.workerLoop() @@ -321,7 +396,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) = let nPeers = dsc.pool.len - maxWorkers = dsc.ctx.buddiesMax + maxWorkers = dsc.buddiesMax nWorkers = dsc.buddies.len rc = dsc.buddies.eq peer.key if rc.isErr: @@ -343,8 +418,6 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) = else: rc.value.worker.ctrl.stopped = true # in case it is hanging somewhere dsc.buddies.del peer.key - trace "Disconnected buddy", peer, nPeers, - nWorkers=dsc.buddies.len, maxWorkers # ------------------------------------------------------------------------------ # Public functions @@ -356,47 +429,45 @@ proc initSync*[S,W]( slots: int; ) = ## Constructor - # Leave one extra slot so that it can holds a *zombie* even if all slots # are full. The effect is that a re-connect on the latest zombie will be # rejected as long as its worker descriptor is registered. - dsc.ctx = CtxRef[S](buddiesMax: max(1, slots + 1)) - + dsc.buddiesMax = max(1, slots + 1) dsc.pool = node.peerPool - dsc.buddies.init(dsc.ctx.buddiesMax) + dsc.buddies.init(dsc.buddiesMax) + dsc.ctx = CtxRef[S]() + proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool = ## Set up `PeerObserver` handlers and start syncing. mixin runSetup - # Initialise sub-systems - if dsc.ctx.runSetup(): - var po = PeerObserver( - onPeerConnected: - proc(p: Peer) {.gcsafe.} = + + if dsc.runCtrl == terminated: + # Initialise sub-systems + if dsc.ctx.runSetup(): + dsc.runCtrl = running + + var po = PeerObserver( + onPeerConnected: proc(p: Peer) {.gcsafe.} = dsc.onPeerConnected(p), - onPeerDisconnected: - proc(p: Peer) {.gcsafe.} = + onPeerDisconnected: proc(p: Peer) {.gcsafe.} = dsc.onPeerDisconnected(p)) - po.setProtocol eth - dsc.pool.addObserver(dsc, po) - if dsc.ctx.daemon: - asyncSpawn dsc.daemonLoop() - return true + po.setProtocol eth + dsc.pool.addObserver(dsc, po) + if dsc.ctx.daemon: + asyncSpawn dsc.daemonLoop() + return true + proc stopSync*[S,W](dsc: RunnerSyncRef[S,W]) = ## Stop syncing and free peer handlers . - mixin runRelease - dsc.pool.delObserver(dsc) + dsc.terminate() - # Gracefully shut down async services - dsc.shutdown = true - for buddy in dsc.buddies.nextValues: - buddy.worker.ctrl.stopped = true - dsc.ctx.daemon = false - # Final shutdown (note that some workers might still linger on) - dsc.ctx.runRelease() +proc isRunning*[S,W](dsc: RunnerSyncRef[S,W]): bool = + ## Check start/stop state + dsc.runCtrl == running # ------------------------------------------------------------------------------ # End