Prepare snap client for continuing with full sync (#1534)

* Somewhat tighten error handling

why:
  The zombie state is invoked when the current peer turns out to be useless
  for further communication. While there is still a chance to talk to a peer
  about another topic (aka healing) after some protocol failure, it makes no
  sense to do so after a network problem.

  The latter state is indicated by the `peerDegraded` flag that goes
  together with the `zombie` state flag. A degraded peer is dropped
  immediately.
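
  A minimal sketch of how the two flags are meant to be read together (the
  `dropPeerNow()` helper is hypothetical here; only `zombie` and
  `peerDegraded` come with this patch):

    if buddy.ctrl.zombie:
      if buddy.only.errors.peerDegraded:
        buddy.dropPeerNow()   # network problem => healing pointless, drop
      else:
        discard               # protocol hiccup => peer may still heal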

* Remove `--sync-mode=snapCtx` option, always start snap in recovery mode

why:
  No need for a snap sync option without recovery mode; the same effect
  can be achieved by deleting the database.

* Code cosmetics, typos, prettify logging, debugging helper, etc.

* Split off snap sync sub-mode handler into separate modules

details:
  The original `worker.nim` source has become a multiplexer for the
  snap sync sub-modes `full` and `snap`. The source modules implementing
  a particular sync sub-mode are placed into the `worker/play` directory.
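
  For reference, each handler in `worker.nim` now shrinks to a thin
  dispatcher through the active sub-mode's method table, e.g. (as in the
  diff below):

    proc runDaemon*(ctx: SnapCtxRef) {.async.} =
      ## Sync processing multiplexer
      ignoreException("runDaemon"):
        await ctx.playSyncSpecs.daemon(ctx)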

* Update ticker for snap and full sync logging
Jordan Hrycaj 2023-04-06 20:42:07 +01:00 committed by GitHub
parent 2d51196645
commit 9facab91cb
22 changed files with 758 additions and 317 deletions


@ -139,7 +139,6 @@ type
Default
Full ## Beware, experimental
Snap ## Beware, experimental
SnapCtx ## Beware, experimental
Stateless ## Beware, experimental
NimbusConf* = object of RootObj
@ -173,7 +172,6 @@ type
"- default -- legacy sync mode\n" &
"- full -- full blockchain archive\n" &
"- snap -- experimental snap mode (development only)\n" &
"- snapCtx -- snap considering possible recovery context\n" &
"- stateless -- experimental stateless mode (development only)"
defaultValue: SyncMode.Default
defaultValueDesc: $SyncMode.Default


@ -169,7 +169,6 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
# Early-initialise "--snap-sync" before starting any network connections.
block:
let
noRecovery = conf.syncMode in {SyncMode.SnapCtx}
exCtrlFile = if conf.syncCtrlFile.isNone: none(string)
else: some(conf.syncCtrlFile.get.string)
tickerOK = conf.logLevel in {
@ -179,14 +178,14 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
nimbus.fullSyncRef = FullSyncRef.init(
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
tickerOK, exCtrlFile)
of SyncMode.Snap, SyncMode.SnapCtx:
of SyncMode.Snap:
# Minimal capability needed for sync only
if ProtocolFlag.Snap notin protocols:
nimbus.ethNode.addSnapHandlerCapability(
nimbus.ethNode.peerPool)
nimbus.snapSyncRef = SnapSyncRef.init(
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
nimbus.dbBackend, tickerOK, noRecovery=noRecovery, exCtrlFile)
nimbus.dbBackend, tickerOK, exCtrlFile)
of SyncMode.Stateless:
# FIXME-Adam: what needs to go here?
nimbus.statelessSyncRef = StatelessSyncRef.init()
@ -209,7 +208,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
if conf.maxPeers > 0:
var waitForPeers = true
case conf.syncMode:
of SyncMode.Snap, SyncMode.SnapCtx, SyncMode.Stateless:
of SyncMode.Snap, SyncMode.Stateless:
waitForPeers = false
of SyncMode.Full, SyncMode.Default:
discard
@ -434,7 +433,7 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
nimbus.fullSyncRef.start
of SyncMode.Stateless:
nimbus.statelessSyncRef.start
of SyncMode.Snap, SyncMode.SnapCtx:
of SyncMode.Snap:
nimbus.snapSyncRef.start
if nimbus.state == Starting:


@ -379,11 +379,11 @@ proc setNewBlockHashesHandler*(ctx: EthWireRef, handler: NewBlockHashesHandler,
# Public getters/setters
# ------------------------------------------------------------------------------
proc `txPoolEnabled=`*(ctx: EthWireRef; ena: bool) =
proc `txPoolEnabled=`*(ctx: EthWireRef; ena: bool) {.gcsafe, raises: [].} =
if ctx.enableTxPool != NotAvailable:
ctx.enableTxPool = if ena: Enabled else: Suspended
proc txPoolEnabled*(ctx: EthWireRef): bool =
proc txPoolEnabled*(ctx: EthWireRef): bool {.gcsafe, raises: [].} =
ctx.enableTxPool == Enabled
# ------------------------------------------------------------------------------


@ -114,7 +114,6 @@ proc init*(
maxPeers: int;
dbBackend: ChainDb;
enableTicker = false;
noRecovery = false;
exCtrlFile = none(string);
): T =
new result
@ -122,7 +121,6 @@ proc init*(
result.ctx.chain = chain # explicitly override
result.ctx.pool.rng = rng
result.ctx.pool.dbBackend = dbBackend
result.ctx.pool.noRecovery = noRecovery
# Required to have been initialised via `addEthHandlerCapability()`
doAssert not result.ctx.ethWireCtx.isNil


@ -26,6 +26,25 @@ logScope:
# Public functions
# ------------------------------------------------------------------------------
proc updateBeaconHeaderbuBlockNumber*(
buddy: SnapBuddyRef; # Worker peer
num: BlockNumber; # Block number to sync against
) {.async.} =
## This function updates the beacon header according to the block number
## argument.
##
## This function is typically used for testing and debugging.
let
ctx = buddy.ctx
peer = buddy.peer
trace "fetch beacon header", peer, num
if ctx.pool.beaconHeader.blockNumber < num:
let rc = await buddy.getBlockHeader(num)
if rc.isOk:
ctx.pool.beaconHeader = rc.value
proc updateBeaconHeaderFromFile*(
buddy: SnapBuddyRef; # Worker peer
) {.async.} =
@ -62,10 +81,8 @@ proc updateBeaconHeaderFromFile*(
if ctx.pool.beaconHeader.blockNumber < num:
rc = await buddy.getBlockHeader(num)
except CatchableError as e:
let
name {.used.} = $e.name
msg {.used.} = e.msg
trace "Exception while parsing beacon info", peer, isHash, name, msg
trace "Exception while parsing beacon info", peer, isHash,
name=($e.name), msg=(e.msg)
if rc.isOk:
if ctx.pool.beaconHeader.blockNumber < rc.value.blockNumber:


@ -8,91 +8,91 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/[options, sets],
chronicles,
chronos,
eth/[common, p2p],
stew/[interval_set, keyed_queue],
../../common as nimcom,
../../db/select_backend,
".."/[handlers, protocol, sync_desc],
./worker/[pivot, ticker],
./worker/com/com_error,
./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot],
"."/[range_desc, update_beacon_header, worker_desc]
{.push raises: [].}
logScope:
topics = "snap-buddy"
import
chronicles,
chronos,
eth/p2p,
stew/[interval_set, keyed_queue],
"../.."/[common, db/select_backend],
".."/[handlers/eth, protocol, sync_desc],
./worker/[pivot, play, ticker],
./worker/com/com_error,
./worker/db/[snapdb_desc, snapdb_pivot],
"."/[range_desc, worker_desc]
const
extraTraceMessages = false or true
## Enabled additional logging noise
logScope:
topics = "snap-worker"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template noExceptionOops(info: static[string]; code: untyped) =
template ignoreException(info: static[string]; code: untyped) =
try:
code
except CatchableError as e:
raiseAssert "Inconveivable (" &
info & "): name=" & $e.name & " msg=" & e.msg
error "Exception at " & info & ":", name=($e.name), msg=(e.msg)
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
let recov = ctx.pool.recovery
if recov.isNil:
return false
proc disableWireServices(ctx: SnapCtxRef) =
## Helper for `setup()`: Temporarily stop useless wire protocol services.
ctx.ethWireCtx.txPoolEnabled = false
let
checkpoint =
"#" & $recov.state.header.blockNumber & "(" & $recov.level & ")"
topLevel = recov.level == 0
env = block:
let rc = ctx.pool.pivotTable.eq recov.state.header.stateRoot
if rc.isErr:
error "Recovery pivot context gone", checkpoint, topLevel
return false
rc.value
proc enableWireServices(ctx: SnapCtxRef) =
## Helper for `release()`
ctx.ethWireCtx.txPoolEnabled = true
# Cosmetics: allow other processes (e.g. ticker) to log the current recovery
# state. There is no other intended purpose of this wait state.
await sleepAsync 1100.milliseconds
# --------------
#when extraTraceMessages:
# trace "Recovery continued ...", checkpoint, topLevel,
# nAccounts=recov.state.nAccounts, nDangling=recov.state.dangling.len
proc enableTicker(ctx: SnapCtxRef; tickerOK: bool) =
## Helper for `setup()`: Log/status ticker
if tickerOK:
ctx.pool.ticker = TickerRef.init(ctx.pool.pivotTable.tickerStats(ctx))
else:
trace "Ticker is disabled"
# Update pivot data from recovery checkpoint
env.recoverPivotFromCheckpoint(ctx, topLevel)
proc disableTicker(ctx: SnapCtxRef) =
## Helper for `release()`
if not ctx.pool.ticker.isNil:
ctx.pool.ticker.stop()
ctx.pool.ticker = nil
# Fetch next recovery record if there is any
if recov.state.predecessor.isZero:
#when extraTraceMessages:
# trace "Recovery done", checkpoint, topLevel
return false
let rc = ctx.pool.snapDb.recoverPivot(recov.state.predecessor)
if rc.isErr:
when extraTraceMessages:
trace "Recovery stopped at pivot stale checkpoint", checkpoint, topLevel
return false
# --------------
# Set up next level pivot checkpoint
ctx.pool.recovery = SnapRecoveryRef(
state: rc.value,
level: recov.level + 1)
proc enableRpcMagic(ctx: SnapCtxRef) =
## Helper for `setup()`: Enable external pivot update via RPC
ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB
# Push onto pivot table and continue recovery (i.e. do not stop it yet)
ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx)
proc disableRpcMagic(ctx: SnapCtxRef) =
## Helper for `release()`
ctx.chain.com.syncReqNewHead = nil
return true # continue recovery
# --------------
proc detectSnapSyncRecovery(ctx: SnapCtxRef) =
## Helper for `setup()`: Initiate snap sync recovery (if any)
let rc = ctx.pool.snapDb.pivotRecoverDB()
if rc.isOk:
ctx.pool.recovery = SnapRecoveryRef(state: rc.value)
ctx.daemon = true
# Set up early initial pivot
ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx)
trace "Snap sync recovery started",
checkpoint=("#" & $ctx.pool.pivotTable.topNumber() & "(0)")
if not ctx.pool.ticker.isNil:
ctx.pool.ticker.startRecovery()
proc initSnapDb(ctx: SnapCtxRef) =
## Helper for `setup()`: Initialise snap sync database layer
ctx.pool.snapDb =
if ctx.pool.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db)
else: SnapDbRef.init(ctx.pool.dbBackend)
# ------------------------------------------------------------------------------
# Public start/stop and admin functions
@ -100,32 +100,18 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
## Global set up
# For snap sync book keeping
ctx.pool.coveredAccounts = NodeTagRangeSet.init()
noExceptionOops("worker.setup()"):
ctx.ethWireCtx.txPoolEnabled = false
ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB
ctx.pool.snapDb =
if ctx.pool.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db)
else: SnapDbRef.init(ctx.pool.dbBackend)
if tickerOK:
ctx.pool.ticker = TickerRef.init(ctx.pool.pivotTable.tickerStats(ctx))
else:
trace "Ticker is disabled"
# Check for recovery mode
if not ctx.pool.noRecovery:
let rc = ctx.pool.snapDb.recoverPivot()
if rc.isOk:
ctx.pool.recovery = SnapRecoveryRef(state: rc.value)
ctx.daemon = true
# Set up early initial pivot
ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx)
trace "Recovery started",
checkpoint=("#" & $ctx.pool.pivotTable.topNumber() & "(0)")
if not ctx.pool.ticker.isNil:
ctx.pool.ticker.startRecovery()
ctx.enableRpcMagic() # Allow external pivot update via RPC
ctx.disableWireServices() # Stop unwanted public services
ctx.pool.syncMode.playInit() # Set up sync sub-mode specs.
ctx.initSnapDb() # Set database backend, subject to change
ctx.detectSnapSyncRecovery() # Check for recovery mode
ctx.enableTicker(tickerOK) # Start log/status ticker (if any)
# Experimental, also used for debugging
if ctx.exCtrlFile.isSome:
warn "Snap sync accepts pivot block number or hash",
syncCtrlFile=ctx.exCtrlFile.get
@ -133,12 +119,10 @@ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
proc release*(ctx: SnapCtxRef) =
## Global clean up
if not ctx.pool.ticker.isNil:
ctx.pool.ticker.stop()
ctx.pool.ticker = nil
noExceptionOops("worker.release()"):
ctx.ethWireCtx.txPoolEnabled = true
ctx.chain.com.syncReqNewHead = nil
ctx.disableTicker() # Stop log/status ticker (if any)
ctx.enableWireServices() # re-enable public services
ctx.disableRpcMagic() # Disable external pivot update via RPC
proc start*(buddy: SnapBuddyRef): bool =
## Initialise worker peer
@ -160,127 +144,28 @@ proc stop*(buddy: SnapBuddyRef) =
ctx.pool.ticker.stopBuddy()
# ------------------------------------------------------------------------------
# Public functions
# Public functions, sync handler multiplexers
# ------------------------------------------------------------------------------
proc runDaemon*(ctx: SnapCtxRef) {.async.} =
## Enabled while `ctx.daemon` is `true`
##
if not ctx.pool.recovery.isNil:
if not await ctx.recoveryStepContinue():
# Done, stop recovery
ctx.pool.recovery = nil
ctx.daemon = false
# Update logging
if not ctx.pool.ticker.isNil:
ctx.pool.ticker.stopRecovery()
## Sync processing multiplexer
ignoreException("runDaemon"):
await ctx.playSyncSpecs.daemon(ctx)
proc runSingle*(buddy: SnapBuddyRef) {.async.} =
## Enabled while
## * `buddy.ctrl.multiOk` is `false`
## * `buddy.ctrl.poolMode` is `false`
##
let ctx = buddy.ctx
# External beacon header updater
await buddy.updateBeaconHeaderFromFile()
await buddy.pivotApprovePeer()
buddy.ctrl.multiOk = true
## Sync processing multiplexer
ignoreException("runSingle"):
await buddy.ctx.playSyncSpecs.single(buddy)
proc runPool*(buddy: SnapBuddyRef, last: bool): bool =
## Enabled when `buddy.ctrl.poolMode` is `true`
##
let ctx = buddy.ctx
ctx.poolMode = false
result = true
# Clean up empty pivot slots (never the top one)
var rc = ctx.pool.pivotTable.beforeLast
while rc.isOK:
let (key, env) = (rc.value.key, rc.value.data)
if env.fetchAccounts.processed.isEmpty:
ctx.pool.pivotTable.del key
rc = ctx.pool.pivotTable.prev(key)
## Sync processing multiplexer
ignoreException("runPool"):
result = buddy.ctx.playSyncSpecs.pool(buddy,last)
proc runMulti*(buddy: SnapBuddyRef) {.async.} =
## Enabled while
## * `buddy.ctx.multiOk` is `true`
## * `buddy.ctx.poolMode` is `false`
##
let
ctx = buddy.ctx
peer = buddy.peer
# Set up current state root environment for accounts snapshot
let
env = block:
let rc = ctx.pool.pivotTable.lastValue
if rc.isErr:
return # nothing to do
rc.value
pivot = "#" & $env.stateHeader.blockNumber # for logging
nStorQuAtStart = env.fetchStorageFull.len +
env.fetchStoragePart.len +
env.parkedStorage.len
buddy.only.pivotEnv = env
# Full sync processsing based on current snapshot
# -----------------------------------------------
# Check whether this pivot is fully downloaded
if env.fetchAccounts.processed.isFull and nStorQuAtStart == 0:
trace "Snap full sync -- not implemented yet", peer, pivot
await sleepAsync(5.seconds)
# flip over to single mode for getting new instructions
buddy.ctrl.multiOk = false
return
# Snapshot sync processing
# ------------------------
# If this is a new pivot, the previous one can be cleaned up. There is no
# point in keeping some older space consuming state data any longer.
ctx.pool.pivotTable.beforeTopMostlyClean()
when extraTraceMessages:
block:
trace "Multi sync runner", peer, pivot, nAccounts=env.nAccounts,
nSlotLists=env.nSlotLists,
processed=env.fetchAccounts.processed.fullPC3,
nStoQu=nStorQuAtStart
# This one is the syncing work horse which downloads the database
await env.execSnapSyncAction(buddy)
# Various logging entries (after accounts and storage slots download)
let
nAccounts {.used.} = env.nAccounts
nSlotLists {.used.} = env.nSlotLists
processed {.used.} = env.fetchAccounts.processed.fullPC3
nStoQuLater {.used.} = env.fetchStorageFull.len + env.fetchStoragePart.len
if env.archived:
# Archive pivot if it became stale
when extraTraceMessages:
trace "Mothballing", peer, pivot, nAccounts, nSlotLists
env.pivotMothball()
else:
# Save state so sync can be partially resumed at next start up
let rc = env.saveCheckpoint(ctx)
if rc.isErr:
error "Failed to save recovery checkpoint", peer, pivot, nAccounts,
nSlotLists, processed, nStoQu=nStoQuLater, error=rc.error
else:
when extraTraceMessages:
trace "Saved recovery checkpoint", peer, pivot, nAccounts, nSlotLists,
processed, nStoQu=nStoQuLater, blobSize=rc.value
## Sync processing multiplexer
ignoreException("runMulti"):
await buddy.ctx.playSyncSpecs.multi(buddy)
# ------------------------------------------------------------------------------
# End


@ -19,6 +19,7 @@ type
ComErrorStatsRef* = ref object
## particular error counters so connections will not be cut immediately
## after a particular error.
peerDegraded*: bool
nTimeouts*: uint
nNoData*: uint
nNetwork*: uint
@ -61,6 +62,7 @@ proc stopAfterSeriousComError*(
if comErrorsTimeoutMax < stats.nTimeouts:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
ctrl.zombie = true
stats.peerDegraded = true
return true
when 0 < comErrorsTimeoutSleepMSecs:
@ -71,6 +73,7 @@ proc stopAfterSeriousComError*(
stats.nNetwork.inc
if comErrorsNetworkMax < stats.nNetwork:
ctrl.zombie = true
stats.peerDegraded = true
return true
when 0 < comErrorsNetworkSleepMSecs:
@ -84,6 +87,7 @@ proc stopAfterSeriousComError*(
ComNoTrieNodesAvailable:
stats.nNoData.inc
if comErrorsNoDataMax < stats.nNoData:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
ctrl.zombie = true
return true
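
A hedged sketch of the caller side, for illustration only (the fetcher call
and its arguments are assumed, not part of this patch):

  let rc = await buddy.getAccountRange(stateRoot, iv, pivot) # assumed fetch
  if rc.isErr:
    if await buddy.ctrl.stopAfterSeriousComError(rc.error, buddy.only.errors):
      return # peer is a zombie now; if also degraded, it is dropped at once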


@ -44,37 +44,37 @@ template handleRlpException(info: static[string]; code: untyped) =
# Public functions
# ------------------------------------------------------------------------------
proc savePivot*(
proc pivotSaveDB*(
pv: SnapDbRef; ## Base descriptor on `ChainDBRef`
data: SnapDbPivotRegistry; ## Registered data record
): Result[int,HexaryError] =
## Register pivot environment
handleRlpException("savePivot()"):
handleRlpException("pivotSaveDB()"):
let rlpData = rlp.encode(data)
pv.kvDb.persistentStateRootPut(data.header.stateRoot.to(NodeKey), rlpData)
return ok(rlpData.len)
# notreached
proc recoverPivot*(
proc pivotRecoverDB*(
pv: SnapDbRef; ## Base descriptor on `ChainDBRef`
stateRoot: NodeKey; ## Check for a particular state root
): Result[SnapDbPivotRegistry,HexaryError] =
## Restore pivot environment for a particular state root.
let rc = pv.kvDb.persistentStateRootGet(stateRoot)
if rc.isOk:
handleRlpException("recoverPivot()"):
handleRlpException("rpivotRecoverDB()"):
var r = rlp.decode(rc.value.data, SnapDbPivotRegistry)
r.predecessor = rc.value.key
return ok(r)
err(StateRootNotFound)
proc recoverPivot*(
proc pivotRecoverDB*(
pv: SnapDbRef; ## Base descriptor on `ChainDBRef`
): Result[SnapDbPivotRegistry,HexaryError] =
## Restore pivot environment that was saved latest.
let rc = pv.kvDb.persistentStateRootGet(NodeKey.default)
if rc.isOk:
return pv.recoverPivot(rc.value.key)
return pv.pivotRecoverDB(rc.value.key)
err(StateRootNotFound)
# ------------------------------------------------------------------------------


@ -121,7 +121,7 @@ proc reverseUpdate*(
proc tickerStats*(
pivotTable: var SnapPivotTable; # Pivot table
ctx: SnapCtxRef; # Some global context
): TickerStatsUpdater =
): TickerSnapStatsUpdater =
## This function returns a function of type `TickerSnapStatsUpdater` that
## prints out pivot table statistics. The returned function is supposed to
## drive the `ticker` module.
@ -134,7 +134,7 @@ proc tickerStats*(
if rSq < sqSumAv:
result[1] = sqrt(sqSum / length.float - result[0] * result[0])
result = proc: SnapTickerStats =
result = proc: TickerSnapStats =
var
aSum, aSqSum, uSum, uSqSum, sSum, sSqSum: float
count = 0
@ -172,7 +172,7 @@ proc tickerStats*(
if 0 < ctx.pool.beaconHeader.blockNumber:
beaconBlock = some(ctx.pool.beaconHeader.blockNumber)
SnapTickerStats(
TickerSnapStats(
beaconBlock: beaconBlock,
pivotBlock: pivotBlock,
nQueues: ctx.pool.pivotTable.len,
@ -239,7 +239,7 @@ proc execSnapSyncAction*(
await buddy.rangeFetchStorageSlots(env)
else:
rangeFetchOk = false
if env.archived:
if env.archived or (buddy.ctrl.zombie and buddy.only.errors.peerDegraded):
return
# Unconditionally try healing if enabled.
@ -250,7 +250,7 @@ proc execSnapSyncAction*(
# physically disconnected.
buddy.ctrl.forceRun = true
await buddy.healAccounts(env)
if env.archived:
if env.archived or (buddy.ctrl.zombie and buddy.only.errors.peerDegraded):
return
# Some additional storage slots might have been popped up
@ -287,7 +287,7 @@ proc saveCheckpoint*(
if accountsSaveStorageSlotsMax < nStoQu:
return err(TooManySlotAccounts)
ctx.pool.snapDb.savePivot SnapDbPivotRegistry(
ctx.pool.snapDb.pivotSaveDB SnapDbPivotRegistry(
header: env.stateHeader,
nAccounts: env.nAccounts,
nSlotLists: env.nSlotLists,
@ -298,7 +298,7 @@ proc saveCheckpoint*(
toSeq(env.parkedStorage.items))
proc recoverPivotFromCheckpoint*(
proc pivotRecoverFromCheckpoint*(
env: SnapPivotRef; # Current pivot environment
ctx: SnapCtxRef; # Global context (containing save state)
topLevel: bool; # Full data set on top level only


@ -56,7 +56,7 @@ logScope:
topics = "snap-acc"
const
extraTraceMessages = false or true
extraTraceMessages = false # or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
@ -64,7 +64,7 @@ const
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Accounts healing " & info
"Accounts heal " & info
proc `$`(node: NodeSpecs): string =
node.partialPath.toHex
@ -341,9 +341,7 @@ proc healAccounts*(
env: SnapPivotRef;
) {.async.} =
## Fetching and merging missing account trie database nodes.
when extraTraceMessages:
let peer {.used.} = buddy.peer
trace logTxt "started", peer, ctx=buddy.healingCtx(env)
trace logTxt "started", peer=buddy.peer, ctx=buddy.healingCtx(env)
let
fa = env.fetchAccounts
@ -362,9 +360,8 @@ proc healAccounts*(
nNodesFetched.inc(nNodes)
nFetchLoop.inc
when extraTraceMessages:
trace logTxt "done", peer, ctx=buddy.healingCtx(env),
nNodesFetched, nFetchLoop, nIgnore=ignore.len
trace logTxt "done", peer=buddy.peer, ctx=buddy.healingCtx(env),
nNodesFetched, nFetchLoop, nIgnore=ignore.len
# ------------------------------------------------------------------------------
# End


@ -58,7 +58,7 @@ logScope:
topics = "snap-slot"
const
extraTraceMessages = false or true
extraTraceMessages = false # or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
@ -66,7 +66,7 @@ const
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Storage slots healing " & info
"Storage slots heal " & info
proc `$`(node: NodeSpecs): string =
node.partialPath.toHex
@ -344,9 +344,7 @@ proc healStorageSlots*(
env: SnapPivotRef;
) {.async.} =
## Fetching and merging missing storage slots trie database nodes.
when extraTraceMessages:
let peer {.used.} = buddy.peer
trace logTxt "started", peer, ctx=buddy.healingCtx(env)
trace logTxt "started", peer=buddy.peer, ctx=buddy.healingCtx(env)
var
nNodesFetched = 0
@ -363,8 +361,8 @@ proc healStorageSlots*(
let rc = env.storageQueueUnlinkPartialItem visited
if rc.isErr:
when extraTraceMessages:
trace logTxt "queue exhausted", peer, ctx=buddy.healingCtx(env),
nIgnore=ignore.len, nVisited=visited.len
trace logTxt "queue exhausted", peer=buddy.peer,
ctx=buddy.healingCtx(env), nIgnore=ignore.len, nVisited=visited.len
break
rc.value
@ -383,9 +381,8 @@ proc healStorageSlots*(
ignore = ignore + rejected
nNodesFetched.inc(nNodes)
when extraTraceMessages:
trace logTxt "done", peer, ctx=buddy.healingCtx(env),
nNodesFetched, nFetchLoop, nIgnore=ignore.len, nVisited=visited.len
trace logTxt "done", peer=buddy.peer, ctx=buddy.healingCtx(env),
nNodesFetched, nFetchLoop, nIgnore=ignore.len, nVisited=visited.len
# ------------------------------------------------------------------------------
# End


@ -60,7 +60,7 @@ logScope:
topics = "snap-acc"
const
extraTraceMessages = false or true
extraTraceMessages = false # or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
@ -68,7 +68,7 @@ const
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Accounts range " & info
"Accounts fetch " & info
proc `$`(rs: NodeTagRangeSet): string =
rs.fullPC3
@ -166,9 +166,9 @@ proc accountsRangefetchImpl(
if rc.isErr:
# Bad data, just try another peer
buddy.ctrl.zombie = true
when extraTraceMessages:
trace logTxt "import failed", peer, ctx=buddy.fetchCtx(env),
gotAccounts, gotStorage, reqLen=iv, covered, error=rc.error
# Failed to store on database, not much that can be done here
error logTxt "import failed", peer, ctx=buddy.fetchCtx(env),
gotAccounts, gotStorage, reqLen=iv, covered, error=rc.error
return
rc.value
@ -218,13 +218,12 @@ proc rangeFetchAccounts*(
env: SnapPivotRef;
) {.async.} =
## Fetch accounts and store them in the database.
trace logTxt "start", peer=buddy.peer, ctx=buddy.fetchCtx(env)
let fa = env.fetchAccounts
var nFetchAccounts = 0 # for logging
if not fa.processed.isFull():
when extraTraceMessages:
trace logTxt "start", peer=buddy.peer, ctx=buddy.fetchCtx(env)
var nFetchAccounts = 0 # for logging
while not fa.processed.isFull() and
buddy.ctrl.running and
not env.archived:
@ -239,9 +238,7 @@ proc rangeFetchAccounts*(
if storageSlotsQuPrioThresh < nStoQu:
break
when extraTraceMessages:
trace logTxt "done", peer=buddy.peer, ctx=buddy.fetchCtx(env),
nFetchAccounts
trace logTxt "done", peer=buddy.peer, ctx=buddy.fetchCtx(env), nFetchAccounts
# ------------------------------------------------------------------------------
# End


@ -81,14 +81,14 @@ logScope:
topics = "snap-slot"
const
extraTraceMessages = false or true
extraTraceMessages = false # or true
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Storage slots range " & info
"Storage slots fetch " & info
proc fetchCtx(
buddy: SnapBuddyRef;
@ -142,6 +142,8 @@ proc fetchStorageSlotsImpl(
let report = ctx.pool.snapDb.importStorageSlots(peer, stoRange.data)
if 0 < report.len:
if report[^1].slot.isNone:
# Bad data, just try another peer
buddy.ctrl.zombie = true
# Failed to store on database, not much that can be done here
error logTxt "import failed", peer, ctx=buddy.fetchCtx(env),
nSlotLists=0, nReq=req.len, error=report[^1].error
@ -202,8 +204,7 @@ proc rangeFetchStorageSlots*(
## each work item on the queue at least once. For partial slot range items
## this means, in case of success, that the outstanding range has at least
## become smaller.
when extraTraceMessages:
trace logTxt "start", peer=buddy.peer, ctx=buddy.fetchCtx(env)
trace logTxt "start", peer=buddy.peer, ctx=buddy.fetchCtx(env)
# Fetch storage data and save it on disk. Storage requests are managed by
# request queues for handling full/partial replies and re-fetch issues. For
@ -252,8 +253,7 @@ proc rangeFetchStorageSlots*(
# End `while`
# End `for`
when extraTraceMessages:
trace logTxt "done", peer=buddy.peer, ctx=buddy.fetchCtx(env)
trace logTxt "done", peer=buddy.peer, ctx=buddy.fetchCtx(env)
# ------------------------------------------------------------------------------
# End


@ -58,7 +58,7 @@ type
pivot: SnapPivotRef ## Accounts only
const
extraTraceMessages = false or true
extraTraceMessages = false # or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------


@ -0,0 +1,25 @@
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
../worker_desc,
./play/[play_desc, play_full_sync, play_prep_full, play_snap_sync]
export
PlaySyncSpecs,
playSyncSpecs,
`playMode=`
proc playInit*(desc: var SnapSyncSpecs) =
## Set up sync mode specs table. This cannot be done at compile time.
desc.tab[SnapSyncMode] = playSnapSyncSpecs()
desc.tab[PreFullSyncMode] = playPrepFullSpecs()
desc.tab[FullSyncMode] = playFullSyncSpecs()
# End
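
The table entries form a one-way hand-over between sub-modes. A sketch of
the sequence (the two assignments are the ones appearing in the play_*
modules below):

  # SnapSyncMode --(pivot complete)--> PreFullSyncMode --> FullSyncMode
  ctx.playMode = PreFullSyncMode  # set in snapSyncMulti()
  ctx.playMode = FullSyncMode     # set in prepFullSyncSingle()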


@ -0,0 +1,46 @@
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
chronos,
../../../sync_desc,
../../worker_desc
type
PlayVoidFutureCtxFn* = proc(
ctx: SnapCtxRef): Future[void] {.gcsafe, raises: [CatchableError].}
PlayVoidFutureBuddyFn* = proc(
buddy: SnapBuddyRef): Future[void] {.gcsafe, raises: [CatchableError].}
PlayBoolBuddyFn* = proc(
buddy: SnapBuddyRef, last: bool): bool {.gcsafe, raises: [CatchableError].}
PlaySyncSpecs* = ref object of RootRef
## Holds sync mode specs & methods for a particular sync state
pool*: PlayBoolBuddyFn
daemon*: PlayVoidFutureCtxFn
single*: PlayVoidFutureBuddyFn
multi*: PlayVoidFutureBuddyFn
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc playSyncSpecs*(ctx: SnapCtxRef): PlaySyncSpecs =
## Getter
ctx.pool.syncMode.tab[ctx.pool.syncMode.active].PlaySyncSpecs
proc `playMode=`*(ctx: SnapCtxRef; val: SnapSyncModeType) =
## Setter
ctx.pool.syncMode.active = val
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -0,0 +1,65 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
chronicles,
chronos,
eth/p2p,
../../../sync_desc,
../../worker_desc,
play_desc
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private functions, full sync handlers
# ------------------------------------------------------------------------------
proc fullSyncPool(buddy: SnapBuddyRef, last: bool): bool =
buddy.ctx.poolMode = false
true
proc fullSyncDaemon(ctx: SnapCtxRef) {.async.} =
ctx.daemon = false
proc fullSyncSingle(buddy: SnapBuddyRef) {.async.} =
buddy.ctrl.multiOk = true
proc fullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
## Full sync processing
let
ctx = buddy.ctx
peer = buddy.peer
trace "Snap full sync -- not implemented yet", peer
await sleepAsync(5.seconds)
# flip over to single mode for getting new instructions
buddy.ctrl.multiOk = false
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc playFullSyncSpecs*: PlaySyncSpecs =
## Return full sync handler environment
PlaySyncSpecs(
pool: fullSyncPool,
daemon: fullSyncDaemon,
single: fullSyncSingle,
multi: fullSyncMulti)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -0,0 +1,92 @@
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Transitional handlers preparing for full sync
{.push raises: [].}
import
chronicles,
chronos,
eth/p2p,
stew/keyed_queue,
../../../sync_desc,
../../worker_desc,
../ticker,
play_desc
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc blindTicker(ctx: SnapCtxRef): TickerFullStatsUpdater =
result = proc: TickerFullStats =
discard
# ------------------------------------------------------------------------------
# Private functions, transitional handlers preparing for full sync
# ------------------------------------------------------------------------------
proc prepFullSyncPool(buddy: SnapBuddyRef, last: bool): bool =
buddy.ctx.poolMode = false
true
proc prepFullSyncDaemon(ctx: SnapCtxRef) {.async.} =
ctx.daemon = false
proc prepFullSyncSingle(buddy: SnapBuddyRef) {.async.} =
## One off, setting up full sync processing in single mode
let
ctx = buddy.ctx
# Fetch latest state root environment
env = block:
let rc = ctx.pool.pivotTable.lastValue
if rc.isErr:
buddy.ctrl.multiOk = false
return
rc.value
peer = buddy.peer
pivot = "#" & $env.stateHeader.blockNumber # for logging
when extraTraceMessages:
trace "Full sync prepare in single mode", peer, pivot
# update ticker (currently blind)
ctx.pool.ticker.init(cb = ctx.blindTicker())
# Cosmetics: allow other processes (e.g. ticker) to log the current
# state. There is no other intended purpose of this wait state.
await sleepAsync 1100.milliseconds
ctx.playMode = FullSyncMode
buddy.ctrl.multiOk = true
proc prepFullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
## One off, falling back to single mode where the full sync set up happens
buddy.ctrl.multiOk = false
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc playPrepFullSpecs*: PlaySyncSpecs =
## Return full sync preparation handler environment
PlaySyncSpecs(
pool: prepFullSyncPool,
daemon: prepFullSyncDaemon,
single: prepFullSyncSingle,
multi: prepFullSyncMulti)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -0,0 +1,212 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
chronicles,
chronos,
eth/p2p,
stew/[interval_set, keyed_queue],
../../../sync_desc,
".."/[pivot, ticker],
../pivot/storage_queue_helper,
../db/[hexary_desc, snapdb_pivot],
"../.."/[range_desc, update_beacon_header, worker_desc],
play_desc
logScope:
topics = "snap-play"
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
let recov = ctx.pool.recovery
if recov.isNil:
return false
let
checkpoint =
"#" & $recov.state.header.blockNumber & "(" & $recov.level & ")"
topLevel = recov.level == 0
env = block:
let rc = ctx.pool.pivotTable.eq recov.state.header.stateRoot
if rc.isErr:
error "Recovery pivot context gone", checkpoint, topLevel
return false
rc.value
# Cosmetics: allow other processes (e.g. ticker) to log the current recovery
# state. There is no other intended purpose of this wait state.
await sleepAsync 1100.milliseconds
#when extraTraceMessages:
# trace "Recovery continued ...", checkpoint, topLevel,
# nAccounts=recov.state.nAccounts, nDangling=recov.state.dangling.len
# Update pivot data from recovery checkpoint
env.pivotRecoverFromCheckpoint(ctx, topLevel)
# Fetch next recovery record if there is any
if recov.state.predecessor.isZero:
#when extraTraceMessages:
# trace "Recovery done", checkpoint, topLevel
return false
let rc = ctx.pool.snapDb.pivotRecoverDB(recov.state.predecessor)
if rc.isErr:
when extraTraceMessages:
trace "Recovery stopped at pivot stale checkpoint", checkpoint, topLevel
return false
# Set up next level pivot checkpoint
ctx.pool.recovery = SnapRecoveryRef(
state: rc.value,
level: recov.level + 1)
# Push onto pivot table and continue recovery (i.e. do not stop it yet)
ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx)
return true # continue recovery
# ------------------------------------------------------------------------------
# Private functions, snap sync handlers
# ------------------------------------------------------------------------------
proc snapSyncPool(buddy: SnapBuddyRef, last: bool): bool =
## Enabled when `buddy.ctrl.poolMode` is `true`
##
let ctx = buddy.ctx
ctx.poolMode = false
result = true
# Clean up empty pivot slots (never the top one)
var rc = ctx.pool.pivotTable.beforeLast
while rc.isOK:
let (key, env) = (rc.value.key, rc.value.data)
if env.fetchAccounts.processed.isEmpty:
ctx.pool.pivotTable.del key
rc = ctx.pool.pivotTable.prev(key)
proc snapSyncDaemon(ctx: SnapCtxRef) {.async.} =
## Enabled while `ctx.daemon` is `true`
##
if not ctx.pool.recovery.isNil:
if not await ctx.recoveryStepContinue():
# Done, stop recovery
ctx.pool.recovery = nil
ctx.daemon = false
# Update logging
if not ctx.pool.ticker.isNil:
ctx.pool.ticker.stopRecovery()
proc snapSyncSingle(buddy: SnapBuddyRef) {.async.} =
## Enabled while
## * `buddy.ctrl.multiOk` is `false`
## * `buddy.ctrl.poolMode` is `false`
##
let ctx = buddy.ctx
# External beacon header updater
await buddy.updateBeaconHeaderFromFile()
await buddy.pivotApprovePeer()
buddy.ctrl.multiOk = true
proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
## Enabled while
## * `buddy.ctx.multiOk` is `true`
## * `buddy.ctx.poolMode` is `false`
##
let
ctx = buddy.ctx
# Fetch latest state root environment
env = block:
let rc = ctx.pool.pivotTable.lastValue
if rc.isErr:
buddy.ctrl.multiOk = false
return # nothing to do
rc.value
peer = buddy.peer
pivot = "#" & $env.stateHeader.blockNumber # for logging
fa = env.fetchAccounts
# Check whether this pivot is fully downloaded
if env.fetchAccounts.processed.isFull and env.storageQueueTotal() == 0:
# Switch to full sync => final state
ctx.playMode = PreFullSyncMode
trace "Switch to full sync", peer, pivot, nAccounts=env.nAccounts,
processed=fa.processed.fullPC3, nStoQu=env.storageQueueTotal(),
nSlotLists=env.nSlotLists
return
# If this is a new snap sync pivot, the previous one can be cleaned up and
# archived. There is no point in keeping some older space consuming state
# data any longer.
ctx.pool.pivotTable.beforeTopMostlyClean()
when extraTraceMessages:
trace "Multi sync runner", peer, pivot, nAccounts=env.nAccounts,
processed=fa.processed.fullPC3, nStoQu=env.storageQueueTotal(),
nSlotLists=env.nSlotLists
# This one is the syncing work horse which downloads the database
await env.execSnapSyncAction(buddy)
# Various logging entries (after accounts and storage slots download)
let
nAccounts = env.nAccounts
nSlotLists = env.nSlotLists
processed = fa.processed.fullPC3
# Archive this pivot environment if it has become stale
if env.archived:
when extraTraceMessages:
trace "Mothballing", peer, pivot, nAccounts, nSlotLists
env.pivotMothball()
return
# Save state so sync can be resumed at next start up
let rc = env.saveCheckpoint(ctx)
if rc.isOk:
when extraTraceMessages:
trace "Saved recovery checkpoint", peer, pivot, nAccounts, processed,
nStoQu=env.storageQueueTotal(), nSlotLists, blobSize=rc.value
return
error "Failed to save recovery checkpoint", peer, pivot, nAccounts,
processed, nStoQu=env.storageQueueTotal(), nSlotLists, error=rc.error
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc playSnapSyncSpecs*: PlaySyncSpecs =
## Return snap sync handler environment
PlaySyncSpecs(
pool: snapSyncPool,
daemon: snapSyncDaemon,
single: snapSyncSingle,
multi: snapSyncMulti)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -10,7 +10,7 @@
# except according to those terms.
import
std/[strformat, strutils],
std/[options, strformat, strutils],
chronos,
chronicles,
eth/[common, p2p],
@ -24,13 +24,26 @@ logScope:
topics = "snap-tick"
type
# TODO: Seems like a compiler name mangling bug. If this is named
# `TickerStats`, then `eqeq___syncZsnapZworkerZticker_97` complains
# that the TickerStats object does not have beaconBlock and pivotBlock
# members. Presumably the compiler picks the wrong function, namely the
# one for the full sync `TickerStats`, because it has the same name and
# the same module name. Not sure..
SnapTickerStats* = object
TickerSnapStatsUpdater* = proc: TickerSnapStats {.gcsafe, raises: [].}
## Snap sync state update function
TickerFullStatsUpdater* = proc: TickerFullStats {.gcsafe, raises: [].}
## Full sync state update function
SnapDescDetails = object
## Private state descriptor
snapCb: TickerSnapStatsUpdater
recovery: bool
lastRecov: bool
lastStats: TickerSnapStats
FullDescDetails = object
## Private state descriptor
fullCb: TickerFullStatsUpdater
lastStats: TickerFullStats
TickerSnapStats* = object
## Snap sync state (see `TickerSnapStatsUpdater`)
beaconBlock*: Option[BlockNumber]
pivotBlock*: Option[BlockNumber]
nAccounts*: (float,float) ## Mean and standard deviation
@ -40,19 +53,27 @@ type
nStorageQueue*: Option[int]
nQueues*: int
TickerStatsUpdater* =
proc: SnapTickerStats {.gcsafe, raises: [].}
TickerFullStats* = object
## Full sync state (see `TickerFullStatsUpdater`)
topPersistent*: BlockNumber
nextUnprocessed*: Option[BlockNumber]
nextStaged*: Option[BlockNumber]
nStagedQueue*: int
suspended*: bool
reOrg*: bool
TickerRef* = ref object
## Account fetching state that is shared among all peers.
nBuddies: int
recovery: bool
lastRecov: bool
lastStats: SnapTickerStats
statsCb: TickerStatsUpdater
## Ticker descriptor object
nBuddies: int
logTicker: TimerCallback
started: Moment
visited: Moment
started: Moment
visited: Moment
prettyPrint: proc(t: TickerRef) {.gcsafe, raises: [].}
case fullMode: bool
of false:
snap: SnapDescDetails
of true:
full: FullDescDetails
const
tickerStartDelay = chronos.milliseconds(100)
@ -107,8 +128,14 @@ proc pc99(val: float): string =
elif 0.0 < val and val <= 0.01: "1%"
else: val.toPC(0)
proc pp(n: BlockNumber): string =
"#" & $n
proc pp(n: Option[BlockNumber]): string =
if n.isNone: "n/a" else: n.get.pp
# ------------------------------------------------------------------------------
# Private functions: ticking log messages
# Private functions: printing ticker messages
# ------------------------------------------------------------------------------
template noFmtError(info: static[string]; code: untyped) =
@ -117,15 +144,13 @@ template noFmtError(info: static[string]; code: untyped) =
except ValueError as e:
raiseAssert "Inconveivable (" & info & "): " & e.msg
proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.}
proc runLogTicker(t: TickerRef) {.gcsafe.} =
proc snapTicker(t: TickerRef) {.gcsafe.} =
let
data = t.statsCb()
data = t.snap.snapCb()
now = Moment.now()
if data != t.lastStats or
t.recovery != t.lastRecov or
if data != t.snap.lastStats or
t.snap.recovery != t.snap.lastRecov or
tickerLogSuppressMax < (now - t.visited):
var
nAcc, nSto: string
@ -133,7 +158,7 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
bc = "n/a"
nStoQue = "n/a"
let
recoveryDone = t.lastRecov
recoveryDone = t.snap.lastRecov
accCov = data.accountsFill[0].pc99 &
"(" & data.accountsFill[1].pc99 & ")" &
"/" & data.accountsFill[2].pc99 &
@ -144,9 +169,9 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
up = (now - t.started).seconds.uint64.toSI
mem = getTotalMem().uint.toSI
t.lastStats = data
t.snap.lastStats = data
t.visited = now
t.lastRecov = t.recovery
t.snap.lastRecov = t.snap.recovery
noFmtError("runLogTicker"):
if data.pivotBlock.isSome:
@ -161,7 +186,7 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
if data.nStorageQueue.isSome:
nStoQue = $data.nStorageQueue.unsafeGet
if t.recovery:
if t.snap.recovery:
info "Snap sync statistics (recovery)",
up, nInst, bc, pv, nAcc, accCov, nSto, nStoQue, mem
elif recoveryDone:
@ -171,20 +196,83 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
info "Snap sync statistics",
up, nInst, bc, pv, nAcc, accCov, nSto, nStoQue, mem
t.setLogTicker(Moment.fromNow(tickerLogInterval))
proc fullTicker(t: TickerRef) {.gcsafe.} =
let
data = t.full.fullCb()
now = Moment.now()
if data != t.full.lastStats or
tickerLogSuppressMax < (now - t.visited):
let
persistent = data.topPersistent.pp
staged = data.nextStaged.pp
unprocessed = data.nextUnprocessed.pp
queued = data.nStagedQueue
reOrg = if data.reOrg: "t" else: "f"
buddies = t.nBuddies
# With `int64`, the seconds counter covers a range of more than 29*10^10 years
up = (now - t.started).seconds.uint64.toSI
mem = getTotalMem().uint.toSI
t.full.lastStats = data
t.visited = now
if data.suspended:
info "Sync statistics (suspended)", up, buddies,
persistent, unprocessed, staged, queued, reOrg, mem
else:
info "Sync statistics", up, buddies,
persistent, unprocessed, staged, queued, reOrg, mem
# ------------------------------------------------------------------------------
# Private functions: ticking log messages
# ------------------------------------------------------------------------------
proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.}
proc runLogTicker(t: TickerRef) {.gcsafe.} =
t.prettyPrint(t)
t.setLogTicker(Moment.fromNow(tickerLogInterval))
proc setLogTicker(t: TickerRef; at: Moment) =
if not t.logTicker.isNil:
t.logTicker = safeSetTimer(at, runLogTicker, t)
proc initImpl(t: TickerRef; cb: TickerSnapStatsUpdater) =
t.fullMode = false
t.prettyPrint = snapTicker
t.snap = SnapDescDetails(snapCb: cb)
proc initImpl(t: TickerRef; cb: TickerFullStatsUpdater) =
t.fullMode = true
t.prettyPrint = fullTicker
t.full = FullDescDetails(fullCb: cb)
# ------------------------------------------------------------------------------
# Public constructor and start/stop functions
# ------------------------------------------------------------------------------
proc init*(T: type TickerRef; cb: TickerStatsUpdater): T =
proc init*(t: TickerRef; cb: TickerSnapStatsUpdater) =
## Re-initialise ticker
t.visited.reset
if t.fullMode:
t.prettyPrint(t) # print final state for full sync
t.initImpl(cb)
proc init*(t: TickerRef; cb: TickerFullStatsUpdater) =
## Re-initialise ticker
t.visited.reset
if not t.fullMode:
t.prettyPrint(t) # print final state for snap sync
t.initImpl(cb)
proc init*(T: type TickerRef; cb: TickerSnapStatsUpdater): T =
## Constructor
T(statsCb: cb)
new result
result.initImpl(cb)
proc start*(t: TickerRef) =
## Re/start ticker unconditionally
@ -206,30 +294,35 @@ proc startBuddy*(t: TickerRef) =
## Increment buddies counter and start ticker unless running.
if t.nBuddies <= 0:
t.nBuddies = 1
if not t.recovery:
t.start()
if not t.fullMode:
if not t.snap.recovery:
t.start()
else:
t.nBuddies.inc
proc startRecovery*(t: TickerRef) =
## Ditto for recovery mode
if not t.recovery:
t.recovery = true
if t.nBuddies <= 0:
t.start()
if not t.fullMode:
if not t.snap.recovery:
t.snap.recovery = true
if t.nBuddies <= 0:
t.start()
proc stopBuddy*(t: TickerRef) =
## Decrement buddies counter and stop ticker if there are no more registered
## buddies.
t.nBuddies.dec
if t.nBuddies <= 0 and not t.recovery:
t.stop()
if t.nBuddies <= 0:
if not t.fullMode:
if not t.snap.recovery:
t.stop()
proc stopRecovery*(t: TickerRef) =
## Ditto for recovery mode
if t.recovery:
t.recovery = false
t.stop()
if not t.fullMode:
if t.snap.recovery:
t.snap.recovery = false
t.stop()
# ------------------------------------------------------------------------------
# End
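
The variant object allows re-initialising a live ticker when the sync
sub-mode flips; usage as in play_prep_full.nim above:

  # Switch the running snap ticker over to full sync statistics. The final
  # snap state is printed once by init*() before the mode changes.
  ctx.pool.ticker.init(cb = ctx.blindTicker())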


@ -12,6 +12,7 @@
import
std/hashes,
chronos,
eth/[common, p2p],
stew/[interval_set, keyed_queue, sorted_set],
../../db/select_backend,
@ -82,7 +83,20 @@ type
SnapBuddyData* = object
## Per-worker local descriptor data extension
errors*: ComErrorStatsRef ## For error handling
pivotEnv*: SnapPivotRef ## Environment containing state root
SnapSyncModeType* = enum
## Current sync mode. After a snapshot has been downloaded, the system
## proceeds with full sync.
SnapSyncMode = 0 ## Start mode
PreFullSyncMode
FullSyncMode
SnapSyncSpecs* = object
## Full specs for all sync modes. This table must be held in the main
## descriptor and initialised at run time. The table values are opaque
## and will be specified in the worker module(s).
active*: SnapSyncModeType
tab*: array[SnapSyncModeType,RootRef]
SnapCtxData* = object
## Globally shared data extension
@ -96,7 +110,9 @@ type
coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts
covAccTimesFull*: uint ## # of 100% coverages
recovery*: SnapRecoveryRef ## Current recovery checkpoint/context
noRecovery*: bool ## Ignore recovery checkpoints
# Snap/full mode multiplexing
syncMode*: SnapSyncSpecs ## Sync mode data container
# Info
ticker*: TickerRef ## Ticker, logger
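
Since the table values are kept as opaque RootRef entries here, the worker
side recovers the concrete handler type by a plain object conversion; this
is the getter from play_desc.nim above:

  proc playSyncSpecs*(ctx: SnapCtxRef): PlaySyncSpecs =
    ## Getter: downcast the active opaque table entry
    ctx.pool.syncMode.tab[ctx.pool.syncMode.active].PlaySyncSpecs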


@ -38,7 +38,7 @@ proc test_pivotStoreRead*(
slotAccounts = seq[NodeKey].default
for n in 0 ..< accKeys.len:
let w = accKeys[n]
check dbBase.savePivot(
check dbBase.pivotSaveDB(
SnapDbPivotRegistry(
header: BlockHeader(stateRoot: w.to(Hash256)),
nAccounts: n.uint64,
@ -50,7 +50,7 @@ proc test_pivotStoreRead*(
sleep(50)
# verify latest state root
block:
let rc = dbBase.recoverPivot()
let rc = dbBase.pivotRecoverDB()
check rc.isOk
if rc.isOk:
check rc.value.nAccounts == n.uint64
@ -64,13 +64,13 @@ proc test_pivotStoreRead*(
for n in 0 ..< accKeys.len:
let w = accKeys[n]
block:
let rc = dbBase.recoverPivot(w)
let rc = dbBase.pivotRecoverDB(w)
check rc.isOk
if rc.isOk:
check rc.value.nAccounts == n.uint64
check rc.value.nSlotLists == n.uint64
# Update record in place
check dbBase.savePivot(
check dbBase.pivotSaveDB(
SnapDbPivotRegistry(
header: BlockHeader(stateRoot: w.to(Hash256)),
nAccounts: n.uint64,
@ -81,7 +81,7 @@ proc test_pivotStoreRead*(
# There might be a race condition on Windows (seen on qemu/win7)
sleep(50)
block:
let rc = dbBase.recoverPivot(w)
let rc = dbBase.pivotRecoverDB(w)
check rc.isOk
if rc.isOk:
check rc.value.nAccounts == n.uint64