From d55a72ae49ea31985e81585c98e550384110c033 Mon Sep 17 00:00:00 2001
From: Jordan Hrycaj
Date: Sun, 18 Dec 2022 16:06:43 +0000
Subject: [PATCH] Full sync peer negotiation control (#1390)

* Additional logging for scheduler

* Fix duplicate occurrence of `bestNumber`

why:
  Happened when the `block_queue` module was separated out of the
  `worker` module. Somehow testing was insufficient or was skipped
  altogether.

* Update `runPool()` mixin for scheduler

details:
  Could be simplified

* Dynamically adapt pivot header negotiation mode

details:
  After accepting one peer and some timeout, do not search for more peers
  before starting to sync but rather continue in relaxed mode with a
  single peer.
---
 nimbus/sync/full.nim             |  65 ++++++++--
 nimbus/sync/full/worker.nim      | 203 +++++++++++++++++++++++++------
 nimbus/sync/misc/best_pivot.nim  |  20 +++
 nimbus/sync/misc/block_queue.nim |  13 ++
 4 files changed, 259 insertions(+), 42 deletions(-)

diff --git a/nimbus/sync/full.nim b/nimbus/sync/full.nim
index 58c409b78..a7d59c584 100644
--- a/nimbus/sync/full.nim
+++ b/nimbus/sync/full.nim
@@ -23,33 +23,82 @@ logScope:
 type
   FullSyncRef* = RunnerSyncRef[CtxData,BuddyData]
 
+const
+  extraTraceMessages = false # or true
+    ## Enable additional logging noise
+
+# ------------------------------------------------------------------------------
+# Private logging helpers
+# ------------------------------------------------------------------------------
+
+template traceMsg(f, info: static[string]; args: varargs[untyped]) =
+  trace "Full scheduler " & f & "() " & info, args
+
+template traceMsgCtx(f, info: static[string]; c: FullCtxRef) =
+  when extraTraceMessages:
+    block:
+      let
+        poolMode {.inject.} = c.poolMode
+        daemon   {.inject.} = c.daemon
+      f.traceMsg info, poolMode, daemon
+
+template traceMsgBuddy(f, info: static[string]; b: FullBuddyRef) =
+  when extraTraceMessages:
+    block:
+      let
+        peer     {.inject.} = b.peer
+        runState {.inject.} = b.ctrl.state
+        multiOk  {.inject.} = b.ctrl.multiOk
+        poolMode {.inject.} = b.ctx.poolMode
+        daemon   {.inject.} = b.ctx.daemon
+      f.traceMsg info, peer, runState, multiOk, poolMode, daemon
+
+
+template tracerFrameCtx(f: static[string]; c: FullCtxRef; code: untyped) =
+  f.traceMsgCtx "begin", c
+  code
+  f.traceMsgCtx "end", c
+
+template tracerFrameBuddy(f: static[string]; b: FullBuddyRef; code: untyped) =
+  f.traceMsgBuddy "begin", b
+  code
+  f.traceMsgBuddy "end", b
+
 # ------------------------------------------------------------------------------
 # Virtual methods/interface, `mixin` functions
 # ------------------------------------------------------------------------------
 
 proc runSetup(ctx: FullCtxRef; ticker: bool): bool =
-  worker.setup(ctx,ticker)
+  tracerFrameCtx("runSetup", ctx):
+    result = worker.setup(ctx,ticker)
 
 proc runRelease(ctx: FullCtxRef) =
-  worker.release(ctx)
+  tracerFrameCtx("runRelease", ctx):
+    worker.release(ctx)
 
 proc runDaemon(ctx: FullCtxRef) {.async.} =
-  discard
+  tracerFrameCtx("runDaemon", ctx):
+    await worker.runDaemon(ctx)
 
 proc runStart(buddy: FullBuddyRef): bool =
-  worker.start(buddy)
+  tracerFrameBuddy("runStart", buddy):
+    result = worker.start(buddy)
 
 proc runStop(buddy: FullBuddyRef) =
-  worker.stop(buddy)
+  tracerFrameBuddy("runStop", buddy):
+    worker.stop(buddy)
 
 proc runPool(buddy: FullBuddyRef; last: bool): bool =
-  worker.runPool(buddy, last)
+  tracerFrameBuddy("runPool", buddy):
+    result = worker.runPool(buddy, last)
 
 proc runSingle(buddy: FullBuddyRef) {.async.} =
-  await worker.runSingle(buddy)
+  tracerFrameBuddy("runSingle", buddy):
+    await worker.runSingle(buddy)
 
 proc runMulti(buddy: FullBuddyRef) {.async.} =
-  await worker.runMulti(buddy)
+  tracerFrameBuddy("runMulti", buddy):
+    await worker.runMulti(buddy)
 
 # ------------------------------------------------------------------------------
 # Public functions
diff --git a/nimbus/sync/full/worker.nim b/nimbus/sync/full/worker.nim
index b95886bb9..369a802ba 100644
--- a/nimbus/sync/full/worker.nim
+++ b/nimbus/sync/full/worker.nim
@@ -20,29 +20,48 @@ import
 {.push raises:[Defect].}
 
 logScope:
-  topics = "full-sync"
+  topics = "full-buddy"
 
 type
+  PivotState = enum
+    PivotStateInitial,              ## Initial state
+    FirstPivotSeen,                 ## Starting, first pivot seen
+    FirstPivotAccepted,             ## Accepted, waiting for second
+    FirstPivotUseRegardless,        ## Force pivot if available
+    PivotRunMode                    ## SNAFU after some magic
+
   BuddyData* = object
     ## Local descriptor data extension
     pivot: BestPivotWorkerRef       ## Local pivot worker descriptor
     bQueue: BlockQueueWorkerRef     ## Block queue worker
-    bestNumber: Option[BlockNumber] ## Largest block number reported
 
   CtxData* = object
     ## Globally shared data extension
     rng*: ref HmacDrbgContext       ## Random generator, pre-initialised
     pivot: BestPivotCtxRef          ## Global pivot descriptor
+    pivotState: PivotState          ## For initial pivot control
+    pivotStamp: Moment              ## `PivotState` driven timing control
     bCtx: BlockQueueCtxRef          ## Global block queue descriptor
     ticker: TickerRef               ## Logger ticker
 
-  FullBuddyRef* = ##\
+  FullBuddyRef* = BuddyRef[CtxData,BuddyData]
     ## Extended worker peer descriptor
-    BuddyRef[CtxData,BuddyData]
 
-  FullCtxRef* = ##\
+  FullCtxRef* = CtxRef[CtxData]
     ## Extended global descriptor
-    CtxRef[CtxData]
+
+const
+  extraTraceMessages = false # or true
+    ## Enable additional logging noise
+
+  FirstPivotSeenTimeout = 3.minutes
+    ## Turn on relaxed pivot negotiation after some waiting time when a
+    ## `peer` was seen but rejected. This covers a rare event. Typically,
+    ## useless peers do not appear ready for negotiation.
+
+  FirstPivotAcceptedTimeout = 50.seconds
+    ## Turn on relaxed pivot negotiation after some waiting time when a
+    ## `peer` was accepted but no second one has shown up yet.
 
 # ------------------------------------------------------------------------------
 # Private helpers
 # ------------------------------------------------------------------------------
@@ -210,6 +229,59 @@ proc stop*(buddy: FullBuddyRef) =
 # Public functions
 # ------------------------------------------------------------------------------
 
+proc runDaemon*(ctx: FullCtxRef) {.async.} =
+  ## Global background job that will be re-started as long as the variable
+  ## `ctx.daemon` is set `true`. If the job was stopped by resetting
+  ## `ctx.daemon` to `false`, it will only be restarted after `ctx.daemon`
+  ## was set `true` again and there was some activity on the `runPool()`,
+  ## `runSingle()`, or `runMulti()` functions.
+  ##
+  case ctx.data.pivotState:
+  of FirstPivotSeen:
+    let elapsed = Moment.now() - ctx.data.pivotStamp
+    if FirstPivotSeenTimeout < elapsed:
+      # Switch to single peer pivot negotiation
+      ctx.data.pivot.pivotRelaxedMode(enable = true)
+
+      # Currently no need for other monitor tasks
+      ctx.daemon = false
+
+      when extraTraceMessages:
+        trace "First seen pivot timeout", elapsed,
+          pivotState=ctx.data.pivotState
+      return
+    # Otherwise delay for some time
+
+  of FirstPivotAccepted:
+    let elapsed = Moment.now() - ctx.data.pivotStamp
+    if FirstPivotAcceptedTimeout < elapsed:
+      # Switch to single peer pivot negotiation
+      ctx.data.pivot.pivotRelaxedMode(enable = true)
+
+      # Use the current pivot next time `runSingle()` is visited. This detour
+      # is necessary because a peer is needed for initialising and syncing
+      # blocks, but this daemon has no peer assigned.
+      ctx.data.pivotState = FirstPivotUseRegardless
+
+      # Currently no need for other monitor tasks
+      ctx.daemon = false
+
+      when extraTraceMessages:
+        trace "First accepted pivot timeout", elapsed,
+          pivotState=ctx.data.pivotState
+      return
+    # Otherwise delay for some time
+
+  else:
+    # Currently no need for other monitor tasks
+    ctx.daemon = false
+    return
+
+  # Without waiting, this function would repeat every 50ms (as set with the
+  # constant `sync_sched.execLoopTimeElapsedMin`). The larger waiting time
+  # reduces the logging noise.
+  await sleepAsync 300.milliseconds
+
+
 proc runSingle*(buddy: FullBuddyRef) {.async.} =
   ## This peer worker is invoked if the peer-local flag `buddy.ctrl.multiOk`
   ## is set `false` which is the default mode. This flag is updated by the
@@ -226,35 +298,99 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} =
     ctx = buddy.ctx
     peer = buddy.peer
     bq = buddy.data.bQueue
+    pv = buddy.data.pivot
 
-  if bq.blockQueueBacktrackOk:
-    let rc = await bq.blockQueueBacktrackWorker()
-    if rc.isOk:
+  when extraTraceMessages:
+    trace "Single mode begin", peer, pivotState=ctx.data.pivotState
 
-      # Update persistent database (may reset `multiOk`)
-      buddy.ctrl.multiOk = true
-      while buddy.processStaged() and not buddy.ctrl.stopped:
-        # Allow thread switch as `persistBlocks()` might be slow
-        await sleepAsync(10.milliseconds)
-      return
+  case ctx.data.pivotState:
+  of PivotStateInitial:
+    # Set initial state on first encounter
+    ctx.data.pivotState = FirstPivotSeen
+    ctx.data.pivotStamp = Moment.now()
+    ctx.daemon = true # Start monitor
 
-    buddy.ctrl.zombie = true
+  of FirstPivotSeen, FirstPivotAccepted:
+    discard
 
-  # Initialise/re-initialise this worker
-  elif await buddy.data.pivot.pivotNegotiate(buddy.data.bestNumber):
+  of FirstPivotUseRegardless:
+    # Magic case when we accept anything under the sun
+    let rc = pv.pivotHeader(relaxedMode=true)
+    if rc.isOk:
+      # Update/activate `bestNumber` from the pivot header
+      bq.bestNumber = some(rc.value.blockNumber)
+      ctx.data.pivotState = PivotRunMode
+      buddy.ctrl.multiOk = true
+      trace "Single pivot accepted", peer, pivot=('#' & $bq.bestNumber.get)
+      return # pivot accepted, skip the "stopped" trace message below
+
+    when extraTraceMessages:
+      trace "Single mode stopped", peer, pivotState=ctx.data.pivotState
+    return # unconditional return for this case
+
+  of PivotRunMode:
+    # Sync backtrack runs in single mode
+    if bq.blockQueueBacktrackOk:
+      let rc = await bq.blockQueueBacktrackWorker()
+      if rc.isOk:
+        # Update persistent database (may reset `multiOk`)
+        buddy.ctrl.multiOk = true
+        while buddy.processStaged() and not buddy.ctrl.stopped:
+          # Allow thread switch as `persistBlocks()` might be slow
+          await sleepAsync(10.milliseconds)
+        when extraTraceMessages:
+          trace "Single backtrack mode done", peer
+        return
+
+      buddy.ctrl.zombie = true
+
+      when extraTraceMessages:
+        trace "Single backtrack mode stopped", peer
+      return
+    # End case()
+
+  # Negotiate in order to derive the pivot header from this `peer`. This
+  # location is reached when the `case()` handler above found no compelling
+  # reason to process and `return`.
+  if await pv.pivotNegotiate(buddy.data.bQueue.bestNumber):
+    # Update/activate `bestNumber` from the pivot header
+    bq.bestNumber = some(pv.pivotHeader.value.blockNumber)
+    ctx.data.pivotState = PivotRunMode
     buddy.ctrl.multiOk = true
-    # Update/activate `bestNumber` for local use
-    buddy.data.bestNumber =
-      some(buddy.data.pivot.pivotHeader.value.blockNumber)
+    trace "Pivot accepted", peer, pivot=('#' & $bq.bestNumber.get)
+    return
 
-  elif not buddy.ctrl.stopped:
-    await sleepAsync(2.seconds)
+  if buddy.ctrl.stopped:
+    when extraTraceMessages:
+      trace "Single mode stopped", peer, pivotState=ctx.data.pivotState
+    return # done with this buddy
+
+  var napping = 2.seconds
+  case ctx.data.pivotState:
+  of FirstPivotSeen:
+    # Possible state transition
+    if pv.pivotHeader(relaxedMode=true).isOk:
+      ctx.data.pivotState = FirstPivotAccepted
+      ctx.data.pivotStamp = Moment.now()
+    napping = 300.milliseconds
+  of FirstPivotAccepted:
+    napping = 300.milliseconds
+  else:
+    discard
+
+  when extraTraceMessages:
+    trace "Single mode end", peer, pivotState=ctx.data.pivotState, napping
+
+  # Without waiting, this function would repeat every 50ms (as set with the
+  # constant `sync_sched.execLoopTimeElapsedMin`).
+  await sleepAsync napping
 
 
-proc runPool*(buddy: FullBuddyRef; last: bool) =
-  ## Ocne started, the function `runPool()` is called for all worker peers in
-  ## a row (as the body of an iteration.) There will be no other worker peer
-  ## functions activated simultaneously.
+proc runPool*(buddy: FullBuddyRef; last: bool): bool =
+  ## Once started, the function `runPool()` is called for all worker peers in
+  ## sequence as the body of an iteration as long as the function returns
+  ## `false`. There will be no other worker peer functions activated
+  ## simultaneously.
   ##
   ## This procedure is started if the global flag `buddy.ctx.poolMode` is set
   ## `true` (default is `false`.) It is the responsibility of the `runPool()`
@@ -265,13 +401,12 @@ proc runPool*(buddy: FullBuddyRef; last: bool) =
   ##
   ## Note that this function does not run in `async` mode.
   ##
-  let
-    ctx = buddy.ctx
-    bq = buddy.data.bQueue
-  if ctx.poolMode:
-    # Mind the gap, fill in if necessary
-    bq.blockQueueGrout()
-    ctx.poolMode = false
+  # Mind the gap, fill in if necessary (function is peer independent)
+  buddy.data.bQueue.blockQueueGrout()
+
+  # Stop after running once, regardless of peer
+  buddy.ctx.poolMode = false
+  true
 
 
 proc runMulti*(buddy: FullBuddyRef) {.async.} =
diff --git a/nimbus/sync/misc/best_pivot.nim b/nimbus/sync/misc/best_pivot.nim
index 5cc12c7ee..904d81bda 100644
--- a/nimbus/sync/misc/best_pivot.nim
+++ b/nimbus/sync/misc/best_pivot.nim
@@ -279,6 +279,26 @@ proc pivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] =
   err()
 
 
+proc pivotHeader*(
+    bp: BestPivotWorkerRef;              ## Worker peer
+    relaxedMode: bool;                   ## One-time relaxed mode flag
+      ): Result[BlockHeader,void] =
+  ## Variant of `pivotHeader()` with the `relaxedMode` flag as a function
+  ## argument.
+  if bp.header.isSome and
+     bp.peer notin bp.global.untrusted:
+
+    if pivotMinPeersToStartSync <= bp.global.trusted.len and
+       bp.peer in bp.global.trusted:
+      return ok(bp.header.unsafeGet)
+
+    if relaxedMode:
+      when extraTraceMessages:
+        trace "Returning not fully trusted pivot", peer=bp.peer,
+          trusted=bp.global.trusted.len, untrusted=bp.global.untrusted.len
+      return ok(bp.header.unsafeGet)
+
+  err()
+
 proc pivotNegotiate*(
     bp: BestPivotWorkerRef;              ## Worker peer
     minBlockNumber: Option[BlockNumber]; ## Minimum block number to expect
diff --git a/nimbus/sync/misc/block_queue.nim b/nimbus/sync/misc/block_queue.nim
index 68c319f75..6b63f6a56 100644
--- a/nimbus/sync/misc/block_queue.nim
+++ b/nimbus/sync/misc/block_queue.nim
@@ -471,6 +471,19 @@ proc init*(
     peer:  peer,
     ctrl:  ctrl)
 
+# ------------------------------------------------------------------------------
+# Public functions -- getter/setter
+# ------------------------------------------------------------------------------
+
+proc bestNumber*(qd: BlockQueueWorkerRef): Option[BlockNumber] =
+  ## Getter
+  qd.bestNumber
+
+proc `bestNumber=`*(qd: BlockQueueWorkerRef; val: Option[BlockNumber]) =
+  ## Setter. This value must be set to a valid block number for
+  ## `blockQueueWorker()` to do something useful.
+  qd.bestNumber = val
+
 # ------------------------------------------------------------------------------
 # Public functions -- synchronous
 # ------------------------------------------------------------------------------
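
Side note, illustration only (not part of the patch): the core of this change
is the timeout-driven pivot negotiation state machine spread across
`runDaemon()` and `runSingle()`. The following self-contained Nim model
mirrors just the transition rules and the two timeout constants; the names
(`DemoCtx`, `tick()`, ...) are made up, and stdlib `std/monotimes` stands in
for the chronos `Moment` used by the patch.

  import std/[monotimes, times]

  type
    DemoPivotState = enum
      demoInitial, demoFirstSeen, demoFirstAccepted, demoRelaxed

    DemoCtx = object
      state: DemoPivotState
      stamp: MonoTime                             # mirrors ctx.data.pivotStamp

  const
    seenTimeout = initDuration(minutes = 3)       # FirstPivotSeenTimeout
    acceptedTimeout = initDuration(seconds = 50)  # FirstPivotAcceptedTimeout

  proc firstPeerSeen(c: var DemoCtx) =
    ## A first peer showed up: start the clock (see `PivotStateInitial`.)
    if c.state == demoInitial:
      c.state = demoFirstSeen
      c.stamp = getMonoTime()

  proc firstPeerAccepted(c: var DemoCtx) =
    ## The first peer passed negotiation: restart the clock with the shorter
    ## timeout (see the `FirstPivotSeen` transition in `runSingle()`.)
    if c.state == demoFirstSeen:
      c.state = demoFirstAccepted
      c.stamp = getMonoTime()

  proc tick(c: var DemoCtx): bool =
    ## One `runDaemon()` pass; returns `true` once relaxed (single peer)
    ## pivot negotiation has been enabled.
    let elapsed = getMonoTime() - c.stamp
    case c.state
    of demoFirstSeen:
      if seenTimeout < elapsed:
        c.state = demoRelaxed       # give up waiting for more peers
    of demoFirstAccepted:
      if acceptedTimeout < elapsed:
        c.state = demoRelaxed       # settle for the single accepted peer
    else:
      discard
    c.state == demoRelaxed

  when isMainModule:
    var c = DemoCtx()
    c.firstPeerSeen()
    c.firstPeerAccepted()
    echo "relaxed mode: ", c.tick()  # false until a timeout expires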
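
The new `pivotHeader(relaxedMode)` variant in `best_pivot.nim` encodes the
acceptance rule that makes the above work: a pivot from a not-yet-fully
trusted peer is only returned when relaxed mode is requested. A condensed
model of that rule (the `TrustInfo` type and the value chosen for
`minPeersToStartSync` are assumptions made for this sketch only):

  type
    TrustInfo = object
      haveHeader: bool      # mirrors bp.header.isSome
      untrusted: bool       # mirrors bp.peer in bp.global.untrusted
      trustedPeers: int     # mirrors bp.global.trusted.len
      peerTrusted: bool     # mirrors bp.peer in bp.global.trusted

  const
    minPeersToStartSync = 2 # stands in for pivotMinPeersToStartSync

  proc acceptPivot(t: TrustInfo; relaxedMode: bool): bool =
    ## Condensed decision logic of the `pivotHeader(relaxedMode)` variant
    if t.haveHeader and not t.untrusted:
      if minPeersToStartSync <= t.trustedPeers and t.peerTrusted:
        return true         # fully trusted pivot: always acceptable
      if relaxedMode:
        return true         # accept the not fully trusted pivot anyway
    false

  assert acceptPivot(
    TrustInfo(haveHeader: true, trustedPeers: 1, peerTrusted: true),
    relaxedMode = true)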
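
Likewise, the `runPool()` mixin now returns a `bool` so that a worker can cut
the per-peer iteration short. A toy model of how the scheduler side might
consume that return value (`poolLoop()` and `DemoBuddy` are invented here;
the real loop lives in `sync_sched`):

  type
    DemoBuddy = object
      name: string

  proc runPoolDemo(b: DemoBuddy; last: bool): bool =
    echo "grouting block queue, visited via ", b.name
    true  # like the new `worker.runPool()`: run once, then stop the round

  proc poolLoop(buddies: seq[DemoBuddy]) =
    ## One pool-mode round over all peers, stopping early on `true`.
    for i, b in buddies:
      if runPoolDemo(b, last = (i == buddies.len - 1)):
        break

  poolLoop(@[DemoBuddy(name: "peer#1"), DemoBuddy(name: "peer#2")])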