diff --git a/nimbus/config.nim b/nimbus/config.nim index 52717e5d4..12bee380c 100644 --- a/nimbus/config.nim +++ b/nimbus/config.nim @@ -139,7 +139,6 @@ type Default Full ## Beware, experimental Snap ## Beware, experimental - SnapCtx ## Beware, experimental Stateless ## Beware, experimental NimbusConf* = object of RootObj @@ -173,7 +172,6 @@ type "- default -- legacy sync mode\n" & "- full -- full blockchain archive\n" & "- snap -- experimental snap mode (development only)\n" & - "- snapCtx -- snap considering possible recovery context\n" & "- stateless -- experimental stateless mode (development only)" defaultValue: SyncMode.Default defaultValueDesc: $SyncMode.Default diff --git a/nimbus/nimbus.nim b/nimbus/nimbus.nim index 646962999..a3c25ebae 100644 --- a/nimbus/nimbus.nim +++ b/nimbus/nimbus.nim @@ -169,7 +169,6 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, # Early-initialise "--snap-sync" before starting any network connections. block: let - noRecovery = conf.syncMode in {SyncMode.SnapCtx} exCtrlFile = if conf.syncCtrlFile.isNone: none(string) else: some(conf.syncCtrlFile.get.string) tickerOK = conf.logLevel in { @@ -179,14 +178,14 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, nimbus.fullSyncRef = FullSyncRef.init( nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers, tickerOK, exCtrlFile) - of SyncMode.Snap, SyncMode.SnapCtx: + of SyncMode.Snap: # Minimal capability needed for sync only if ProtocolFlag.Snap notin protocols: nimbus.ethNode.addSnapHandlerCapability( nimbus.ethNode.peerPool) nimbus.snapSyncRef = SnapSyncRef.init( nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers, - nimbus.dbBackend, tickerOK, noRecovery=noRecovery, exCtrlFile) + nimbus.dbBackend, tickerOK, exCtrlFile) of SyncMode.Stateless: # FIXME-Adam: what needs to go here? 
nimbus.statelessSyncRef = StatelessSyncRef.init() @@ -209,7 +208,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, if conf.maxPeers > 0: var waitForPeers = true case conf.syncMode: - of SyncMode.Snap, SyncMode.SnapCtx, SyncMode.Stateless: + of SyncMode.Snap, SyncMode.Stateless: waitForPeers = false of SyncMode.Full, SyncMode.Default: discard @@ -434,7 +433,7 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) = nimbus.fullSyncRef.start of SyncMode.Stateless: nimbus.statelessSyncRef.start - of SyncMode.Snap, SyncMode.SnapCtx: + of SyncMode.Snap: nimbus.snapSyncRef.start if nimbus.state == Starting: diff --git a/nimbus/sync/handlers/eth.nim b/nimbus/sync/handlers/eth.nim index 5d3271876..71b9a88a1 100644 --- a/nimbus/sync/handlers/eth.nim +++ b/nimbus/sync/handlers/eth.nim @@ -379,11 +379,11 @@ proc setNewBlockHashesHandler*(ctx: EthWireRef, handler: NewBlockHashesHandler, # Public getters/setters # ------------------------------------------------------------------------------ -proc `txPoolEnabled=`*(ctx: EthWireRef; ena: bool) = +proc `txPoolEnabled=`*(ctx: EthWireRef; ena: bool) {.gcsafe, raises: [].} = if ctx.enableTxPool != NotAvailable: ctx.enableTxPool = if ena: Enabled else: Suspended -proc txPoolEnabled*(ctx: EthWireRef): bool = +proc txPoolEnabled*(ctx: EthWireRef): bool {.gcsafe, raises: [].} = ctx.enableTxPool == Enabled # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap.nim b/nimbus/sync/snap.nim index 1814143d6..bc24f7054 100644 --- a/nimbus/sync/snap.nim +++ b/nimbus/sync/snap.nim @@ -114,7 +114,6 @@ proc init*( maxPeers: int; dbBackend: ChainDb; enableTicker = false; - noRecovery = false; exCtrlFile = none(string); ): T = new result @@ -122,7 +121,6 @@ proc init*( result.ctx.chain = chain # explicitly override result.ctx.pool.rng = rng result.ctx.pool.dbBackend = dbBackend - result.ctx.pool.noRecovery = noRecovery # Required to have been initialised via `addEthHandlerCapability()` doAssert not result.ctx.ethWireCtx.isNil diff --git a/nimbus/sync/snap/update_beacon_header.nim b/nimbus/sync/snap/update_beacon_header.nim index 86f19514a..c62f8ec83 100644 --- a/nimbus/sync/snap/update_beacon_header.nim +++ b/nimbus/sync/snap/update_beacon_header.nim @@ -26,6 +26,25 @@ logScope: # Public functions # ------------------------------------------------------------------------------ +proc updateBeaconHeaderByBlockNumber*( + buddy: SnapBuddyRef; # Worker peer + num: BlockNumber; # Block number to sync against + ) {.async.} = + ## This function updates the beacon header according to the block number + ## argument. + ## + ## This function is typically used for testing and debugging.
+ let + ctx = buddy.ctx + peer = buddy.peer + + trace "fetch beacon header", peer, num + if ctx.pool.beaconHeader.blockNumber < num: + let rc = await buddy.getBlockHeader(num) + if rc.isOk: + ctx.pool.beaconHeader = rc.value + + proc updateBeaconHeaderFromFile*( buddy: SnapBuddyRef; # Worker peer ) {.async.} = @@ -62,10 +81,8 @@ proc updateBeaconHeaderFromFile*( if ctx.pool.beaconHeader.blockNumber < num: rc = await buddy.getBlockHeader(num) except CatchableError as e: - let - name {.used.} = $e.name - msg {.used.} = e.msg - trace "Exception while parsing beacon info", peer, isHash, name, msg + trace "Exception while parsing beacon info", peer, isHash, + name=($e.name), msg=(e.msg) if rc.isOk: if ctx.pool.beaconHeader.blockNumber < rc.value.blockNumber: diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index d84a14b6e..7bb36b3ac 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -8,91 +8,91 @@ # at your option. This file may not be copied, modified, or distributed # except according to those terms. -import - std/[options, sets], - chronicles, - chronos, - eth/[common, p2p], - stew/[interval_set, keyed_queue], - ../../common as nimcom, - ../../db/select_backend, - ".."/[handlers, protocol, sync_desc], - ./worker/[pivot, ticker], - ./worker/com/com_error, - ./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot], - "."/[range_desc, update_beacon_header, worker_desc] - {.push raises: [].} -logScope: - topics = "snap-buddy" +import + chronicles, + chronos, + eth/p2p, + stew/[interval_set, keyed_queue], + "../.."/[common, db/select_backend], + ".."/[handlers/eth, protocol, sync_desc], + ./worker/[pivot, play, ticker], + ./worker/com/com_error, + ./worker/db/[snapdb_desc, snapdb_pivot], + "."/[range_desc, worker_desc] -const - extraTraceMessages = false or true - ## Enabled additional logging noise +logScope: + topics = "snap-worker" # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ -template noExceptionOops(info: static[string]; code: untyped) = +template ignoreException(info: static[string]; code: untyped) = try: code except CatchableError as e: - raiseAssert "Inconveivable (" & - info & "): name=" & $e.name & " msg=" & e.msg + error "Exception at " & info & ":", name=($e.name), msg=(e.msg) # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} = - let recov = ctx.pool.recovery - if recov.isNil: - return false +proc disableWireServices(ctx: SnapCtxRef) = + ## Helper for `setup()`: Temporarily stop useless wire protocol services. + ctx.ethWireCtx.txPoolEnabled = false - let - checkpoint = - "#" & $recov.state.header.blockNumber & "(" & $recov.level & ")" - topLevel = recov.level == 0 - env = block: - let rc = ctx.pool.pivotTable.eq recov.state.header.stateRoot - if rc.isErr: - error "Recovery pivot context gone", checkpoint, topLevel - return false - rc.value +proc enableWireServices(ctx: SnapCtxRef) = + ## Helper for `release()` + ctx.ethWireCtx.txPoolEnabled = true - # Cosmetics: allow other processes (e.g. ticker) to log the current recovery - # state. There is no other intended purpose of this wait state. 
- await sleepAsync 1100.milliseconds +# -------------- - #when extraTraceMessages: - # trace "Recovery continued ...", checkpoint, topLevel, - # nAccounts=recov.state.nAccounts, nDangling=recov.state.dangling.len +proc enableTicker(ctx: SnapCtxRef; tickerOK: bool) = + ## Helper for `setup()`: Log/status ticker + if tickerOK: + ctx.pool.ticker = TickerRef.init(ctx.pool.pivotTable.tickerStats(ctx)) + else: + trace "Ticker is disabled" - # Update pivot data from recovery checkpoint - env.recoverPivotFromCheckpoint(ctx, topLevel) +proc disableTicker(ctx: SnapCtxRef) = + ## Helper for `release()` + if not ctx.pool.ticker.isNil: + ctx.pool.ticker.stop() + ctx.pool.ticker = nil - # Fetch next recovery record if there is any - if recov.state.predecessor.isZero: - #when extraTraceMessages: - # trace "Recovery done", checkpoint, topLevel - return false - let rc = ctx.pool.snapDb.recoverPivot(recov.state.predecessor) - if rc.isErr: - when extraTraceMessages: - trace "Recovery stopped at pivot stale checkpoint", checkpoint, topLevel - return false +# -------------- - # Set up next level pivot checkpoint - ctx.pool.recovery = SnapRecoveryRef( - state: rc.value, - level: recov.level + 1) +proc enableRpcMagic(ctx: SnapCtxRef) = + ## Helper for `setup()`: Enable external pivot update via RPC + ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB - # Push onto pivot table and continue recovery (i.e. do not stop it yet) - ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx) +proc disableRpcMagic(ctx: SnapCtxRef) = + ## Helper for `release()` + ctx.chain.com.syncReqNewHead = nil - return true # continue recovery +# -------------- + +proc detectSnapSyncRecovery(ctx: SnapCtxRef) = + ## Helper for `setup()`: Initiate snap sync recovery (if any) + let rc = ctx.pool.snapDb.pivotRecoverDB() + if rc.isOk: + ctx.pool.recovery = SnapRecoveryRef(state: rc.value) + ctx.daemon = true + + # Set up early initial pivot + ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx) + trace "Snap sync recovery started", + checkpoint=("#" & $ctx.pool.pivotTable.topNumber() & "(0)") + if not ctx.pool.ticker.isNil: + ctx.pool.ticker.startRecovery() + +proc initSnapDb(ctx: SnapCtxRef) = + ## Helper for `setup()`: Initialise snap sync database layer + ctx.pool.snapDb = + if ctx.pool.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db) + else: SnapDbRef.init(ctx.pool.dbBackend) # ------------------------------------------------------------------------------ # Public start/stop and admin functions @@ -100,32 +100,18 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} = proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool = ## Global set up + + # For snap sync book keeping ctx.pool.coveredAccounts = NodeTagRangeSet.init() - noExceptionOops("worker.setup()"): - ctx.ethWireCtx.txPoolEnabled = false - ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB - ctx.pool.snapDb = - if ctx.pool.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db) - else: SnapDbRef.init(ctx.pool.dbBackend) - if tickerOK: - ctx.pool.ticker = TickerRef.init(ctx.pool.pivotTable.tickerStats(ctx)) - else: - trace "Ticker is disabled" - # Check for recovery mode - if not ctx.pool.noRecovery: - let rc = ctx.pool.snapDb.recoverPivot() - if rc.isOk: - ctx.pool.recovery = SnapRecoveryRef(state: rc.value) - ctx.daemon = true - - # Set up early initial pivot - ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx) - trace "Recovery started", - checkpoint=("#" & $ctx.pool.pivotTable.topNumber() & "(0)") - 
if not ctx.pool.ticker.isNil: - ctx.pool.ticker.startRecovery() + ctx.enableRpcMagic() # Allow external pivot update via RPC + ctx.disableWireServices() # Stop unwanted public services + ctx.pool.syncMode.playInit() # Set up sync sub-mode specs. + ctx.initSnapDb() # Set database backend, subject to change + ctx.detectSnapSyncRecovery() # Check for recovery mode + ctx.enableTicker(tickerOK) # Start log/status ticker (if any) + # Experimental, also used for debugging if ctx.exCtrlFile.isSome: warn "Snap sync accepts pivot block number or hash", syncCtrlFile=ctx.exCtrlFile.get @@ -133,12 +119,10 @@ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool = proc release*(ctx: SnapCtxRef) = ## Global clean up - if not ctx.pool.ticker.isNil: - ctx.pool.ticker.stop() - ctx.pool.ticker = nil - noExceptionOops("worker.release()"): - ctx.ethWireCtx.txPoolEnabled = true - ctx.chain.com.syncReqNewHead = nil + ctx.disableTicker() # Stop log/status ticker (if any) + ctx.enableWireServices() # Re-enable public services + ctx.disableRpcMagic() # Disable external pivot update via RPC + proc start*(buddy: SnapBuddyRef): bool = ## Initialise worker peer @@ -160,127 +144,28 @@ proc stop*(buddy: SnapBuddyRef) = ctx.pool.ticker.stopBuddy() # ------------------------------------------------------------------------------ -# Public functions +# Public functions, sync handler multiplexers # ------------------------------------------------------------------------------ proc runDaemon*(ctx: SnapCtxRef) {.async.} = - ## Enabled while `ctx.daemon` is `true` - ## - if not ctx.pool.recovery.isNil: - if not await ctx.recoveryStepContinue(): - # Done, stop recovery - ctx.pool.recovery = nil - ctx.daemon = false - - # Update logging - if not ctx.pool.ticker.isNil: - ctx.pool.ticker.stopRecovery() - + ## Sync processing multiplexer + ignoreException("runDaemon"): + await ctx.playSyncSpecs.daemon(ctx) proc runSingle*(buddy: SnapBuddyRef) {.async.} = - ## Enabled while - ## * `buddy.ctrl.multiOk` is `false` - ## * `buddy.ctrl.poolMode` is `false` - ## - let ctx = buddy.ctx - - # External beacon header updater - await buddy.updateBeaconHeaderFromFile() - - await buddy.pivotApprovePeer() - buddy.ctrl.multiOk = true - + ## Sync processing multiplexer + ignoreException("runSingle"): + await buddy.ctx.playSyncSpecs.single(buddy) proc runPool*(buddy: SnapBuddyRef, last: bool): bool = - ## Enabled when `buddy.ctrl.poolMode` is `true` - ## - let ctx = buddy.ctx - ctx.poolMode = false - result = true - - # Clean up empty pivot slots (never the top one) - var rc = ctx.pool.pivotTable.beforeLast - while rc.isOK: - let (key, env) = (rc.value.key, rc.value.data) - if env.fetchAccounts.processed.isEmpty: - ctx.pool.pivotTable.del key - rc = ctx.pool.pivotTable.prev(key) - + ## Sync processing multiplexer + ignoreException("runPool"): + result = buddy.ctx.playSyncSpecs.pool(buddy, last) proc runMulti*(buddy: SnapBuddyRef) {.async.} = - ## Enabled while - ## * `buddy.ctx.multiOk` is `true` - ## * `buddy.ctx.poolMode` is `false` - ## - let - ctx = buddy.ctx - peer = buddy.peer - - # Set up current state root environment for accounts snapshot - let - env = block: - let rc = ctx.pool.pivotTable.lastValue - if rc.isErr: - return # nothing to do - rc.value - pivot = "#" & $env.stateHeader.blockNumber # for logging - nStorQuAtStart = env.fetchStorageFull.len + - env.fetchStoragePart.len + - env.parkedStorage.len - - buddy.only.pivotEnv = env - - # Full sync processsing based on current snapshot - # ----------------------------------------------- - - #
Check whether this pivot is fully downloaded - if env.fetchAccounts.processed.isFull and nStorQuAtStart == 0: - trace "Snap full sync -- not implemented yet", peer, pivot - await sleepAsync(5.seconds) - # flip over to single mode for getting new instructins - buddy.ctrl.multiOk = false - return - - # Snapshot sync processing - # ------------------------ - - # If this is a new pivot, the previous one can be cleaned up. There is no - # point in keeping some older space consuming state data any longer. - ctx.pool.pivotTable.beforeTopMostlyClean() - - when extraTraceMessages: - block: - trace "Multi sync runner", peer, pivot, nAccounts=env.nAccounts, - nSlotLists=env.nSlotLists, - processed=env.fetchAccounts.processed.fullPC3, - nStoQu=nStorQuAtStart - - # This one is the syncing work horse which downloads the database - await env.execSnapSyncAction(buddy) - - # Various logging entries (after accounts and storage slots download) - let - nAccounts {.used.} = env.nAccounts - nSlotLists {.used.} = env.nSlotLists - processed {.used.} = env.fetchAccounts.processed.fullPC3 - nStoQuLater {.used.} = env.fetchStorageFull.len + env.fetchStoragePart.len - - if env.archived: - # Archive pivot if it became stale - when extraTraceMessages: - trace "Mothballing", peer, pivot, nAccounts, nSlotLists - env.pivotMothball() - - else: - # Save state so sync can be partially resumed at next start up - let rc = env.saveCheckpoint(ctx) - if rc.isErr: - error "Failed to save recovery checkpoint", peer, pivot, nAccounts, - nSlotLists, processed, nStoQu=nStoQuLater, error=rc.error - else: - when extraTraceMessages: - trace "Saved recovery checkpoint", peer, pivot, nAccounts, nSlotLists, - processed, nStoQu=nStoQuLater, blobSize=rc.value + ## Sync processsing multiplexer + ignoreException("runMulti"): + await buddy.ctx.playSyncSpecs.multi(buddy) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/com/com_error.nim b/nimbus/sync/snap/worker/com/com_error.nim index 2520d7742..75ca778ed 100644 --- a/nimbus/sync/snap/worker/com/com_error.nim +++ b/nimbus/sync/snap/worker/com/com_error.nim @@ -19,6 +19,7 @@ type ComErrorStatsRef* = ref object ## particular error counters so connections will not be cut immediately ## after a particular error. + peerDegraded*: bool nTimeouts*: uint nNoData*: uint nNetwork*: uint @@ -61,6 +62,7 @@ proc stopAfterSeriousComError*( if comErrorsTimeoutMax < stats.nTimeouts: # Mark this peer dead, i.e. avoid fetching from this peer for a while ctrl.zombie = true + stats.peerDegraded = true return true when 0 < comErrorsTimeoutSleepMSecs: @@ -71,6 +73,7 @@ proc stopAfterSeriousComError*( stats.nNetwork.inc if comErrorsNetworkMax < stats.nNetwork: ctrl.zombie = true + stats.peerDegraded = true return true when 0 < comErrorsNetworkSleepMSecs: @@ -84,6 +87,7 @@ proc stopAfterSeriousComError*( ComNoTrieNodesAvailable: stats.nNoData.inc if comErrorsNoDataMax < stats.nNoData: + # Mark this peer dead, i.e. 
avoid fetching from this peer for a while ctrl.zombie = true return true diff --git a/nimbus/sync/snap/worker/db/snapdb_pivot.nim b/nimbus/sync/snap/worker/db/snapdb_pivot.nim index 5e475a81c..e71bc2868 100644 --- a/nimbus/sync/snap/worker/db/snapdb_pivot.nim +++ b/nimbus/sync/snap/worker/db/snapdb_pivot.nim @@ -44,37 +44,37 @@ template handleRlpException(info: static[string]; code: untyped) = # Public functions # ------------------------------------------------------------------------------ -proc savePivot*( +proc pivotSaveDB*( pv: SnapDbRef; ## Base descriptor on `ChainDBRef` data: SnapDbPivotRegistry; ## Registered data record ): Result[int,HexaryError] = ## Register pivot environment - handleRlpException("savePivot()"): + handleRlpException("pivotSaveDB()"): let rlpData = rlp.encode(data) pv.kvDb.persistentStateRootPut(data.header.stateRoot.to(NodeKey), rlpData) return ok(rlpData.len) # notreached -proc recoverPivot*( +proc pivotRecoverDB*( pv: SnapDbRef; ## Base descriptor on `ChainDBRef` stateRoot: NodeKey; ## Check for a particular state root ): Result[SnapDbPivotRegistry,HexaryError] = ## Restore pivot environment for a particular state root. let rc = pv.kvDb.persistentStateRootGet(stateRoot) if rc.isOk: - handleRlpException("recoverPivot()"): + handleRlpException("pivotRecoverDB()"): var r = rlp.decode(rc.value.data, SnapDbPivotRegistry) r.predecessor = rc.value.key return ok(r) err(StateRootNotFound) -proc recoverPivot*( +proc pivotRecoverDB*( pv: SnapDbRef; ## Base descriptor on `ChainDBRef` ): Result[SnapDbPivotRegistry,HexaryError] = ## Restore pivot environment that was saved latest. let rc = pv.kvDb.persistentStateRootGet(NodeKey.default) if rc.isOk: - return pv.recoverPivot(rc.value.key) + return pv.pivotRecoverDB(rc.value.key) err(StateRootNotFound) # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/pivot.nim b/nimbus/sync/snap/worker/pivot.nim index ae124f8b6..81d97a3b5 100644 --- a/nimbus/sync/snap/worker/pivot.nim +++ b/nimbus/sync/snap/worker/pivot.nim @@ -121,7 +121,7 @@ proc reverseUpdate*( proc tickerStats*( pivotTable: var SnapPivotTable; # Pivot table ctx: SnapCtxRef; # Some global context - ): TickerStatsUpdater = + ): TickerSnapStatsUpdater = ## This function returns a function of type `TickerSnapStatsUpdater` that prints ## out pivot table statistics. The returned function is supposed to drive the ## `ticker` module. @@ -134,7 +134,7 @@ proc tickerStats*( if rSq < sqSumAv: result[1] = sqrt(sqSum / length.float - result[0] * result[0]) - result = proc: SnapTickerStats = + result = proc: TickerSnapStats = var aSum, aSqSum, uSum, uSqSum, sSum, sSqSum: float count = 0 @@ -172,7 +172,7 @@ proc tickerStats*( if 0 < ctx.pool.beaconHeader.blockNumber: beaconBlock = some(ctx.pool.beaconHeader.blockNumber) - SnapTickerStats( + TickerSnapStats( beaconBlock: beaconBlock, pivotBlock: pivotBlock, nQueues: ctx.pool.pivotTable.len, @@ -239,7 +239,7 @@ proc execSnapSyncAction*( await buddy.rangeFetchStorageSlots(env) else: rangeFetchOk = false - if env.archived: + if env.archived or (buddy.ctrl.zombie and buddy.only.errors.peerDegraded): return # Unconditionally try healing if enabled. @@ -250,7 +250,7 @@ proc execSnapSyncAction*( # physically disconnected.
buddy.ctrl.forceRun = true await buddy.healAccounts(env) - if env.archived: + if env.archived or (buddy.ctrl.zombie and buddy.only.errors.peerDegraded): return # Some additional storage slots might have been popped up @@ -287,7 +287,7 @@ proc saveCheckpoint*( if accountsSaveStorageSlotsMax < nStoQu: return err(TooManySlotAccounts) - ctx.pool.snapDb.savePivot SnapDbPivotRegistry( + ctx.pool.snapDb.pivotSaveDB SnapDbPivotRegistry( header: env.stateHeader, nAccounts: env.nAccounts, nSlotLists: env.nSlotLists, @@ -298,7 +298,7 @@ toSeq(env.parkedStorage.items)) -proc recoverPivotFromCheckpoint*( +proc pivotRecoverFromCheckpoint*( env: SnapPivotRef; # Current pivot environment ctx: SnapCtxRef; # Global context (containing save state) topLevel: bool; # Full data set on top level only diff --git a/nimbus/sync/snap/worker/pivot/heal_accounts.nim b/nimbus/sync/snap/worker/pivot/heal_accounts.nim index 1fceead3e..d51238c87 100644 --- a/nimbus/sync/snap/worker/pivot/heal_accounts.nim +++ b/nimbus/sync/snap/worker/pivot/heal_accounts.nim @@ -56,7 +56,7 @@ logScope: topics = "snap-acc" const - extraTraceMessages = false or true + extraTraceMessages = false # or true ## Enabled additional logging noise # ------------------------------------------------------------------------------ @@ -64,7 +64,7 @@ const # ------------------------------------------------------------------------------ template logTxt(info: static[string]): static[string] = - "Accounts healing " & info + "Accounts heal " & info proc `$`(node: NodeSpecs): string = node.partialPath.toHex @@ -341,9 +341,7 @@ proc healAccounts*( env: SnapPivotRef; ) {.async.} = ## Fetching and merging missing account trie database nodes. - when extraTraceMessages: - let peer {.used.} = buddy.peer - trace logTxt "started", peer, ctx=buddy.healingCtx(env) + trace logTxt "started", peer=buddy.peer, ctx=buddy.healingCtx(env) let fa = env.fetchAccounts @@ -362,9 +360,8 @@ proc healAccounts*( nNodesFetched.inc(nNodes) nFetchLoop.inc - when extraTraceMessages: - trace logTxt "done", peer, ctx=buddy.healingCtx(env), - nNodesFetched, nFetchLoop, nIgnore=ignore.len + trace logTxt "done", peer=buddy.peer, ctx=buddy.healingCtx(env), + nNodesFetched, nFetchLoop, nIgnore=ignore.len # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim b/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim index 9cf7d27f8..3f1248783 100644 --- a/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim +++ b/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim @@ -58,7 +58,7 @@ logScope: topics = "snap-slot" const - extraTraceMessages = false or true + extraTraceMessages = false # or true ## Enabled additional logging noise # ------------------------------------------------------------------------------ @@ -66,7 +66,7 @@ const # ------------------------------------------------------------------------------ template logTxt(info: static[string]): static[string] = - "Storage slots healing " & info + "Storage slots heal " & info proc `$`(node: NodeSpecs): string = node.partialPath.toHex @@ -344,9 +344,7 @@ proc healStorageSlots*( env: SnapPivotRef; ) {.async.} = ## Fetching and merging missing storage slots trie database nodes.
- when extraTraceMessages: - let peer {.used.} = buddy.peer - trace logTxt "started", peer, ctx=buddy.healingCtx(env) + trace logTxt "started", peer=buddy.peer, ctx=buddy.healingCtx(env) var nNodesFetched = 0 @@ -363,8 +361,8 @@ proc healStorageSlots*( let rc = env.storageQueueUnlinkPartialItem visited if rc.isErr: when extraTraceMessages: - trace logTxt "queue exhausted", peer, ctx=buddy.healingCtx(env), - nIgnore=ignore.len, nVisited=visited.len + trace logTxt "queue exhausted", peer=buddy.peer, + ctx=buddy.healingCtx(env), nIgnore=ignore.len, nVisited=visited.len break rc.value @@ -383,9 +381,8 @@ proc healStorageSlots*( ignore = ignore + rejected nNodesFetched.inc(nNodes) - when extraTraceMessages: - trace logTxt "done", peer, ctx=buddy.healingCtx(env), - nNodesFetched, nFetchLoop, nIgnore=ignore.len, nVisited=visited.len + trace logTxt "done", peer=buddy.peer, ctx=buddy.healingCtx(env), + nNodesFetched, nFetchLoop, nIgnore=ignore.len, nVisited=visited.len # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim b/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim index d54dd7441..f1d30ca0c 100644 --- a/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim +++ b/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim @@ -60,7 +60,7 @@ logScope: topics = "snap-acc" const - extraTraceMessages = false or true + extraTraceMessages = false # or true ## Enabled additional logging noise # ------------------------------------------------------------------------------ @@ -68,7 +68,7 @@ const # ------------------------------------------------------------------------------ template logTxt(info: static[string]): static[string] = - "Accounts range " & info + "Accounts fetch " & info proc `$`(rs: NodeTagRangeSet): string = rs.fullPC3 @@ -166,9 +166,9 @@ proc accountsRangefetchImpl( if rc.isErr: # Bad data, just try another peer buddy.ctrl.zombie = true - when extraTraceMessages: - trace logTxt "import failed", peer, ctx=buddy.fetchCtx(env), - gotAccounts, gotStorage, reqLen=iv, covered, error=rc.error + # Failed to store on database, not much that can be done here + error logTxt "import failed", peer, ctx=buddy.fetchCtx(env), + gotAccounts, gotStorage, reqLen=iv, covered, error=rc.error return rc.value @@ -218,13 +218,12 @@ proc rangeFetchAccounts*( env: SnapPivotRef; ) {.async.} = ## Fetch accounts and store them in the database. 
+ trace logTxt "start", peer=buddy.peer, ctx=buddy.fetchCtx(env) + + let fa = env.fetchAccounts + var nFetchAccounts = 0 # for logging if not fa.processed.isFull(): - when extraTraceMessages: - trace logTxt "start", peer=buddy.peer, ctx=buddy.fetchCtx(env) - - var nFetchAccounts = 0 # for logging while not fa.processed.isFull() and buddy.ctrl.running and not env.archived: @@ -239,9 +238,7 @@ if storageSlotsQuPrioThresh < nStoQu: break - when extraTraceMessages: - trace logTxt "done", peer=buddy.peer, ctx=buddy.fetchCtx(env), - nFetchAccounts + trace logTxt "done", peer=buddy.peer, ctx=buddy.fetchCtx(env), nFetchAccounts # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim b/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim index d854d7d73..a4bd28d84 100644 --- a/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim +++ b/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim @@ -81,14 +81,14 @@ logScope: topics = "snap-slot" const - extraTraceMessages = false or true + extraTraceMessages = false # or true # ------------------------------------------------------------------------------ # Private logging helpers # ------------------------------------------------------------------------------ template logTxt(info: static[string]): static[string] = - "Storage slots range " & info + "Storage slots fetch " & info proc fetchCtx( buddy: SnapBuddyRef; @@ -142,6 +142,8 @@ proc fetchStorageSlotsImpl( let report = ctx.pool.snapDb.importStorageSlots(peer, stoRange.data) if 0 < report.len: if report[^1].slot.isNone: + # Bad data, just try another peer + buddy.ctrl.zombie = true # Failed to store on database, not much that can be done here error logTxt "import failed", peer, ctx=buddy.fetchCtx(env), nSlotLists=0, nReq=req.len, error=report[^1].error @@ -202,8 +204,7 @@ proc rangeFetchStorageSlots*( ## each work item on the queue at least once. For partial slot range ## items this means in case of success that the outstanding range has become ## at least smaller. - when extraTraceMessages: - trace logTxt "start", peer=buddy.peer, ctx=buddy.fetchCtx(env) + trace logTxt "start", peer=buddy.peer, ctx=buddy.fetchCtx(env) # Fetch storage data and save it on disk. Storage requests are managed by # request queues for handling full/partial replies and re-fetch issues.
For @@ -252,8 +253,7 @@ proc rangeFetchStorageSlots*( # End `while` # End `for` - when extraTraceMessages: - trace logTxt "done", peer=buddy.peer, ctx=buddy.fetchCtx(env) + trace logTxt "done", peer=buddy.peer, ctx=buddy.fetchCtx(env) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot/swap_in.nim b/nimbus/sync/snap/worker/pivot/swap_in.nim index 5cb7d0536..4ee0b67cf 100644 --- a/nimbus/sync/snap/worker/pivot/swap_in.nim +++ b/nimbus/sync/snap/worker/pivot/swap_in.nim @@ -58,7 +58,7 @@ type pivot: SnapPivotRef ## Accounts only const - extraTraceMessages = false or true + extraTraceMessages = false # or true ## Enabled additional logging noise # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/play.nim b/nimbus/sync/snap/worker/play.nim new file mode 100644 index 000000000..c37155c32 --- /dev/null +++ b/nimbus/sync/snap/worker/play.nim @@ -0,0 +1,25 @@ +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +{.push raises: [].} + +import + ../worker_desc, + ./play/[play_desc, play_full_sync, play_prep_full, play_snap_sync] + +export + PlaySyncSpecs, + playSyncSpecs, + `playMode=` + +proc playInit*(desc: var SnapSyncSpecs) = + ## Set up sync mode specs table. This cannot be done at compile time. + desc.tab[SnapSyncMode] = playSnapSyncSpecs() + desc.tab[PreFullSyncMode] = playPrepFullSpecs() + desc.tab[FullSyncMode] = playFullSyncSpecs() + +# End diff --git a/nimbus/sync/snap/worker/play/play_desc.nim b/nimbus/sync/snap/worker/play/play_desc.nim new file mode 100644 index 000000000..2f2e3abaa --- /dev/null +++ b/nimbus/sync/snap/worker/play/play_desc.nim @@ -0,0 +1,46 @@ +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
+ +{.push raises: [].} + +import + chronos, + ../../../sync_desc, + ../../worker_desc + +type + PlayVoidFutureCtxFn* = proc( + ctx: SnapCtxRef): Future[void] {.gcsafe, raises: [CatchableError].} + + PlayVoidFutureBuddyFn* = proc( + buddy: SnapBuddyRef): Future[void] {.gcsafe, raises: [CatchableError].} + + PlayBoolBuddyFn* = proc( + buddy: SnapBuddyRef, last: bool): bool {.gcsafe, raises: [CatchableError].} + + PlaySyncSpecs* = ref object of RootRef + ## Holds sync mode specs & methods for a particular sync state + pool*: PlayBoolBuddyFn + daemon*: PlayVoidFutureCtxFn + single*: PlayVoidFutureBuddyFn + multi*: PlayVoidFutureBuddyFn + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc playSyncSpecs*(ctx: SnapCtxRef): PlaySyncSpecs = + ## Getter + ctx.pool.syncMode.tab[ctx.pool.syncMode.active].PlaySyncSpecs + +proc `playMode=`*(ctx: SnapCtxRef; val: SnapSyncModeType) = + ## Setter + ctx.pool.syncMode.active = val + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/play/play_full_sync.nim b/nimbus/sync/snap/worker/play/play_full_sync.nim new file mode 100644 index 000000000..bdb3ab36b --- /dev/null +++ b/nimbus/sync/snap/worker/play/play_full_sync.nim @@ -0,0 +1,65 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
+ +{.push raises: [].} + +import + chronicles, + chronos, + eth/p2p, + ../../../sync_desc, + ../../worker_desc, + play_desc + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +# ------------------------------------------------------------------------------ +# Private functions, full sync handlers +# ------------------------------------------------------------------------------ + +proc fullSyncPool(buddy: SnapBuddyRef, last: bool): bool = + buddy.ctx.poolMode = false + true + +proc fullSyncDaemon(ctx: SnapCtxRef) {.async.} = + ctx.daemon = false + +proc fullSyncSingle(buddy: SnapBuddyRef) {.async.} = + buddy.ctrl.multiOk = true + +proc fullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} = + ## Full sync processing + let + ctx = buddy.ctx + peer = buddy.peer + + trace "Snap full sync -- not implemented yet", peer + await sleepAsync(5.seconds) + + # flip over to single mode for getting new instructions + buddy.ctrl.multiOk = false + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc playFullSyncSpecs*: PlaySyncSpecs = + ## Return full sync handler environment + PlaySyncSpecs( + pool: fullSyncPool, + daemon: fullSyncDaemon, + single: fullSyncSingle, + multi: fullSyncMulti) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/play/play_prep_full.nim b/nimbus/sync/snap/worker/play/play_prep_full.nim new file mode 100644 index 000000000..ecff962b1 --- /dev/null +++ b/nimbus/sync/snap/worker/play/play_prep_full.nim @@ -0,0 +1,92 @@ +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms.
+ +## Transitional handlers preparing for full sync + +{.push raises: [].} + +import + chronicles, + chronos, + eth/p2p, + stew/keyed_queue, + ../../../sync_desc, + ../../worker_desc, + ../ticker, + play_desc + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc blindTicker(ctx: SnapCtxRef): TickerFullStatsUpdater = + result = proc: TickerFullStats = + discard + +# ------------------------------------------------------------------------------ +# Private functions, transitional handlers preparing for full sync +# ------------------------------------------------------------------------------ + +proc prepFullSyncPool(buddy: SnapBuddyRef, last: bool): bool = + buddy.ctx.poolMode = false + true + +proc prepFullSyncDaemon(ctx: SnapCtxRef) {.async.} = + ctx.daemon = false + +proc prepFullSyncSingle(buddy: SnapBuddyRef) {.async.} = + ## One off, setting up full sync processing in single mode + let + ctx = buddy.ctx + + # Fetch latest state root environment + env = block: + let rc = ctx.pool.pivotTable.lastValue + if rc.isErr: + buddy.ctrl.multiOk = false + return + rc.value + + peer = buddy.peer + pivot = "#" & $env.stateHeader.blockNumber # for logging + + when extraTraceMessages: + trace "Full sync prepare in single mode", peer, pivot + + # Update ticker (currently blind) + ctx.pool.ticker.init(cb = ctx.blindTicker()) + + # Cosmetics: allow other processes (e.g. ticker) to log the current + # state. There is no other intended purpose of this wait state. + await sleepAsync 1100.milliseconds + + ctx.playMode = FullSyncMode + buddy.ctrl.multiOk = true + + +proc prepFullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} = + ## Redirect to single mode where the switch to full sync is made + buddy.ctrl.multiOk = false + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc playPrepFullSpecs*: PlaySyncSpecs = + ## Return full sync preparation handler environment + PlaySyncSpecs( + pool: prepFullSyncPool, + daemon: prepFullSyncDaemon, + single: prepFullSyncSingle, + multi: prepFullSyncMulti) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/play/play_snap_sync.nim b/nimbus/sync/snap/worker/play/play_snap_sync.nim new file mode 100644 index 000000000..f9fabc189 --- /dev/null +++ b/nimbus/sync/snap/worker/play/play_snap_sync.nim @@ -0,0 +1,212 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms.
+ +{.push raises: [].} + +import + chronicles, + chronos, + eth/p2p, + stew/[interval_set, keyed_queue], + ../../../sync_desc, + ".."/[pivot, ticker], + ../pivot/storage_queue_helper, + ../db/[hexary_desc, snapdb_pivot], + "../.."/[range_desc, update_beacon_header, worker_desc], + play_desc + +logScope: + topics = "snap-play" + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} = + let recov = ctx.pool.recovery + if recov.isNil: + return false + + let + checkpoint = + "#" & $recov.state.header.blockNumber & "(" & $recov.level & ")" + topLevel = recov.level == 0 + env = block: + let rc = ctx.pool.pivotTable.eq recov.state.header.stateRoot + if rc.isErr: + error "Recovery pivot context gone", checkpoint, topLevel + return false + rc.value + + # Cosmetics: allow other processes (e.g. ticker) to log the current recovery + # state. There is no other intended purpose of this wait state. + await sleepAsync 1100.milliseconds + + #when extraTraceMessages: + # trace "Recovery continued ...", checkpoint, topLevel, + # nAccounts=recov.state.nAccounts, nDangling=recov.state.dangling.len + + # Update pivot data from recovery checkpoint + env.pivotRecoverFromCheckpoint(ctx, topLevel) + + # Fetch next recovery record if there is any + if recov.state.predecessor.isZero: + #when extraTraceMessages: + # trace "Recovery done", checkpoint, topLevel + return false + let rc = ctx.pool.snapDb.pivotRecoverDB(recov.state.predecessor) + if rc.isErr: + when extraTraceMessages: + trace "Recovery stopped at pivot stale checkpoint", checkpoint, topLevel + return false + + # Set up next level pivot checkpoint + ctx.pool.recovery = SnapRecoveryRef( + state: rc.value, + level: recov.level + 1) + + # Push onto pivot table and continue recovery (i.e. 
do not stop it yet) + ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx) + + return true # continue recovery + +# ------------------------------------------------------------------------------ +# Private functions, snap sync handlers +# ------------------------------------------------------------------------------ + +proc snapSyncPool(buddy: SnapBuddyRef, last: bool): bool = + ## Enabled when `buddy.ctrl.poolMode` is `true` + ## + let ctx = buddy.ctx + ctx.poolMode = false + result = true + + # Clean up empty pivot slots (never the top one) + var rc = ctx.pool.pivotTable.beforeLast + while rc.isOK: + let (key, env) = (rc.value.key, rc.value.data) + if env.fetchAccounts.processed.isEmpty: + ctx.pool.pivotTable.del key + rc = ctx.pool.pivotTable.prev(key) + + +proc snapSyncDaemon(ctx: SnapCtxRef) {.async.} = + ## Enabled while `ctx.daemon` is `true` + ## + if not ctx.pool.recovery.isNil: + if not await ctx.recoveryStepContinue(): + # Done, stop recovery + ctx.pool.recovery = nil + ctx.daemon = false + + # Update logging + if not ctx.pool.ticker.isNil: + ctx.pool.ticker.stopRecovery() + + +proc snapSyncSingle(buddy: SnapBuddyRef) {.async.} = + ## Enabled while + ## * `buddy.ctrl.multiOk` is `false` + ## * `buddy.ctrl.poolMode` is `false` + ## + let ctx = buddy.ctx + + # External beacon header updater + await buddy.updateBeaconHeaderFromFile() + + await buddy.pivotApprovePeer() + buddy.ctrl.multiOk = true + + +proc snapSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} = + ## Enabled while + ## * `buddy.ctx.multiOk` is `true` + ## * `buddy.ctx.poolMode` is `false` + ## + let + ctx = buddy.ctx + + # Fetch latest state root environment + env = block: + let rc = ctx.pool.pivotTable.lastValue + if rc.isErr: + buddy.ctrl.multiOk = false + return # nothing to do + rc.value + + peer = buddy.peer + pivot = "#" & $env.stateHeader.blockNumber # for logging + fa = env.fetchAccounts + + # Check whether this pivot is fully downloaded + if env.fetchAccounts.processed.isFull and env.storageQueueTotal() == 0: + # Switch to full sync => final state + ctx.playMode = PreFullSyncMode + trace "Switch to full sync", peer, pivot, nAccounts=env.nAccounts, + processed=fa.processed.fullPC3, nStoQu=env.storageQueueTotal(), + nSlotLists=env.nSlotLists + return + + # If this is a new snap sync pivot, the previous one can be cleaned up and + # archived. There is no point in keeping some older space consuming state + # data any longer. 
+ ctx.pool.pivotTable.beforeTopMostlyClean() + + when extraTraceMessages: + trace "Multi sync runner", peer, pivot, nAccounts=env.nAccounts, + processed=fa.processed.fullPC3, nStoQu=env.storageQueueTotal(), + nSlotLists=env.nSlotLists + + # This one is the syncing work horse which downloads the database + await env.execSnapSyncAction(buddy) + + # Various logging entries (after accounts and storage slots download) + let + nAccounts = env.nAccounts + nSlotLists = env.nSlotLists + processed = fa.processed.fullPC3 + + # Archive this pivot environment if it has become stale + if env.archived: + when extraTraceMessages: + trace "Mothballing", peer, pivot, nAccounts, nSlotLists + env.pivotMothball() + return + + # Save state so sync can be resumed at next start up + let rc = env.saveCheckpoint(ctx) + if rc.isOk: + when extraTraceMessages: + trace "Saved recovery checkpoint", peer, pivot, nAccounts, processed, + nStoQu=env.storageQueueTotal(), nSlotLists, blobSize=rc.value + return + + error "Failed to save recovery checkpoint", peer, pivot, nAccounts, + processed, nStoQu=env.storageQueueTotal(), nSlotLists, error=rc.error + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc playSnapSyncSpecs*: PlaySyncSpecs = + ## Return snap sync handler environment + PlaySyncSpecs( + pool: snapSyncPool, + daemon: snapSyncDaemon, + single: snapSyncSingle, + multi: snapSyncMulti) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/ticker.nim b/nimbus/sync/snap/worker/ticker.nim index 736580d42..c9d07930f 100644 --- a/nimbus/sync/snap/worker/ticker.nim +++ b/nimbus/sync/snap/worker/ticker.nim @@ -10,7 +10,7 @@ # except according to those terms. import - std/[strformat, strutils], + std/[options, strformat, strutils], chronos, chronicles, eth/[common, p2p], @@ -24,13 +24,26 @@ logScope: topics = "snap-tick" type - # TODO: Seems like a compiler name mangling bug or so. If this is named - # `TickerStats` then `eqeq___syncZsnapZworkerZticker_97` complains - # that the TickerStats object does not have beaconBlock and pivotBlock - # members. So I'm assuming here it seems to take the wrong function, meaning - # the one of the `TickerStats` of full sync, because it has the same name and - # the same module name. Not sure..
- SnapTickerStats* = object + TickerSnapStatsUpdater* = proc: TickerSnapStats {.gcsafe, raises: [].} + ## Snap sync state update function + + TickerFullStatsUpdater* = proc: TickerFullStats {.gcsafe, raises: [].} + ## Full sync state update function + + SnapDescDetails = object + ## Private state descriptor + snapCb: TickerSnapStatsUpdater + recovery: bool + lastRecov: bool + lastStats: TickerSnapStats + + FullDescDetails = object + ## Private state descriptor + fullCb: TickerFullStatsUpdater + lastStats: TickerFullStats + + TickerSnapStats* = object + ## Snap sync state (see `TickerSnapStatsUpdater`) beaconBlock*: Option[BlockNumber] pivotBlock*: Option[BlockNumber] nAccounts*: (float,float) ## Mean and standard deviation @@ -40,19 +53,27 @@ type nStorageQueue*: Option[int] nQueues*: int - TickerStatsUpdater* = - proc: SnapTickerStats {.gcsafe, raises: [].} + TickerFullStats* = object + ## Full sync state (see `TickerFullStatsUpdater`) + topPersistent*: BlockNumber + nextUnprocessed*: Option[BlockNumber] + nextStaged*: Option[BlockNumber] + nStagedQueue*: int + suspended*: bool + reOrg*: bool TickerRef* = ref object - ## Account fetching state that is shared among all peers. - nBuddies: int - recovery: bool - lastRecov: bool - lastStats: SnapTickerStats - statsCb: TickerStatsUpdater + ## Ticker descriptor object + nBuddies: int logTicker: TimerCallback - started: Moment - visited: Moment + started: Moment + visited: Moment + prettyPrint: proc(t: TickerRef) {.gcsafe, raises: [].} + case fullMode: bool + of false: + snap: SnapDescDetails + of true: + full: FullDescDetails const tickerStartDelay = chronos.milliseconds(100) @@ -107,8 +128,14 @@ proc pc99(val: float): string = elif 0.0 < val and val <= 0.01: "1%" else: val.toPC(0) +proc pp(n: BlockNumber): string = + "#" & $n + +proc pp(n: Option[BlockNumber]): string = + if n.isNone: "n/a" else: n.get.pp + # ------------------------------------------------------------------------------ -# Private functions: ticking log messages +# Private functions: printing ticker messages # ------------------------------------------------------------------------------ template noFmtError(info: static[string]; code: untyped) = @@ -117,15 +144,13 @@ template noFmtError(info: static[string]; code: untyped) = except ValueError as e: raiseAssert "Inconveivable (" & info & "): " & e.msg -proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.} - -proc runLogTicker(t: TickerRef) {.gcsafe.} = +proc snapTicker(t: TickerRef) {.gcsafe.} = let - data = t.statsCb() + data = t.snap.snapCb() now = Moment.now() - if data != t.lastStats or - t.recovery != t.lastRecov or + if data != t.snap.lastStats or + t.snap.recovery != t.snap.lastRecov or tickerLogSuppressMax < (now - t.visited): var nAcc, nSto: string @@ -133,7 +158,7 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} = bc = "n/a" nStoQue = "n/a" let - recoveryDone = t.lastRecov + recoveryDone = t.snap.lastRecov accCov = data.accountsFill[0].pc99 & "(" & data.accountsFill[1].pc99 & ")" & "/" & data.accountsFill[2].pc99 & @@ -144,9 +169,9 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} = up = (now - t.started).seconds.uint64.toSI mem = getTotalMem().uint.toSI - t.lastStats = data + t.snap.lastStats = data t.visited = now - t.lastRecov = t.recovery + t.snap.lastRecov = t.snap.recovery noFmtError("runLogTicker"): if data.pivotBlock.isSome: @@ -161,7 +186,7 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} = if data.nStorageQueue.isSome: nStoQue = $data.nStorageQueue.unsafeGet - if t.recovery: + if t.snap.recovery: info "Snap sync 
statistics (recovery)", up, nInst, bc, pv, nAcc, accCov, nSto, nStoQue, mem elif recoveryDone: @@ -171,20 +196,83 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} = info "Snap sync statistics", up, nInst, bc, pv, nAcc, accCov, nSto, nStoQue, mem - t.setLogTicker(Moment.fromNow(tickerLogInterval)) +proc fullTicker(t: TickerRef) {.gcsafe.} = + let + data = t.full.fullCb() + now = Moment.now() + + if data != t.full.lastStats or + tickerLogSuppressMax < (now - t.visited): + let + persistent = data.topPersistent.pp + staged = data.nextStaged.pp + unprocessed = data.nextUnprocessed.pp + queued = data.nStagedQueue + reOrg = if data.reOrg: "t" else: "f" + + buddies = t.nBuddies + + # With `int64`, there are more than 29*10^10 years range for seconds + up = (now - t.started).seconds.uint64.toSI + mem = getTotalMem().uint.toSI + + t.full.lastStats = data + t.visited = now + + if data.suspended: + info "Sync statistics (suspended)", up, buddies, + persistent, unprocessed, staged, queued, reOrg, mem + else: + info "Sync statistics", up, buddies, + persistent, unprocessed, staged, queued, reOrg, mem + +# ------------------------------------------------------------------------------ +# Private functions: ticking log messages +# ------------------------------------------------------------------------------ + +proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.} + +proc runLogTicker(t: TickerRef) {.gcsafe.} = + t.prettyPrint(t) + t.setLogTicker(Moment.fromNow(tickerLogInterval)) proc setLogTicker(t: TickerRef; at: Moment) = if not t.logTicker.isNil: t.logTicker = safeSetTimer(at, runLogTicker, t) +proc initImpl(t: TickerRef; cb: TickerSnapStatsUpdater) = + t.fullMode = false + t.prettyPrint = snapTicker + t.snap = SnapDescDetails(snapCb: cb) + +proc initImpl(t: TickerRef; cb: TickerFullStatsUpdater) = + t.fullMode = true + t.prettyPrint = fullTicker + t.full = FullDescDetails(fullCb: cb) + # ------------------------------------------------------------------------------ # Public constructor and start/stop functions # ------------------------------------------------------------------------------ -proc init*(T: type TickerRef; cb: TickerStatsUpdater): T = +proc init*(t: TickerRef; cb: TickerSnapStatsUpdater) = + ## Re-initialise ticker + t.visited.reset + if t.fullMode: + t.prettyPrint(t) # print final state for full sync + t.initImpl(cb) + +proc init*(t: TickerRef; cb: TickerFullStatsUpdater) = + ## Re-initialise ticker + t.visited.reset + if not t.fullMode: + t.prettyPrint(t) # print final state for snap sync + t.initImpl(cb) + +proc init*(T: type TickerRef; cb: TickerSnapStatsUpdater): T = ## Constructor - T(statsCb: cb) + new result + result.initImpl(cb) proc start*(t: TickerRef) = ## Re/start ticker unconditionally @@ -206,30 +294,35 @@ proc startBuddy*(t: TickerRef) = ## Increment buddies counter and start ticker unless running. if t.nBuddies <= 0: t.nBuddies = 1 - if not t.recovery: - t.start() + if not t.fullMode: + if not t.snap.recovery: + t.start() else: t.nBuddies.inc proc startRecovery*(t: TickerRef) = ## Ditto for recovery mode - if not t.recovery: - t.recovery = true - if t.nBuddies <= 0: - t.start() + if not t.fullMode: + if not t.snap.recovery: + t.snap.recovery = true + if t.nBuddies <= 0: + t.start() proc stopBuddy*(t: TickerRef) = ## Decrement buddies counter and stop ticker if there are no more registered ## buddies.
t.nBuddies.dec - if t.nBuddies <= 0 and not t.recovery: - t.stop() + if t.nBuddies <= 0: + if not t.fullMode: + if not t.snap.recovery: + t.stop() proc stopRecovery*(t: TickerRef) = ## Ditto for recovery mode - if t.recovery: - t.recovery = false - t.stop() + if not t.fullMode: + if t.snap.recovery: + t.snap.recovery = false + t.stop() # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index da8f2b178..4f1040c99 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -12,6 +12,7 @@ import std/hashes, + chronos, eth/[common, p2p], stew/[interval_set, keyed_queue, sorted_set], ../../db/select_backend, @@ -82,7 +83,20 @@ type SnapBuddyData* = object ## Per-worker local descriptor data extension errors*: ComErrorStatsRef ## For error handling - pivotEnv*: SnapPivotRef ## Environment containing state root + + SnapSyncModeType* = enum + ## Current sync mode. After a snapshot has been downloaded, the system + ## proceeds with full sync. + SnapSyncMode = 0 ## Start mode + PreFullSyncMode + FullSyncMode + + SnapSyncSpecs* = object + ## Full specs for all sync modes. This table must be held in the main + ## descriptor and initialised at run time. The table values are opaque + ## and will be specified in the worker module(s). + active*: SnapSyncModeType + tab*: array[SnapSyncModeType,RootRef] SnapCtxData* = object ## Globally shared data extension @@ -96,7 +110,9 @@ type coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts covAccTimesFull*: uint ## # of 100% coverages recovery*: SnapRecoveryRef ## Current recovery checkpoint/context - noRecovery*: bool ## Ignore recovery checkpoints + + # Snap/full mode multiplexing + syncMode*: SnapSyncSpecs ## Sync mode data container # Info ticker*: TickerRef ## Ticker, logger diff --git a/tests/test_sync_snap/test_pivot.nim b/tests/test_sync_snap/test_pivot.nim index 3db543b56..7084a00e8 100644 --- a/tests/test_sync_snap/test_pivot.nim +++ b/tests/test_sync_snap/test_pivot.nim @@ -38,7 +38,7 @@ proc test_pivotStoreRead*( slotAccounts = seq[NodeKey].default for n in 0 ..< accKeys.len: let w = accKeys[n] - check dbBase.savePivot( + check dbBase.pivotSaveDB( SnapDbPivotRegistry( header: BlockHeader(stateRoot: w.to(Hash256)), nAccounts: n.uint64, @@ -50,7 +50,7 @@ proc test_pivotStoreRead*( sleep(50) # verify latest state root block: - let rc = dbBase.recoverPivot() + let rc = dbBase.pivotRecoverDB() check rc.isOk if rc.isOk: check rc.value.nAccounts == n.uint64 @@ -64,13 +64,13 @@ proc test_pivotStoreRead*( for n in 0 ..< accKeys.len: let w = accKeys[n] block: - let rc = dbBase.recoverPivot(w) + let rc = dbBase.pivotRecoverDB(w) check rc.isOk if rc.isOk: check rc.value.nAccounts == n.uint64 check rc.value.nSlotLists == n.uint64 # Update record in place - check dbBase.savePivot( + check dbBase.pivotSaveDB( SnapDbPivotRegistry( header: BlockHeader(stateRoot: w.to(Hash256)), nAccounts: n.uint64, @@ -81,7 +81,7 @@ proc test_pivotStoreRead*( # There might be a race condition on Windows (seen on qemu/win7) sleep(50) block: - let rc = dbBase.recoverPivot(w) + let rc = dbBase.pivotRecoverDB(w) check rc.isOk if rc.isOk: check rc.value.nAccounts == n.uint64
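The heart of this change is the new "play" dispatch layer: each sync mode registers a bundle of handlers (`PlaySyncSpecs`) in an opaque `RootRef` table at run time via `playInit()`, and the `run*` entry points in `worker.nim` simply forward to the bundle of the currently active `SnapSyncModeType`. Below is a minimal, self-contained sketch of that pattern; all identifiers (`DemoMode`, `DemoCtx`, `DemoSpecs`, `demoInit`, ...) are illustrative stand-ins rather than the diff's own, and the handlers are synchronous where the real ones are `{.async.}`:

```nim
type
  DemoMode = enum
    SnapMode, PrepFullMode, FullMode

  DemoCtx = ref object
    active: DemoMode              # cf. SnapSyncSpecs.active
    tab: array[DemoMode, RootRef] # cf. SnapSyncSpecs.tab, opaque entries

  DemoSpecs = ref object of RootRef
    ## Handler bundle for one sync mode, cf. PlaySyncSpecs
    multi: proc(ctx: DemoCtx)

proc specs(ctx: DemoCtx): DemoSpecs =
  ## Getter, cf. playSyncSpecs(): down-convert the opaque table entry
  ctx.tab[ctx.active].DemoSpecs

proc `mode=`(ctx: DemoCtx; val: DemoMode) =
  ## Setter, cf. `playMode=`: handlers change on the next dispatch
  ctx.active = val

proc snapMulti(ctx: DemoCtx) =
  echo "snap: downloading snapshot"
  ctx.mode = FullMode             # hand over once the snapshot is complete

proc fullMulti(ctx: DemoCtx) =
  echo "full: syncing blocks"

proc demoInit(ctx: DemoCtx) =
  ## Cf. playInit(): the table cannot be populated at compile time
  ctx.tab[SnapMode] = DemoSpecs(multi: snapMulti)
  ctx.tab[PrepFullMode] = DemoSpecs(multi: fullMulti)
  ctx.tab[FullMode] = DemoSpecs(multi: fullMulti)

proc runMulti(ctx: DemoCtx) =
  ## Multiplexer, cf. runMulti() in the reworked worker.nim
  ctx.specs.multi(ctx)

when isMainModule:
  let ctx = DemoCtx(active: SnapMode)
  ctx.demoInit()
  ctx.runMulti()                  # "snap: downloading snapshot"
  ctx.runMulti()                  # "full: syncing blocks"
```

Typing the table entries as `RootRef` and down-converting in the getter keeps `worker_desc.nim` free of any dependency on the handler modules, which is presumably why `SnapSyncSpecs.tab` is declared opaque rather than as an array of `PlaySyncSpecs`.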
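On the error-handling side, the multiplexers are wrapped in the new `ignoreException` template, which replaces the old `noExceptionOops`/`raiseAssert` guard: a stray `CatchableError` escaping a handler is now logged and swallowed instead of taking the process down. A sketch of that guard, with plain `echo` standing in for the `chronicles` `error` call:

```nim
template ignoreException(info: static[string]; code: untyped) =
  try:
    code
  except CatchableError as e:
    # Log and carry on -- the old guard would have called raiseAssert here
    echo "Exception at ", info, ": name=", e.name, " msg=", e.msg

when isMainModule:
  ignoreException("demo"):
    raise newException(ValueError, "boom")
  echo "still running"            # reached: the exception was contained
```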