Minor updates for testing and cosmetics (#1476)

* Fix locked database file annoyance with unit tests on Windows

why:
  Need to clean up old files first from previous session as files remain
  locked despite closing of database.

* Fix initialisation order

detail:
  Apparently this has no real effect as the ticker is only initialised
  here but started later.

  This possible bug has been in all for a while and was running with the
  previous compiler and libraries.

* Better naming of data fields for sync descriptors

details:
* BuddyRef[S,W]: buddy.data -> buddy.only
* CtxRef[S]: ctx.data -> ctx.pool
This commit is contained in:
Jordan Hrycaj 2023-02-23 13:13:02 +00:00 committed by GitHub
parent 2f6b4de3e9
commit bf53226c2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 133 additions and 142 deletions

View File

@ -113,7 +113,7 @@ proc init*(
enableTicker = false): T = enableTicker = false): T =
new result new result
result.initSync(ethNode, chain, maxPeers, enableTicker) result.initSync(ethNode, chain, maxPeers, enableTicker)
result.ctx.data.rng = rng result.ctx.pool.rng = rng
proc start*(ctx: FullSyncRef) = proc start*(ctx: FullSyncRef) =
doAssert ctx.startSync() doAssert ctx.startSync()

View File

@ -98,7 +98,7 @@ proc topUsedNumber(
proc tickerUpdater(ctx: FullCtxRef): TickerStatsUpdater = proc tickerUpdater(ctx: FullCtxRef): TickerStatsUpdater =
result = proc: TickerStats = result = proc: TickerStats =
var stats: BlockQueueStats var stats: BlockQueueStats
ctx.data.bCtx.blockQueueStats(stats) ctx.pool.bCtx.blockQueueStats(stats)
TickerStats( TickerStats(
topPersistent: stats.topAccepted, topPersistent: stats.topAccepted,
@ -116,7 +116,7 @@ proc processStaged(buddy: FullBuddyRef): bool =
peer = buddy.peer peer = buddy.peer
chainDb = buddy.ctx.chain.db chainDb = buddy.ctx.chain.db
chain = buddy.ctx.chain chain = buddy.ctx.chain
bq = buddy.data.bQueue bq = buddy.only.bQueue
# Get a work item, a list of headers + bodies # Get a work item, a list of headers + bodies
wi = block: wi = block:
@ -135,16 +135,6 @@ proc processStaged(buddy: FullBuddyRef): bool =
except CatchableError as e: except CatchableError as e:
error "Storing persistent blocks failed", peer, range=($wi.blocks), error "Storing persistent blocks failed", peer, range=($wi.blocks),
error = $e.name, msg = e.msg error = $e.name, msg = e.msg
#except Defect as e:
# # Pass through
# raise e
#except Exception as e:
# # Notorious case where the `Chain` reference applied to
# # `persistBlocks()` has the compiler traced a possible `Exception`
# # (i.e. `ctx.chain` could be uninitialised.)
# error "Exception while storing persistent blocks", peer,
# range=($wi.blocks), error=($e.name), msg=e.msg
# raise (ref Defect)(msg: $e.name & ": " & e.msg)
# Something went wrong. Recycle work item (needs to be re-fetched, anyway) # Something went wrong. Recycle work item (needs to be re-fetched, anyway)
let let
@ -185,23 +175,23 @@ proc processStaged(buddy: FullBuddyRef): bool =
proc setup*(ctx: FullCtxRef; tickerOK: bool): bool = proc setup*(ctx: FullCtxRef; tickerOK: bool): bool =
## Global set up ## Global set up
ctx.data.pivot = BestPivotCtxRef.init(ctx.data.rng) ctx.pool.pivot = BestPivotCtxRef.init(ctx.pool.rng)
if tickerOK:
ctx.data.ticker = TickerRef.init(ctx.tickerUpdater)
else:
debug "Ticker is disabled"
let rc = ctx.topUsedNumber(backBlocks = 0) let rc = ctx.topUsedNumber(backBlocks = 0)
if rc.isErr: if rc.isErr:
ctx.data.bCtx = BlockQueueCtxRef.init() ctx.pool.bCtx = BlockQueueCtxRef.init()
return false return false
ctx.data.bCtx = BlockQueueCtxRef.init(rc.value + 1) ctx.pool.bCtx = BlockQueueCtxRef.init(rc.value + 1)
if tickerOK:
ctx.pool.ticker = TickerRef.init(ctx.tickerUpdater)
else:
debug "Ticker is disabled"
true true
proc release*(ctx: FullCtxRef) = proc release*(ctx: FullCtxRef) =
## Global clean up ## Global clean up
ctx.data.pivot = nil ctx.pool.pivot = nil
if not ctx.data.ticker.isNil: if not ctx.pool.ticker.isNil:
ctx.data.ticker.stop() ctx.pool.ticker.stop()
proc start*(buddy: FullBuddyRef): bool = proc start*(buddy: FullBuddyRef): bool =
## Initialise worker peer ## Initialise worker peer
@ -210,20 +200,20 @@ proc start*(buddy: FullBuddyRef): bool =
peer = buddy.peer peer = buddy.peer
if peer.supports(protocol.eth) and if peer.supports(protocol.eth) and
peer.state(protocol.eth).initialized: peer.state(protocol.eth).initialized:
if not ctx.data.ticker.isNil: if not ctx.pool.ticker.isNil:
ctx.data.ticker.startBuddy() ctx.pool.ticker.startBuddy()
buddy.data.pivot = buddy.only.pivot =
BestPivotWorkerRef.init(ctx.data.pivot, buddy.ctrl, buddy.peer) BestPivotWorkerRef.init(ctx.pool.pivot, buddy.ctrl, buddy.peer)
buddy.data.bQueue = BlockQueueWorkerRef.init( buddy.only.bQueue = BlockQueueWorkerRef.init(
ctx.data.bCtx, buddy.ctrl, peer) ctx.pool.bCtx, buddy.ctrl, peer)
return true return true
proc stop*(buddy: FullBuddyRef) = proc stop*(buddy: FullBuddyRef) =
## Clean up this peer ## Clean up this peer
buddy.ctrl.stopped = true buddy.ctrl.stopped = true
buddy.data.pivot.clear() buddy.only.pivot.clear()
if not buddy.ctx.data.ticker.isNil: if not buddy.ctx.pool.ticker.isNil:
buddy.ctx.data.ticker.stopBuddy() buddy.ctx.pool.ticker.stopBuddy()
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public functions # Public functions
@ -236,39 +226,39 @@ proc runDaemon*(ctx: FullCtxRef) {.async.} =
## as `true` not before there is some activity on the `runPool()`, ## as `true` not before there is some activity on the `runPool()`,
## `runSingle()`, or `runMulti()` functions. ## `runSingle()`, or `runMulti()` functions.
## ##
case ctx.data.pivotState: case ctx.pool.pivotState:
of FirstPivotSeen: of FirstPivotSeen:
let elapsed = Moment.now() - ctx.data.pivotStamp let elapsed = Moment.now() - ctx.pool.pivotStamp
if FirstPivotSeenTimeout < elapsed: if FirstPivotSeenTimeout < elapsed:
# Switch to single peer pivot negotiation # Switch to single peer pivot negotiation
ctx.data.pivot.pivotRelaxedMode(enable = true) ctx.pool.pivot.pivotRelaxedMode(enable = true)
# Currently no need for other monitor tasks # Currently no need for other monitor tasks
ctx.daemon = false ctx.daemon = false
when extraTraceMessages: when extraTraceMessages:
trace "First seen pivot timeout", elapsed, trace "First seen pivot timeout", elapsed,
pivotState=ctx.data.pivotState pivotState=ctx.pool.pivotState
return return
# Otherwise delay for some time # Otherwise delay for some time
of FirstPivotAccepted: of FirstPivotAccepted:
let elapsed = Moment.now() - ctx.data.pivotStamp let elapsed = Moment.now() - ctx.pool.pivotStamp
if FirstPivotAcceptedTimeout < elapsed: if FirstPivotAcceptedTimeout < elapsed:
# Switch to single peer pivot negotiation # Switch to single peer pivot negotiation
ctx.data.pivot.pivotRelaxedMode(enable = true) ctx.pool.pivot.pivotRelaxedMode(enable = true)
# Use currents pivot next time `runSingle()` is visited. This bent is # Use currents pivot next time `runSingle()` is visited. This bent is
# necessary as there must be a peer initialising and syncing blocks. But # necessary as there must be a peer initialising and syncing blocks. But
# this daemon has no peer assigned. # this daemon has no peer assigned.
ctx.data.pivotState = FirstPivotUseRegardless ctx.pool.pivotState = FirstPivotUseRegardless
# Currently no need for other monitor tasks # Currently no need for other monitor tasks
ctx.daemon = false ctx.daemon = false
when extraTraceMessages: when extraTraceMessages:
trace "First accepted pivot timeout", elapsed, trace "First accepted pivot timeout", elapsed,
pivotState=ctx.data.pivotState pivotState=ctx.pool.pivotState
return return
# Otherwise delay for some time # Otherwise delay for some time
@ -297,17 +287,17 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} =
let let
ctx = buddy.ctx ctx = buddy.ctx
peer {.used.} = buddy.peer peer {.used.} = buddy.peer
bq = buddy.data.bQueue bq = buddy.only.bQueue
pv = buddy.data.pivot pv = buddy.only.pivot
when extraTraceMessages: when extraTraceMessages:
trace "Single mode begin", peer, pivotState=ctx.data.pivotState trace "Single mode begin", peer, pivotState=ctx.pool.pivotState
case ctx.data.pivotState: case ctx.pool.pivotState:
of PivotStateInitial: of PivotStateInitial:
# Set initial state on first encounter # Set initial state on first encounter
ctx.data.pivotState = FirstPivotSeen ctx.pool.pivotState = FirstPivotSeen
ctx.data.pivotStamp = Moment.now() ctx.pool.pivotStamp = Moment.now()
ctx.daemon = true # Start monitor ctx.daemon = true # Start monitor
of FirstPivotSeen, FirstPivotAccepted: of FirstPivotSeen, FirstPivotAccepted:
@ -319,13 +309,13 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} =
if rc.isOK: if rc.isOK:
# Update/activate `bestNumber` from the pivot header # Update/activate `bestNumber` from the pivot header
bq.bestNumber = some(rc.value.blockNumber) bq.bestNumber = some(rc.value.blockNumber)
ctx.data.pivotState = PivotRunMode ctx.pool.pivotState = PivotRunMode
buddy.ctrl.multiOk = true buddy.ctrl.multiOk = true
trace "Single pivot accepted", peer, pivot=('#' & $bq.bestNumber.get) trace "Single pivot accepted", peer, pivot=('#' & $bq.bestNumber.get)
return # stop logging, otherwise unconditional return for this case return # stop logging, otherwise unconditional return for this case
when extraTraceMessages: when extraTraceMessages:
trace "Single mode stopped", peer, pivotState=ctx.data.pivotState trace "Single mode stopped", peer, pivotState=ctx.pool.pivotState
return # unconditional return for this case return # unconditional return for this case
of PivotRunMode: of PivotRunMode:
@ -352,26 +342,26 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} =
# Negotiate in order to derive the pivot header from this `peer`. This code # Negotiate in order to derive the pivot header from this `peer`. This code
# location here is reached when there was no compelling reason for the # location here is reached when there was no compelling reason for the
# `case()` handler to process and `return`. # `case()` handler to process and `return`.
if await pv.pivotNegotiate(buddy.data.bQueue.bestNumber): if await pv.pivotNegotiate(buddy.only.bQueue.bestNumber):
# Update/activate `bestNumber` from the pivot header # Update/activate `bestNumber` from the pivot header
bq.bestNumber = some(pv.pivotHeader.value.blockNumber) bq.bestNumber = some(pv.pivotHeader.value.blockNumber)
ctx.data.pivotState = PivotRunMode ctx.pool.pivotState = PivotRunMode
buddy.ctrl.multiOk = true buddy.ctrl.multiOk = true
trace "Pivot accepted", peer, pivot=('#' & $bq.bestNumber.get) trace "Pivot accepted", peer, pivot=('#' & $bq.bestNumber.get)
return return
if buddy.ctrl.stopped: if buddy.ctrl.stopped:
when extraTraceMessages: when extraTraceMessages:
trace "Single mode stopped", peer, pivotState=ctx.data.pivotState trace "Single mode stopped", peer, pivotState=ctx.pool.pivotState
return # done with this buddy return # done with this buddy
var napping = 2.seconds var napping = 2.seconds
case ctx.data.pivotState: case ctx.pool.pivotState:
of FirstPivotSeen: of FirstPivotSeen:
# Possible state transition # Possible state transition
if pv.pivotHeader(relaxedMode=true).isOk: if pv.pivotHeader(relaxedMode=true).isOk:
ctx.data.pivotState = FirstPivotAccepted ctx.pool.pivotState = FirstPivotAccepted
ctx.data.pivotStamp = Moment.now() ctx.pool.pivotStamp = Moment.now()
napping = 300.milliseconds napping = 300.milliseconds
of FirstPivotAccepted: of FirstPivotAccepted:
napping = 300.milliseconds napping = 300.milliseconds
@ -379,7 +369,7 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} =
discard discard
when extraTraceMessages: when extraTraceMessages:
trace "Single mode end", peer, pivotState=ctx.data.pivotState, napping trace "Single mode end", peer, pivotState=ctx.pool.pivotState, napping
# Without waiting, this function repeats every 50ms (as set with the constant # Without waiting, this function repeats every 50ms (as set with the constant
# `sync_sched.execLoopTimeElapsedMin`.) # `sync_sched.execLoopTimeElapsedMin`.)
@ -402,7 +392,7 @@ proc runPool*(buddy: FullBuddyRef; last: bool): bool =
## Note that this function does not run in `async` mode. ## Note that this function does not run in `async` mode.
## ##
# Mind the gap, fill in if necessary (function is peer independent) # Mind the gap, fill in if necessary (function is peer independent)
buddy.data.bQueue.blockQueueGrout() buddy.only.bQueue.blockQueueGrout()
# Stop after running once regardless of peer # Stop after running once regardless of peer
buddy.ctx.poolMode = false buddy.ctx.poolMode = false
@ -417,7 +407,7 @@ proc runMulti*(buddy: FullBuddyRef) {.async.} =
# Fetch work item # Fetch work item
let let
ctx {.used.} = buddy.ctx ctx {.used.} = buddy.ctx
bq = buddy.data.bQueue bq = buddy.only.bQueue
rc = await bq.blockQueueWorker() rc = await bq.blockQueueWorker()
if rc.isErr: if rc.isErr:
if rc.error == StagedQueueOverflow: if rc.error == StagedQueueOverflow:

View File

@ -119,9 +119,9 @@ proc init*(
new result new result
result.initSync(ethNode, chain, maxPeers, enableTicker) result.initSync(ethNode, chain, maxPeers, enableTicker)
result.ctx.chain = chain # explicitely override result.ctx.chain = chain # explicitely override
result.ctx.data.rng = rng result.ctx.pool.rng = rng
result.ctx.data.dbBackend = dbBackend result.ctx.pool.dbBackend = dbBackend
result.ctx.data.noRecovery = noRecovery result.ctx.pool.noRecovery = noRecovery
# Required to have been initialised via `addEthHandlerCapability()` # Required to have been initialised via `addEthHandlerCapability()`
doAssert not result.ctx.ethWireCtx.isNil doAssert not result.ctx.ethWireCtx.isNil

View File

@ -48,7 +48,7 @@ template noExceptionOops(info: static[string]; code: untyped) =
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} = proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
let recov = ctx.data.recovery let recov = ctx.pool.recovery
if recov.isNil: if recov.isNil:
return false return false
@ -57,7 +57,7 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
"#" & $recov.state.header.blockNumber & "(" & $recov.level & ")" "#" & $recov.state.header.blockNumber & "(" & $recov.level & ")"
topLevel = recov.level == 0 topLevel = recov.level == 0
env = block: env = block:
let rc = ctx.data.pivotTable.eq recov.state.header.stateRoot let rc = ctx.pool.pivotTable.eq recov.state.header.stateRoot
if rc.isErr: if rc.isErr:
error "Recovery pivot context gone", checkpoint, topLevel error "Recovery pivot context gone", checkpoint, topLevel
return false return false
@ -79,19 +79,19 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
#when extraTraceMessages: #when extraTraceMessages:
# trace "Recovery done", checkpoint, topLevel # trace "Recovery done", checkpoint, topLevel
return false return false
let rc = ctx.data.snapDb.recoverPivot(recov.state.predecessor) let rc = ctx.pool.snapDb.recoverPivot(recov.state.predecessor)
if rc.isErr: if rc.isErr:
when extraTraceMessages: when extraTraceMessages:
trace "Recovery stopped at pivot stale checkpoint", checkpoint, topLevel trace "Recovery stopped at pivot stale checkpoint", checkpoint, topLevel
return false return false
# Set up next level pivot checkpoint # Set up next level pivot checkpoint
ctx.data.recovery = SnapRecoveryRef( ctx.pool.recovery = SnapRecoveryRef(
state: rc.value, state: rc.value,
level: recov.level + 1) level: recov.level + 1)
# Push onto pivot table and continue recovery (i.e. do not stop it yet) # Push onto pivot table and continue recovery (i.e. do not stop it yet)
ctx.data.pivotTable.reverseUpdate(ctx.data.recovery.state.header, ctx) ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx)
return true # continue recovery return true # continue recovery
@ -101,38 +101,38 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool = proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
## Global set up ## Global set up
ctx.data.coveredAccounts = NodeTagRangeSet.init() ctx.pool.coveredAccounts = NodeTagRangeSet.init()
noExceptionOops("worker.setup()"): noExceptionOops("worker.setup()"):
ctx.ethWireCtx.txPoolEnabled false ctx.ethWireCtx.txPoolEnabled false
ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB
ctx.data.snapDb = ctx.pool.snapDb =
if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db) if ctx.pool.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db)
else: SnapDbRef.init(ctx.data.dbBackend) else: SnapDbRef.init(ctx.pool.dbBackend)
if tickerOK: if tickerOK:
ctx.data.ticker = TickerRef.init(ctx.data.pivotTable.tickerStats(ctx)) ctx.pool.ticker = TickerRef.init(ctx.pool.pivotTable.tickerStats(ctx))
else: else:
trace "Ticker is disabled" trace "Ticker is disabled"
# Check for recovery mode # Check for recovery mode
if not ctx.data.noRecovery: if not ctx.pool.noRecovery:
let rc = ctx.data.snapDb.recoverPivot() let rc = ctx.pool.snapDb.recoverPivot()
if rc.isOk: if rc.isOk:
ctx.data.recovery = SnapRecoveryRef(state: rc.value) ctx.pool.recovery = SnapRecoveryRef(state: rc.value)
ctx.daemon = true ctx.daemon = true
# Set up early initial pivot # Set up early initial pivot
ctx.data.pivotTable.reverseUpdate(ctx.data.recovery.state.header, ctx) ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx)
trace "Recovery started", trace "Recovery started",
checkpoint=("#" & $ctx.data.pivotTable.topNumber() & "(0)") checkpoint=("#" & $ctx.pool.pivotTable.topNumber() & "(0)")
if not ctx.data.ticker.isNil: if not ctx.pool.ticker.isNil:
ctx.data.ticker.startRecovery() ctx.pool.ticker.startRecovery()
true true
proc release*(ctx: SnapCtxRef) = proc release*(ctx: SnapCtxRef) =
## Global clean up ## Global clean up
if not ctx.data.ticker.isNil: if not ctx.pool.ticker.isNil:
ctx.data.ticker.stop() ctx.pool.ticker.stop()
ctx.data.ticker = nil ctx.pool.ticker = nil
noExceptionOops("worker.release()"): noExceptionOops("worker.release()"):
ctx.ethWireCtx.txPoolEnabled true ctx.ethWireCtx.txPoolEnabled true
ctx.chain.com.syncReqNewHead = nil ctx.chain.com.syncReqNewHead = nil
@ -145,16 +145,16 @@ proc start*(buddy: SnapBuddyRef): bool =
if peer.supports(protocol.snap) and if peer.supports(protocol.snap) and
peer.supports(protocol.eth) and peer.supports(protocol.eth) and
peer.state(protocol.eth).initialized: peer.state(protocol.eth).initialized:
buddy.data.errors = ComErrorStatsRef() buddy.only.errors = ComErrorStatsRef()
if not ctx.data.ticker.isNil: if not ctx.pool.ticker.isNil:
ctx.data.ticker.startBuddy() ctx.pool.ticker.startBuddy()
return true return true
proc stop*(buddy: SnapBuddyRef) = proc stop*(buddy: SnapBuddyRef) =
## Clean up this peer ## Clean up this peer
let ctx = buddy.ctx let ctx = buddy.ctx
if not ctx.data.ticker.isNil: if not ctx.pool.ticker.isNil:
ctx.data.ticker.stopBuddy() ctx.pool.ticker.stopBuddy()
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public functions # Public functions
@ -163,15 +163,15 @@ proc stop*(buddy: SnapBuddyRef) =
proc runDaemon*(ctx: SnapCtxRef) {.async.} = proc runDaemon*(ctx: SnapCtxRef) {.async.} =
## Enabled while `ctx.daemon` is `true` ## Enabled while `ctx.daemon` is `true`
## ##
if not ctx.data.recovery.isNil: if not ctx.pool.recovery.isNil:
if not await ctx.recoveryStepContinue(): if not await ctx.recoveryStepContinue():
# Done, stop recovery # Done, stop recovery
ctx.data.recovery = nil ctx.pool.recovery = nil
ctx.daemon = false ctx.daemon = false
# Update logging # Update logging
if not ctx.data.ticker.isNil: if not ctx.pool.ticker.isNil:
ctx.data.ticker.stopRecovery() ctx.pool.ticker.stopRecovery()
proc runSingle*(buddy: SnapBuddyRef) {.async.} = proc runSingle*(buddy: SnapBuddyRef) {.async.} =
@ -191,12 +191,12 @@ proc runPool*(buddy: SnapBuddyRef, last: bool): bool =
result = true result = true
# Clean up empty pivot slots (never the top one) # Clean up empty pivot slots (never the top one)
var rc = ctx.data.pivotTable.beforeLast var rc = ctx.pool.pivotTable.beforeLast
while rc.isOK: while rc.isOK:
let (key, env) = (rc.value.key, rc.value.data) let (key, env) = (rc.value.key, rc.value.data)
if env.fetchAccounts.processed.isEmpty: if env.fetchAccounts.processed.isEmpty:
ctx.data.pivotTable.del key ctx.pool.pivotTable.del key
rc = ctx.data.pivotTable.prev(key) rc = ctx.pool.pivotTable.prev(key)
proc runMulti*(buddy: SnapBuddyRef) {.async.} = proc runMulti*(buddy: SnapBuddyRef) {.async.} =
@ -211,13 +211,13 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
# Set up current state root environment for accounts snapshot # Set up current state root environment for accounts snapshot
let let
env = block: env = block:
let rc = ctx.data.pivotTable.lastValue let rc = ctx.pool.pivotTable.lastValue
if rc.isErr: if rc.isErr:
return # nothing to do return # nothing to do
rc.value rc.value
pivot = "#" & $env.stateHeader.blockNumber # for logging pivot = "#" & $env.stateHeader.blockNumber # for logging
buddy.data.pivotEnv = env buddy.only.pivotEnv = env
# Full sync processsing based on current snapshot # Full sync processsing based on current snapshot
# ----------------------------------------------- # -----------------------------------------------
@ -231,7 +231,7 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
# If this is a new pivot, the previous one can be cleaned up. There is no # If this is a new pivot, the previous one can be cleaned up. There is no
# point in keeping some older space consuming state data any longer. # point in keeping some older space consuming state data any longer.
ctx.data.pivotTable.beforeTopMostlyClean() ctx.pool.pivotTable.beforeTopMostlyClean()
when extraTraceMessages: when extraTraceMessages:
block: block:

View File

@ -60,7 +60,7 @@ proc init(
# Initialise accounts range fetch batch, the pair of `fetchAccounts[]` range # Initialise accounts range fetch batch, the pair of `fetchAccounts[]` range
# sets. Deprioritise already processed ranges by moving it to the second set. # sets. Deprioritise already processed ranges by moving it to the second set.
for iv in ctx.data.coveredAccounts.increasing: for iv in ctx.pool.coveredAccounts.increasing:
discard result.unprocessed[0].reduce iv discard result.unprocessed[0].reduce iv
discard result.unprocessed[1].merge iv discard result.unprocessed[1].merge iv
@ -134,7 +134,7 @@ proc tickerStats*(
var var
aSum, aSqSum, uSum, uSqSum, sSum, sSqSum: float aSum, aSqSum, uSum, uSqSum, sSum, sSqSum: float
count = 0 count = 0
for kvp in ctx.data.pivotTable.nextPairs: for kvp in ctx.pool.pivotTable.nextPairs:
# Accounts mean & variance # Accounts mean & variance
let aLen = kvp.data.nAccounts.float let aLen = kvp.data.nAccounts.float
@ -152,9 +152,9 @@ proc tickerStats*(
sSum += sLen sSum += sLen
sSqSum += sLen * sLen sSqSum += sLen * sLen
let let
env = ctx.data.pivotTable.lastValue.get(otherwise = nil) env = ctx.pool.pivotTable.lastValue.get(otherwise = nil)
accCoverage = (ctx.data.coveredAccounts.fullFactor + accCoverage = (ctx.pool.coveredAccounts.fullFactor +
ctx.data.covAccTimesFull.float) ctx.pool.covAccTimesFull.float)
accFill = meanStdDev(uSum, uSqSum, count) accFill = meanStdDev(uSum, uSqSum, count)
var var
beaconBlock = none(BlockNumber) beaconBlock = none(BlockNumber)
@ -165,13 +165,13 @@ proc tickerStats*(
pivotBlock = some(env.stateHeader.blockNumber) pivotBlock = some(env.stateHeader.blockNumber)
procChunks = env.fetchAccounts.processed.chunks procChunks = env.fetchAccounts.processed.chunks
stoQuLen = some(env.storageQueueTotal()) stoQuLen = some(env.storageQueueTotal())
if 0 < ctx.data.beaconHeader.blockNumber: if 0 < ctx.pool.beaconHeader.blockNumber:
beaconBlock = some(ctx.data.beaconHeader.blockNumber) beaconBlock = some(ctx.pool.beaconHeader.blockNumber)
SnapTickerStats( SnapTickerStats(
beaconBlock: beaconBlock, beaconBlock: beaconBlock,
pivotBlock: pivotBlock, pivotBlock: pivotBlock,
nQueues: ctx.data.pivotTable.len, nQueues: ctx.pool.pivotTable.len,
nAccounts: meanStdDev(aSum, aSqSum, count), nAccounts: meanStdDev(aSum, aSqSum, count),
nSlotLists: meanStdDev(sSum, sSqSum, count), nSlotLists: meanStdDev(sSum, sSqSum, count),
accountsFill: (accFill[0], accFill[1], accCoverage), accountsFill: (accFill[0], accFill[1], accCoverage),
@ -273,7 +273,7 @@ proc saveCheckpoint*(
if accountsSaveStorageSlotsMax < nStoQu: if accountsSaveStorageSlotsMax < nStoQu:
return err(TooManySlotAccounts) return err(TooManySlotAccounts)
ctx.data.snapDb.savePivot SnapDbPivotRegistry( ctx.pool.snapDb.savePivot SnapDbPivotRegistry(
header: env.stateHeader, header: env.stateHeader,
nAccounts: env.nAccounts, nAccounts: env.nAccounts,
nSlotLists: env.nSlotLists, nSlotLists: env.nSlotLists,
@ -294,7 +294,7 @@ proc recoverPivotFromCheckpoint*(
## `processed`, `unprocessed`, and the `fetchStorageFull` lists are ## `processed`, `unprocessed`, and the `fetchStorageFull` lists are
## initialised. ## initialised.
## ##
let recov = ctx.data.recovery let recov = ctx.pool.recovery
if recov.isNil: if recov.isNil:
return return
@ -306,7 +306,7 @@ proc recoverPivotFromCheckpoint*(
if topLevel: if topLevel:
env.fetchAccounts.unprocessed.reduce(minPt, maxPt) env.fetchAccounts.unprocessed.reduce(minPt, maxPt)
discard env.fetchAccounts.processed.merge(minPt, maxPt) discard env.fetchAccounts.processed.merge(minPt, maxPt)
discard ctx.data.coveredAccounts.merge(minPt, maxPt) discard ctx.pool.coveredAccounts.merge(minPt, maxPt)
ctx.pivotAccountsCoverage100PcRollOver() # update coverage level roll over ctx.pivotAccountsCoverage100PcRollOver() # update coverage level roll over
# Handle storage slots # Handle storage slots
@ -316,7 +316,7 @@ proc recoverPivotFromCheckpoint*(
if 0 < env.fetchAccounts.processed.covered(pt): if 0 < env.fetchAccounts.processed.covered(pt):
# Ignoring slots that have accounts to be downloaded, anyway # Ignoring slots that have accounts to be downloaded, anyway
let rc = ctx.data.snapDb.getAccountsData(stateRoot, w) let rc = ctx.pool.snapDb.getAccountsData(stateRoot, w)
if rc.isErr: if rc.isErr:
# Oops, how did that account get lost? # Oops, how did that account get lost?
discard env.fetchAccounts.processed.reduce pt discard env.fetchAccounts.processed.reduce pt
@ -342,12 +342,12 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} =
let let
ctx = buddy.ctx ctx = buddy.ctx
peer {.used.} = buddy.peer peer {.used.} = buddy.peer
beaconHeader = ctx.data.beaconHeader beaconHeader = ctx.pool.beaconHeader
var var
pivotHeader: BlockHeader pivotHeader: BlockHeader
block: block:
let rc = ctx.data.pivotTable.lastValue let rc = ctx.pool.pivotTable.lastValue
if rc.isOk: if rc.isOk:
pivotHeader = rc.value.stateHeader pivotHeader = rc.value.stateHeader
@ -356,7 +356,7 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} =
# If the entry before the previous entry is unused, then run a pool mode # If the entry before the previous entry is unused, then run a pool mode
# based session (which should enable a pivot table purge). # based session (which should enable a pivot table purge).
block: block:
let rc = ctx.data.pivotTable.beforeLast let rc = ctx.pool.pivotTable.beforeLast
if rc.isOk and rc.value.data.fetchAccounts.processed.isEmpty: if rc.isOk and rc.value.data.fetchAccounts.processed.isEmpty:
ctx.poolMode = true ctx.poolMode = true
@ -365,7 +365,7 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} =
pivot=("#" & $pivotHeader.blockNumber), pivot=("#" & $pivotHeader.blockNumber),
beacon=("#" & $beaconHeader.blockNumber), poolMode=ctx.poolMode beacon=("#" & $beaconHeader.blockNumber), poolMode=ctx.poolMode
discard ctx.data.pivotTable.lruAppend( discard ctx.pool.pivotTable.lruAppend(
beaconHeader.stateRoot, SnapPivotRef.init(ctx, beaconHeader), beaconHeader.stateRoot, SnapPivotRef.init(ctx, beaconHeader),
pivotTableLruEntriesMax) pivotTableLruEntriesMax)
@ -380,10 +380,10 @@ proc pivotUpdateBeaconHeaderCB*(ctx: SnapCtxRef): SyncReqNewHeadCB =
## Update beacon header. This function is intended as a call back function ## Update beacon header. This function is intended as a call back function
## for the RPC module. ## for the RPC module.
result = proc(h: BlockHeader) {.gcsafe.} = result = proc(h: BlockHeader) {.gcsafe.} =
if ctx.data.beaconHeader.blockNumber < h.blockNumber: if ctx.pool.beaconHeader.blockNumber < h.blockNumber:
# when extraTraceMessages: # when extraTraceMessages:
# trace "External beacon info update", header=("#" & $h.blockNumber) # trace "External beacon info update", header=("#" & $h.blockNumber)
ctx.data.beaconHeader = h ctx.pool.beaconHeader = h
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# End # End

View File

@ -86,7 +86,7 @@ proc healingCtx(
"pivot=" & "#" & $env.stateHeader.blockNumber & "," & "pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"nAccounts=" & $env.nAccounts & "," & "nAccounts=" & $env.nAccounts & "," &
("covered=" & $env.fetchAccounts.processed & "/" & ("covered=" & $env.fetchAccounts.processed & "/" &
$ctx.data.coveredAccounts ) & "}" $ctx.pool.coveredAccounts ) & "}"
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Private helpers # Private helpers
@ -118,7 +118,7 @@ proc compileMissingNodesList(
ctx = buddy.ctx ctx = buddy.ctx
peer {.used.} = buddy.peer peer {.used.} = buddy.peer
rootKey = env.stateHeader.stateRoot.to(NodeKey) rootKey = env.stateHeader.stateRoot.to(NodeKey)
getFn = ctx.data.snapDb.getAccountFn getFn = ctx.pool.snapDb.getAccountFn
fa {.used.} = env.fetchAccounts fa {.used.} = env.fetchAccounts
# Import from earlier run # Import from earlier run
@ -171,7 +171,7 @@ proc fetchMissingNodes(
let rc = await buddy.getTrieNodes(stateRoot, pathList, pivot) let rc = await buddy.getTrieNodes(stateRoot, pathList, pivot)
if rc.isOk: if rc.isOk:
# Reset error counts for detecting repeated timeouts, network errors, etc. # Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.data.errors.resetComError() buddy.only.errors.resetComError()
# Forget about unfetched missing nodes, will be picked up later # Forget about unfetched missing nodes, will be picked up later
return rc.value.nodes.mapIt(NodeSpecs( return rc.value.nodes.mapIt(NodeSpecs(
@ -182,7 +182,7 @@ proc fetchMissingNodes(
# Process error ... # Process error ...
let let
error = rc.error error = rc.error
ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors) ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors)
when extraTraceMessages: when extraTraceMessages:
if ok: if ok:
trace logTxt "fetch nodes error => stop", peer, trace logTxt "fetch nodes error => stop", peer,
@ -241,7 +241,7 @@ proc registerAccountLeaf(
if 0 < env.fetchAccounts.processed.merge(pt,pt) : if 0 < env.fetchAccounts.processed.merge(pt,pt) :
env.nAccounts.inc env.nAccounts.inc
env.fetchAccounts.unprocessed.reduce(pt,pt) env.fetchAccounts.unprocessed.reduce(pt,pt)
discard buddy.ctx.data.coveredAccounts.merge(pt,pt) discard buddy.ctx.pool.coveredAccounts.merge(pt,pt)
# Update storage slots batch # Update storage slots batch
if acc.storageRoot != emptyRlpHash: if acc.storageRoot != emptyRlpHash:
@ -260,7 +260,7 @@ proc accountsHealingImpl(
## number of nodes fetched from the network, and -1 upon error. ## number of nodes fetched from the network, and -1 upon error.
let let
ctx = buddy.ctx ctx = buddy.ctx
db = ctx.data.snapDb db = ctx.pool.snapDb
peer = buddy.peer peer = buddy.peer
# Import from earlier runs (if any) # Import from earlier runs (if any)

View File

@ -130,7 +130,7 @@ proc compileMissingNodesList(
peer {.used.} = buddy.peer peer {.used.} = buddy.peer
slots = kvp.data.slots slots = kvp.data.slots
rootKey = kvp.key.to(NodeKey) rootKey = kvp.key.to(NodeKey)
getFn = ctx.data.snapDb.getStorageSlotsFn(kvp.data.accKey) getFn = ctx.pool.snapDb.getStorageSlotsFn(kvp.data.accKey)
if not slots.processed.isFull: if not slots.processed.isFull:
noExceptionOops("compileMissingNodesList"): noExceptionOops("compileMissingNodesList"):
@ -177,7 +177,7 @@ proc getNodesFromNetwork(
rc = await buddy.getTrieNodes(storageRoot, @[req], pivot) rc = await buddy.getTrieNodes(storageRoot, @[req], pivot)
if rc.isOk: if rc.isOk:
# Reset error counts for detecting repeated timeouts, network errors, etc. # Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.data.errors.resetComError() buddy.only.errors.resetComError()
return rc.value.nodes.mapIt(NodeSpecs( return rc.value.nodes.mapIt(NodeSpecs(
partialPath: it.partialPath, partialPath: it.partialPath,
@ -185,7 +185,7 @@ proc getNodesFromNetwork(
data: it.data)) data: it.data))
let error = rc.error let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors):
when extraTraceMessages: when extraTraceMessages:
trace logTxt "fetch nodes error => stop", peer, trace logTxt "fetch nodes error => stop", peer,
ctx=buddy.healingCtx(kvp,env), error ctx=buddy.healingCtx(kvp,env), error
@ -217,7 +217,7 @@ proc storageSlotsHealing(
## `false` if there are nodes left to be completed. ## `false` if there are nodes left to be completed.
let let
ctx = buddy.ctx ctx = buddy.ctx
db = ctx.data.snapDb db = ctx.pool.snapDb
peer = buddy.peer peer = buddy.peer
missing = buddy.compileMissingNodesList(kvp, env) missing = buddy.compileMissingNodesList(kvp, env)

View File

@ -113,7 +113,7 @@ proc accountsRangefetchImpl(
let let
ctx = buddy.ctx ctx = buddy.ctx
peer = buddy.peer peer = buddy.peer
db = ctx.data.snapDb db = ctx.pool.snapDb
fa = env.fetchAccounts fa = env.fetchAccounts
stateRoot = env.stateHeader.stateRoot stateRoot = env.stateHeader.stateRoot
@ -134,7 +134,7 @@ proc accountsRangefetchImpl(
if rc.isErr: if rc.isErr:
fa.unprocessed.merge iv # fail => interval back to pool fa.unprocessed.merge iv # fail => interval back to pool
let error = rc.error let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors):
when extraTraceMessages: when extraTraceMessages:
trace logTxt "fetch error", peer, ctx=buddy.fetchCtx(env), trace logTxt "fetch error", peer, ctx=buddy.fetchCtx(env),
reqLen=iv.len, error reqLen=iv.len, error
@ -142,7 +142,7 @@ proc accountsRangefetchImpl(
rc.value rc.value
# Reset error counts for detecting repeated timeouts, network errors, etc. # Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.data.errors.resetComError() buddy.only.errors.resetComError()
let let
gotAccounts = dd.data.accounts.len # comprises `gotStorage` gotAccounts = dd.data.accounts.len # comprises `gotStorage`
@ -188,7 +188,7 @@ proc accountsRangefetchImpl(
fa.unprocessed.reduce w fa.unprocessed.reduce w
# Register consumed intervals on the accumulators over all state roots. # Register consumed intervals on the accumulators over all state roots.
discard fa.processed.merge w discard fa.processed.merge w
discard ctx.data.coveredAccounts.merge w discard ctx.pool.coveredAccounts.merge w
ctx.pivotAccountsCoverage100PcRollOver() # update coverage level roll over ctx.pivotAccountsCoverage100PcRollOver() # update coverage level roll over
# Register accounts with storage slots on the storage TODO list. # Register accounts with storage slots on the storage TODO list.

View File

@ -124,20 +124,20 @@ proc storeStoragesSingleBatch(
let rc = await buddy.getStorageRanges(stateRoot, req, pivot) let rc = await buddy.getStorageRanges(stateRoot, req, pivot)
if rc.isErr: if rc.isErr:
let error = rc.error let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors):
trace logTxt "fetch error => stop", peer, ctx=buddy.fetchCtx(env), trace logTxt "fetch error => stop", peer, ctx=buddy.fetchCtx(env),
nReq=req.len, error nReq=req.len, error
return false # all of `req` failed return false # all of `req` failed
rc.value rc.value
# Reset error counts for detecting repeated timeouts, network errors, etc. # Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.data.errors.resetComError() buddy.only.errors.resetComError()
var gotSlotLists = stoRange.data.storages.len var gotSlotLists = stoRange.data.storages.len
if 0 < gotSlotLists: if 0 < gotSlotLists:
# Verify/process storages data and save it to disk # Verify/process storages data and save it to disk
let report = ctx.data.snapDb.importStorageSlots( let report = ctx.pool.snapDb.importStorageSlots(
peer, stoRange.data, noBaseBoundCheck = true) peer, stoRange.data, noBaseBoundCheck = true)
if 0 < report.len: if 0 < report.len:

View File

@ -215,7 +215,7 @@ proc storageQueueFetchFull*(
noExceptionOops("getNextSlotItemsFull"): noExceptionOops("getNextSlotItemsFull"):
for kvp in env.fetchStorageFull.nextPairs: for kvp in env.fetchStorageFull.nextPairs:
let let
getFn = ctx.data.snapDb.getStorageSlotsFn kvp.data.accKey getFn = ctx.pool.snapDb.getStorageSlotsFn kvp.data.accKey
rootKey = kvp.key.to(NodeKey) rootKey = kvp.key.to(NodeKey)
accItem = AccountSlotsHeader( accItem = AccountSlotsHeader(
accKey: kvp.data.accKey, accKey: kvp.data.accKey,

View File

@ -259,9 +259,9 @@ proc swapInAccounts*(
let let
pivot {.used.} = "#" & $env.stateHeader.blockNumber # Logging & debugging pivot {.used.} = "#" & $env.stateHeader.blockNumber # Logging & debugging
rootKey = env.stateHeader.stateRoot.to(NodeKey) rootKey = env.stateHeader.stateRoot.to(NodeKey)
getFn = ctx.data.snapDb.getAccountFn getFn = ctx.pool.snapDb.getAccountFn
others = toSeq(ctx.data.pivotTable.nextPairs) others = toSeq(ctx.pool.pivotTable.nextPairs)
# Swap in from mothballed pivots different from the current one # Swap in from mothballed pivots different from the current one
.filterIt(it.data.archived and it.key.to(NodeKey) != rootKey) .filterIt(it.data.archived and it.key.to(NodeKey) != rootKey)

View File

@ -129,15 +129,15 @@ proc hash*(a: Hash256): Hash =
proc pivotAccountsCoverage*(ctx: SnapCtxRef): float = proc pivotAccountsCoverage*(ctx: SnapCtxRef): float =
## Returns the accounts coverage factor ## Returns the accounts coverage factor
ctx.data.coveredAccounts.fullFactor + ctx.data.covAccTimesFull.float ctx.pool.coveredAccounts.fullFactor + ctx.pool.covAccTimesFull.float
proc pivotAccountsCoverage100PcRollOver*(ctx: SnapCtxRef) = proc pivotAccountsCoverage100PcRollOver*(ctx: SnapCtxRef) =
## Roll over `coveredAccounts` registry when it reaches 100%. ## Roll over `coveredAccounts` registry when it reaches 100%.
if ctx.data.coveredAccounts.isFull: if ctx.pool.coveredAccounts.isFull:
# All of accounts hashes are covered by completed range fetch processes # All of accounts hashes are covered by completed range fetch processes
# for all pivot environments. So reset covering and record full-ness level. # for all pivot environments. So reset covering and record full-ness level.
ctx.data.covAccTimesFull.inc ctx.pool.covAccTimesFull.inc
ctx.data.coveredAccounts.clear() ctx.pool.coveredAccounts.clear()
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public helpers: SnapTodoRanges # Public helpers: SnapTodoRanges

View File

@ -42,7 +42,7 @@ type
ctx*: CtxRef[S] ## Shared data descriptor back reference ctx*: CtxRef[S] ## Shared data descriptor back reference
peer*: Peer ## Reference to eth p2pProtocol entry peer*: Peer ## Reference to eth p2pProtocol entry
ctrl*: BuddyCtrlRef ## Control and state settings ctrl*: BuddyCtrlRef ## Control and state settings
data*: W ## Worker peer specific data only*: W ## Worker peer specific data
CtxRef*[S] = ref object CtxRef*[S] = ref object
## Shared state among all syncing peer workers (aka buddies.) ## Shared state among all syncing peer workers (aka buddies.)
@ -51,7 +51,7 @@ type
chain*: ChainRef ## Block chain database (no need for `Peer`) chain*: ChainRef ## Block chain database (no need for `Peer`)
poolMode*: bool ## Activate `runPool()` workers if set `true` poolMode*: bool ## Activate `runPool()` workers if set `true`
daemon*: bool ## Enable global background job daemon*: bool ## Enable global background job
data*: S ## Shared context for all worker peers pool*: S ## Shared context for all worker peers
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public functions # Public functions

View File

@ -178,7 +178,7 @@ proc testDbs(
else: else:
result.dbDir = workDir / "tmp" result.dbDir = workDir / "tmp"
if result.persistent: if result.persistent:
result.dbDir.flushDbDir workDir.flushDbDir(subDir)
for n in 0 ..< min(result.cdb.len, instances): for n in 0 ..< min(result.cdb.len, instances):
result.cdb[n] = (result.dbDir / $n).newChainDB result.cdb[n] = (result.dbDir / $n).newChainDB
@ -510,7 +510,7 @@ when isMainModule:
#setTraceLevel() #setTraceLevel()
setErrorLevel() setErrorLevel()
# Test constant, calculations etc. # Test constants, calculations etc.
when true: # and false: when true: # and false:
noisy.miscRunner() noisy.miscRunner()
@ -546,7 +546,8 @@ when isMainModule:
false.storagesRunner(persistent=true, sam) false.storagesRunner(persistent=true, sam)
# This one uses the readily available dump: `bulkTest0` and some huge replay # This one uses the readily available dump: `bulkTest0` and some huge replay
# dumps `bulkTest1`, `bulkTest2`, .. from the `nimbus-eth1-blobs` package # dumps `bulkTest1`, `bulkTest2`, .. from the `nimbus-eth1-blobs` package.
# For specs see `tests/test_sync_snap/bulk_test_xx.nim`.
when true and false: when true and false:
# ---- database storage timings ------- # ---- database storage timings -------