Full sync peer negotiation control (#1390)
* Additional logging for scheduler * Fix duplicate occurrence of `bestNumber` why: Happened when the `block_queue` module was separated out of the `worker` module. Somehow testing was insufficient or skipped, at all. * Update `runPool()` mixin for scheduler details: Could be simplified * Dynamically adapt pivot header negotiation mode details: After accepting one peer and some timeout, do not search for more peers for start syncing but rather continue in relaxed mode with a single peer.
This commit is contained in:
parent
69204fffe9
commit
d55a72ae49
|
@ -23,33 +23,82 @@ logScope:
|
|||
type
|
||||
FullSyncRef* = RunnerSyncRef[CtxData,BuddyData]
|
||||
|
||||
const
|
||||
extraTraceMessages = false # or true
|
||||
## Enable additional logging noise
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private logging helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
template traceMsg(f, info: static[string]; args: varargs[untyped]) =
|
||||
trace "Snap scheduler " & f & "() " & info, args
|
||||
|
||||
template traceMsgCtx(f, info: static[string]; c: FullCtxRef) =
|
||||
when extraTraceMessages:
|
||||
block:
|
||||
let
|
||||
poolMode {.inject.} = c.poolMode
|
||||
daemon {.inject.} = c.daemon
|
||||
f.traceMsg info, poolMode, daemon
|
||||
|
||||
template traceMsgBuddy(f, info: static[string]; b: FullBuddyRef) =
|
||||
when extraTraceMessages:
|
||||
block:
|
||||
let
|
||||
peer {.inject.} = b.peer
|
||||
runState {.inject.} = b.ctrl.state
|
||||
multiOk {.inject.} = b.ctrl.multiOk
|
||||
poolMode {.inject.} = b.ctx.poolMode
|
||||
daemon {.inject.} = b.ctx.daemon
|
||||
f.traceMsg info, peer, runState, multiOk, poolMode, daemon
|
||||
|
||||
|
||||
template tracerFrameCtx(f: static[string]; c: FullCtxRef; code: untyped) =
|
||||
f.traceMsgCtx "begin", c
|
||||
code
|
||||
f.traceMsgCtx "end", c
|
||||
|
||||
template tracerFrameBuddy(f: static[string]; b: FullBuddyRef; code: untyped) =
|
||||
f.traceMsgBuddy "begin", b
|
||||
code
|
||||
f.traceMsgBuddy "end", b
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Virtual methods/interface, `mixin` functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc runSetup(ctx: FullCtxRef; ticker: bool): bool =
|
||||
worker.setup(ctx,ticker)
|
||||
tracerFrameCtx("runSetup", ctx):
|
||||
result = worker.setup(ctx,ticker)
|
||||
|
||||
proc runRelease(ctx: FullCtxRef) =
|
||||
worker.release(ctx)
|
||||
tracerFrameCtx("runRelease", ctx):
|
||||
worker.release(ctx)
|
||||
|
||||
proc runDaemon(ctx: FullCtxRef) {.async.} =
|
||||
discard
|
||||
tracerFrameCtx("runDaemon", ctx):
|
||||
await worker.runDaemon(ctx)
|
||||
|
||||
proc runStart(buddy: FullBuddyRef): bool =
|
||||
worker.start(buddy)
|
||||
tracerFrameBuddy("runStart", buddy):
|
||||
result = worker.start(buddy)
|
||||
|
||||
proc runStop(buddy: FullBuddyRef) =
|
||||
worker.stop(buddy)
|
||||
tracerFrameBuddy("runStop", buddy):
|
||||
worker.stop(buddy)
|
||||
|
||||
proc runPool(buddy: FullBuddyRef; last: bool): bool =
|
||||
worker.runPool(buddy, last)
|
||||
tracerFrameBuddy("runPool", buddy):
|
||||
result = worker.runPool(buddy, last)
|
||||
|
||||
proc runSingle(buddy: FullBuddyRef) {.async.} =
|
||||
await worker.runSingle(buddy)
|
||||
tracerFrameBuddy("runSingle", buddy):
|
||||
await worker.runSingle(buddy)
|
||||
|
||||
proc runMulti(buddy: FullBuddyRef) {.async.} =
|
||||
await worker.runMulti(buddy)
|
||||
tracerFrameBuddy("runMulti", buddy):
|
||||
await worker.runMulti(buddy)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
|
|
|
@ -20,29 +20,48 @@ import
|
|||
{.push raises:[Defect].}
|
||||
|
||||
logScope:
|
||||
topics = "full-sync"
|
||||
topics = "full-buddy"
|
||||
|
||||
type
|
||||
PivotState = enum
|
||||
PivotStateInitial, ## Initial state
|
||||
FirstPivotSeen, ## Starting, first pivot seen
|
||||
FirstPivotAccepted, ## Accepted, waiting for second
|
||||
FirstPivotUseRegardless ## Force pivot if available
|
||||
PivotRunMode ## SNAFU after some magic
|
||||
|
||||
BuddyData* = object
|
||||
## Local descriptor data extension
|
||||
pivot: BestPivotWorkerRef ## Local pivot worker descriptor
|
||||
bQueue: BlockQueueWorkerRef ## Block queue worker
|
||||
bestNumber: Option[BlockNumber] ## Largest block number reported
|
||||
|
||||
CtxData* = object
|
||||
## Globally shared data extension
|
||||
rng*: ref HmacDrbgContext ## Random generator, pre-initialised
|
||||
pivot: BestPivotCtxRef ## Global pivot descriptor
|
||||
pivotState: PivotState ## For initial pivot control
|
||||
pivotStamp: Moment ## `PivotState` driven timing control
|
||||
bCtx: BlockQueueCtxRef ## Global block queue descriptor
|
||||
ticker: TickerRef ## Logger ticker
|
||||
|
||||
FullBuddyRef* = ##\
|
||||
FullBuddyRef* = BuddyRef[CtxData,BuddyData]
|
||||
## Extended worker peer descriptor
|
||||
BuddyRef[CtxData,BuddyData]
|
||||
|
||||
FullCtxRef* = ##\
|
||||
FullCtxRef* = CtxRef[CtxData]
|
||||
## Extended global descriptor
|
||||
CtxRef[CtxData]
|
||||
|
||||
const
|
||||
extraTraceMessages = false # or true
|
||||
## Enabled additional logging noise
|
||||
|
||||
FirstPivotSeenTimeout = 3.minutes
|
||||
## Turn on relaxed pivot negotiation after some waiting time when there
|
||||
## was a `peer` seen but was rejected. This covers a rare event. Typically
|
||||
## useless peers do not appear ready for negotiation.
|
||||
|
||||
FirstPivotAcceptedTimeout = 50.seconds
|
||||
## Turn on relaxed pivot negotiation after some waiting time when there
|
||||
## was a `peer` accepted but no second one yet.
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private helpers
|
||||
|
@ -210,6 +229,59 @@ proc stop*(buddy: FullBuddyRef) =
|
|||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc runDaemon*(ctx: FullCtxRef) {.async.} =
|
||||
## Global background job that will be re-started as long as the variable
|
||||
## `ctx.daemon` is set `true`. If that job was stopped due to re-setting
|
||||
## `ctx.daemon` to `false`, it will be restarted next after it was reset
|
||||
## as `true` not before there is some activity on the `runPool()`,
|
||||
## `runSingle()`, or `runMulti()` functions.
|
||||
##
|
||||
case ctx.data.pivotState:
|
||||
of FirstPivotSeen:
|
||||
let elapsed = Moment.now() - ctx.data.pivotStamp
|
||||
if FirstPivotSeenTimeout < elapsed:
|
||||
# Switch to single peer pivot negotiation
|
||||
ctx.data.pivot.pivotRelaxedMode(enable = true)
|
||||
|
||||
# Currently no need for other monitor tasks
|
||||
ctx.daemon = false
|
||||
|
||||
when extraTraceMessages:
|
||||
trace "First seen pivot timeout", elapsed,
|
||||
pivotState=ctx.data.pivotState
|
||||
return
|
||||
# Otherwise delay for some time
|
||||
|
||||
of FirstPivotAccepted:
|
||||
let elapsed = Moment.now() - ctx.data.pivotStamp
|
||||
if FirstPivotAcceptedTimeout < elapsed:
|
||||
# Switch to single peer pivot negotiation
|
||||
ctx.data.pivot.pivotRelaxedMode(enable = true)
|
||||
|
||||
# Use currents pivot next time `runSingle()` is visited. This bent is
|
||||
# necessary as there must be a peer initialising and syncing blocks. But
|
||||
# this daemon has no peer assigned.
|
||||
ctx.data.pivotState = FirstPivotUseRegardless
|
||||
|
||||
# Currently no need for other monitor tasks
|
||||
ctx.daemon = false
|
||||
|
||||
when extraTraceMessages:
|
||||
trace "First accepted pivot timeout", elapsed,
|
||||
pivotState=ctx.data.pivotState
|
||||
return
|
||||
# Otherwise delay for some time
|
||||
|
||||
else:
|
||||
# Currently no need for other monitior tasks
|
||||
ctx.daemon = false
|
||||
return
|
||||
|
||||
# Without waiting, this function repeats every 50ms (as set with the constant
|
||||
# `sync_sched.execLoopTimeElapsedMin`.) Larger waiting time cleans up logging.
|
||||
await sleepAsync 300.milliseconds
|
||||
|
||||
|
||||
proc runSingle*(buddy: FullBuddyRef) {.async.} =
|
||||
## This peer worker is invoked if the peer-local flag `buddy.ctrl.multiOk`
|
||||
## is set `false` which is the default mode. This flag is updated by the
|
||||
|
@ -226,35 +298,99 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} =
|
|||
ctx = buddy.ctx
|
||||
peer = buddy.peer
|
||||
bq = buddy.data.bQueue
|
||||
pv = buddy.data.pivot
|
||||
|
||||
if bq.blockQueueBacktrackOk:
|
||||
let rc = await bq.blockQueueBacktrackWorker()
|
||||
if rc.isOk:
|
||||
when extraTraceMessages:
|
||||
trace "Single mode begin", peer, pivotState=ctx.data.pivotState
|
||||
|
||||
# Update persistent database (may reset `multiOk`)
|
||||
buddy.ctrl.multiOk = true
|
||||
while buddy.processStaged() and not buddy.ctrl.stopped:
|
||||
# Allow thread switch as `persistBlocks()` might be slow
|
||||
await sleepAsync(10.milliseconds)
|
||||
return
|
||||
case ctx.data.pivotState:
|
||||
of PivotStateInitial:
|
||||
# Set initial state on first encounter
|
||||
ctx.data.pivotState = FirstPivotSeen
|
||||
ctx.data.pivotStamp = Moment.now()
|
||||
ctx.daemon = true # Start monitor
|
||||
|
||||
buddy.ctrl.zombie = true
|
||||
of FirstPivotSeen, FirstPivotAccepted:
|
||||
discard
|
||||
|
||||
# Initialise/re-initialise this worker
|
||||
elif await buddy.data.pivot.pivotNegotiate(buddy.data.bestNumber):
|
||||
of FirstPivotUseRegardless:
|
||||
# Magic case when we accept anything under the sun
|
||||
let rc = pv.pivotHeader(relaxedMode=true)
|
||||
if rc.isOK:
|
||||
# Update/activate `bestNumber` from the pivot header
|
||||
bq.bestNumber = some(rc.value.blockNumber)
|
||||
ctx.data.pivotState = PivotRunMode
|
||||
buddy.ctrl.multiOk = true
|
||||
trace "Single pivot accepted", peer, pivot=('#' & $bq.bestNumber.get)
|
||||
return # stop logging, otherwise unconditional return for this case
|
||||
|
||||
when extraTraceMessages:
|
||||
trace "Single mode stopped", peer, pivotState=ctx.data.pivotState
|
||||
return # unconditional return for this case
|
||||
|
||||
of PivotRunMode:
|
||||
# Sync backtrack runs in single mode
|
||||
if bq.blockQueueBacktrackOk:
|
||||
let rc = await bq.blockQueueBacktrackWorker()
|
||||
if rc.isOk:
|
||||
# Update persistent database (may reset `multiOk`)
|
||||
buddy.ctrl.multiOk = true
|
||||
while buddy.processStaged() and not buddy.ctrl.stopped:
|
||||
# Allow thread switch as `persistBlocks()` might be slow
|
||||
await sleepAsync(10.milliseconds)
|
||||
when extraTraceMessages:
|
||||
trace "Single backtrack mode done", peer
|
||||
return
|
||||
|
||||
buddy.ctrl.zombie = true
|
||||
|
||||
when extraTraceMessages:
|
||||
trace "Single backtrack mode stopped", peer
|
||||
return
|
||||
# End case()
|
||||
|
||||
# Negotiate in order to derive the pivot header from this `peer`. This code
|
||||
# location here is reached when there was no compelling reason for the
|
||||
# `case()` handler to process and `return`.
|
||||
if await pv.pivotNegotiate(buddy.data.bQueue.bestNumber):
|
||||
# Update/activate `bestNumber` from the pivot header
|
||||
bq.bestNumber = some(pv.pivotHeader.value.blockNumber)
|
||||
ctx.data.pivotState = PivotRunMode
|
||||
buddy.ctrl.multiOk = true
|
||||
# Update/activate `bestNumber` for local use
|
||||
buddy.data.bestNumber =
|
||||
some(buddy.data.pivot.pivotHeader.value.blockNumber)
|
||||
trace "Pivot accepted", peer, pivot=('#' & $bq.bestNumber.get)
|
||||
return
|
||||
|
||||
elif not buddy.ctrl.stopped:
|
||||
await sleepAsync(2.seconds)
|
||||
if buddy.ctrl.stopped:
|
||||
when extraTraceMessages:
|
||||
trace "Single mode stopped", peer, pivotState=ctx.data.pivotState
|
||||
return # done with this buddy
|
||||
|
||||
var napping = 2.seconds
|
||||
case ctx.data.pivotState:
|
||||
of FirstPivotSeen:
|
||||
# Possible state transition
|
||||
if pv.pivotHeader(relaxedMode=true).isOk:
|
||||
ctx.data.pivotState = FirstPivotAccepted
|
||||
ctx.data.pivotStamp = Moment.now()
|
||||
napping = 300.milliseconds
|
||||
of FirstPivotAccepted:
|
||||
napping = 300.milliseconds
|
||||
else:
|
||||
discard
|
||||
|
||||
when extraTraceMessages:
|
||||
trace "Single mode end", peer, pivotState=ctx.data.pivotState, napping
|
||||
|
||||
# Without waiting, this function repeats every 50ms (as set with the constant
|
||||
# `sync_sched.execLoopTimeElapsedMin`.)
|
||||
await sleepAsync napping
|
||||
|
||||
|
||||
proc runPool*(buddy: FullBuddyRef; last: bool) =
|
||||
## Ocne started, the function `runPool()` is called for all worker peers in
|
||||
## a row (as the body of an iteration.) There will be no other worker peer
|
||||
## functions activated simultaneously.
|
||||
proc runPool*(buddy: FullBuddyRef; last: bool): bool =
|
||||
## Once started, the function `runPool()` is called for all worker peers in
|
||||
## sequence as the body of an iteration as long as the function returns
|
||||
## `false`. There will be no other worker peer functions activated
|
||||
## simultaneously.
|
||||
##
|
||||
## This procedure is started if the global flag `buddy.ctx.poolMode` is set
|
||||
## `true` (default is `false`.) It is the responsibility of the `runPool()`
|
||||
|
@ -265,13 +401,12 @@ proc runPool*(buddy: FullBuddyRef; last: bool) =
|
|||
##
|
||||
## Note that this function does not run in `async` mode.
|
||||
##
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
bq = buddy.data.bQueue
|
||||
if ctx.poolMode:
|
||||
# Mind the gap, fill in if necessary
|
||||
bq.blockQueueGrout()
|
||||
ctx.poolMode = false
|
||||
# Mind the gap, fill in if necessary (function is peer independent)
|
||||
buddy.data.bQueue.blockQueueGrout()
|
||||
|
||||
# Stop after running once regardless of peer
|
||||
buddy.ctx.poolMode = false
|
||||
true
|
||||
|
||||
|
||||
proc runMulti*(buddy: FullBuddyRef) {.async.} =
|
||||
|
|
|
@ -279,6 +279,26 @@ proc pivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] =
|
|||
|
||||
err()
|
||||
|
||||
proc pivotHeader*(
|
||||
bp: BestPivotWorkerRef; ## Worker peer
|
||||
relaxedMode: bool; ## One time relaxed mode flag
|
||||
): Result[BlockHeader,void] =
|
||||
## Variant of `pivotHeader()` with `relaxedMode` flag as function argument.
|
||||
if bp.header.isSome and
|
||||
bp.peer notin bp.global.untrusted:
|
||||
|
||||
if pivotMinPeersToStartSync <= bp.global.trusted.len and
|
||||
bp.peer in bp.global.trusted:
|
||||
return ok(bp.header.unsafeGet)
|
||||
|
||||
if relaxedMode:
|
||||
when extraTraceMessages:
|
||||
trace "Returning not fully trusted pivot", peer=bp.peer,
|
||||
trusted=bp.global.trusted.len, untrusted=bp.global.untrusted.len
|
||||
return ok(bp.header.unsafeGet)
|
||||
|
||||
err()
|
||||
|
||||
proc pivotNegotiate*(
|
||||
bp: BestPivotWorkerRef; ## Worker peer
|
||||
minBlockNumber: Option[BlockNumber]; ## Minimum block number to expect
|
||||
|
|
|
@ -471,6 +471,19 @@ proc init*(
|
|||
peer: peer,
|
||||
ctrl: ctrl)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions -- getter/setter
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc bestNumber*(qd: BlockQueueWorkerRef): Option[BlockNumber] =
|
||||
## Getter
|
||||
qd.bestNumber
|
||||
|
||||
proc `bestNumber=`*(qd: BlockQueueWorkerRef; val: Option[BlockNumber]) =
|
||||
## Setter, needs to be set to something valid so that `blockQueueWorker()`
|
||||
## does something useful.
|
||||
qd.bestNumber = val
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions -- synchronous
|
||||
# ------------------------------------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue