diff --git a/nimbus/config.nim b/nimbus/config.nim index 435ee39c1..a1b57dae1 100644 --- a/nimbus/config.nim +++ b/nimbus/config.nim @@ -135,6 +135,7 @@ type Default Full ## Beware, experimental Snap ## Beware, experimental + SnapCtx ## Beware, experimental NimbusConf* = object of RootObj ## Main Nimbus configuration object @@ -166,7 +167,8 @@ type longDesc: "- default -- legacy sync mode\n" & "- full -- full blockchain archive\n" & - "- snap -- experimental snap mode (development only)\n" + "- snap -- experimental snap mode (development only)\n" & + "- snapCtx -- snap considering possible recovery context\n" defaultValue: SyncMode.Default defaultValueDesc: $SyncMode.Default abbr: "y" diff --git a/nimbus/nimbus.nim b/nimbus/nimbus.nim index 7ef350509..63bcd0a4b 100644 --- a/nimbus/nimbus.nim +++ b/nimbus/nimbus.nim @@ -155,7 +155,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, ) nimbus.ethNode.addCapability(protocol.eth, ethWireHandler) case conf.syncMode: - of SyncMode.Snap: + of SyncMode.Snap, SyncMode.SnapCtx: nimbus.ethNode.addCapability protocol.snap of SyncMode.Full, SyncMode.Default: discard @@ -173,10 +173,10 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers, tickerOK) nimbus.fullSyncRef.start - of SyncMode.Snap: + of SyncMode.Snap, SyncMode.SnapCtx: nimbus.snapSyncRef = SnapSyncRef.init( nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers, - nimbus.dbBackend, tickerOK) + nimbus.dbBackend, tickerOK, noRecovery = (conf.syncMode==SyncMode.Snap)) nimbus.snapSyncRef.start of SyncMode.Default: discard @@ -196,7 +196,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, if conf.maxPeers > 0: var waitForPeers = true case conf.syncMode: - of SyncMode.Snap: + of SyncMode.Snap, SyncMode.SnapCtx: waitForPeers = false of SyncMode.Full, SyncMode.Default: discard @@ -433,7 +433,7 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) = cast[pointer](syncer) ) - of SyncMode.Full, SyncMode.Snap: + of SyncMode.Full, SyncMode.Snap, SyncMode.SnapCtx: discard if nimbus.state == Starting: diff --git a/nimbus/sync/full.nim b/nimbus/sync/full.nim index 3099588a4..6c00ea77b 100644 --- a/nimbus/sync/full.nim +++ b/nimbus/sync/full.nim @@ -42,7 +42,7 @@ proc runStart(buddy: FullBuddyRef): bool = proc runStop(buddy: FullBuddyRef) = worker.stop(buddy) -proc runPool(buddy: FullBuddyRef; last: bool) = +proc runPool(buddy: FullBuddyRef; last: bool): bool = worker.runPool(buddy, last) proc runSingle(buddy: FullBuddyRef) {.async.} = diff --git a/nimbus/sync/misc/best_pivot.nim b/nimbus/sync/misc/best_pivot.nim index fb6eb0634..5cc12c7ee 100644 --- a/nimbus/sync/misc/best_pivot.nim +++ b/nimbus/sync/misc/best_pivot.nim @@ -33,6 +33,10 @@ const pivotMinPeersToStartSync* = 2 ## Wait for consensus of at least this number of peers before syncing. + pivotFailCountMax* = 3 + ## Stop after a peer fails too often while negotiating. This happens if + ## a peer responses repeatedly with useless data. + type BestPivotCtxRef* = ref object of RootRef ## Data shared by all peers. 
@@ -47,6 +51,7 @@ type header: Option[BlockHeader] ## Pivot header (if any) ctrl: BuddyCtrlRef ## Control and state settings peer: Peer ## network peer + comFailCount: int ## Beware of repeated network errors # ------------------------------------------------------------------------------ # Private helpers @@ -139,7 +144,11 @@ proc getBestHeader( return err() if hdrResp.isNone: - trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a" + bp.comFailCount.inc + trace trEthRecvReceivedBlockHeaders, peer, reqLen, + response="n/a", comFailCount=bp.comFailCount + if pivotFailCountMax < bp.comFailCount: + bp.ctrl.zombie = true return err() let hdrRespLen = hdrResp.get.headers.len @@ -148,6 +157,7 @@ proc getBestHeader( header = hdrResp.get.headers[0] blockNumber = header.blockNumber trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber + bp.comFailCount = 0 # reset fail count return ok(header) trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen diff --git a/nimbus/sync/snap.nim b/nimbus/sync/snap.nim index 80a71abc4..08406ba07 100644 --- a/nimbus/sync/snap.nim +++ b/nimbus/sync/snap.nim @@ -44,7 +44,7 @@ proc runStart(buddy: SnapBuddyRef): bool = proc runStop(buddy: SnapBuddyRef) = worker.stop(buddy) -proc runPool(buddy: SnapBuddyRef; last: bool) = +proc runPool(buddy: SnapBuddyRef; last: bool): bool = worker.runPool(buddy, last) proc runSingle(buddy: SnapBuddyRef) {.async.} = @@ -63,18 +63,21 @@ proc init*( chain: Chain; rng: ref HmacDrbgContext; maxPeers: int; - dbBackend: ChainDb, - enableTicker = false): T = + dbBackend: ChainDb; + enableTicker = false; + noRecovery = false; + ): T = new result result.initSync(ethNode, chain, maxPeers, enableTicker) result.ctx.chain = chain # explicitely override result.ctx.data.rng = rng result.ctx.data.dbBackend = dbBackend + result.ctx.data.noRecovery = noRecovery # Required to have been initialised via `addCapability()` doAssert not result.ctx.ethWireCtx.isNil proc start*(ctx: SnapSyncRef) = - doAssert ctx.startSync(daemon = true) + doAssert ctx.startSync() proc stop*(ctx: SnapSyncRef) = ctx.stopSync() diff --git a/nimbus/sync/snap/constants.nim b/nimbus/sync/snap/constants.nim index f125eca68..20bfe6ff3 100644 --- a/nimbus/sync/snap/constants.nim +++ b/nimbus/sync/snap/constants.nim @@ -37,24 +37,28 @@ const snapRequestBytesLimit* = 2 * 1024 * 1024 ## Soft bytes limit to request in `snap` protocol calls. - snapAccountsSaveDanglingMax* = 10_000 - ## Recovery data are stored if the healing register - ## `fetchAccounts.missingNodes` with dangling node links has no more - ## than this many entries. Upon recovery, these dangling links allow - ## to reconstuct the needed ranges to complete the hexary trie for the - ## account fot current pivot. + snapRequestTrieNodesFetchMax* = 1024 + ## Informal maximal number of trie nodes to fetch at once in `snap` + ## protocol calls. This is not an official limit but found with several + ## implementations (e.g. Geth.) + ## + ## Resticting the fetch list length early allows to better paralellise + ## healing. + + + snapAccountsSaveProcessedChunksMax* = 1000 + ## Recovery data are stored if the processed ranges list contains no more + ## than this many range `chunks`. ## ## If there are too many dangling nodes, no data will be saved and restart ## has to perform from scratch. snapAccountsSaveStorageSlotsMax* = 10_000 - ## Similar retriction as `snapAccountsSaveDanglingMax` but for the joint - ## queues `fetchStorageFull` and `fetchStoragePart`. 
If the joint queue - ## becomes too large, nothing is saved. + ## Recovery data are stored if the outstanding storage slots to process do + ## not amount to more than this many entries. ## - ## Note thet the length of the jount queue is controlled by the constat - ## `snapStorageSlotsQuPrioThresh` which should be smaller than - ## this one. + ## If there are too many dangling nodes, no data will be saved and restart + ## has to perform from scratch. snapStorageSlotsFetchMax* = 2 * 1024 @@ -68,16 +72,17 @@ const ## and switch to processing the storage slots queue if the queue has ## more than this many items. - - snapTrieNodesFetchMax* = 1024 - ## Informal maximal number of trie nodes to fetch at once. This is not - ## an official limit but found on several implementations (e.g. Geth.) - ## - ## Resticting the fetch list length early allows to better paralellise - ## healing. - # -------------- + healInspectionBatch* = 10_000 + ## Number of nodes to inspect in a single batch. In between batches, a + ## task/thread switch is allowed. + + healInspectionBatchWaitNanoSecs* = 500 + ## Wait some time asynchronously after processing `healInspectionBatch` + ## nodes to allow for a pseudo task switch. + + healAccountsTrigger* = 0.99 ## Apply accounts healing if the global snap download coverage factor ## exceeds this setting. The global coverage factor is derived by merging ## over the network. More requests might be a disadvantage if peers only ## serve a maximum number requests (rather than data.) - healAccountsInspectionBatch* = 10_000 - ## Number of nodes to inspect in a single batch. Several batches are run - ## until at least `snapTrieNodeFetchMax` dangling nodes are found. In - ## between batches, a tast/thread switch is allowed. - healAccountsBatchFetchMax* = 10 * 1024 ## Keep on gloing in healing task up until this many nodes have been ## fetched from the network or some error contition terminates the task. diff --git a/nimbus/sync/snap/range_desc.nim b/nimbus/sync/snap/range_desc.nim index fbaddae57..73948f3a9 100644 --- a/nimbus/sync/snap/range_desc.nim +++ b/nimbus/sync/snap/range_desc.nim @@ -221,13 +221,13 @@ proc digestTo*(data: Blob; T: type NodeTag): T = proc isEmpty*(lrs: NodeTagRangeSet): bool = ## Returns `true` if the argument set `lrs` of intervals is empty - lrs.total == 0 and lrs.chunks == 0 + lrs.chunks == 0 proc isEmpty*(lrs: openArray[NodeTagRangeSet]): bool = ## Variant of `isEmpty()` where intervals are distributed across several ## sets. for ivSet in lrs: - if 0 < ivSet.total or 0 < ivSet.chunks: + if 0 < ivSet.chunks: return false true @@ -276,24 +276,6 @@ proc fullFactor*(lrs: NodeTagRangeSet): float = else: 1.0 # number of points in `lrs` is `2^256 + 1` -proc fullFactor*(lrs: openArray[NodeTagRangeSet]): float = - ## Variant of `fullFactor()` where intervals are distributed across several - ## sets. This function makes sense only if the interval sets are mutually - ## disjunct.
- var accu: NodeTag - for ivSet in lrs: - if 0 < ivSet.total: - if high(NodeTag) - ivSet.total < accu: - return 1.0 - accu = accu + ivSet.total - elif ivSet.chunks == 0: - discard - else: # number of points in `ivSet` is `2^256 + 1` - return 1.0 - if accu == 0.to(NodeTag): - return 0.0 - accu.u256.to(float) / (2.0^256) - # ------------------------------------------------------------------------------ # Public functions: printing & pretty printing # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 313332a14..4a4c0a6d1 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -15,10 +15,12 @@ import eth/[common, p2p], stew/[interval_set, keyed_queue], ../../db/select_backend, - ".."/[misc/best_pivot, protocol, sync_desc], + ../../utils/prettify, + ../misc/best_pivot, + ".."/[protocol, sync_desc], ./worker/[pivot_helper, ticker], ./worker/com/com_error, - ./worker/db/[snapdb_check, snapdb_desc], + ./worker/db/[hexary_desc, snapdb_check, snapdb_desc, snapdb_pivot], "."/[constants, range_desc, worker_desc] {.push raises: [Defect].} @@ -54,6 +56,54 @@ proc `pivot=`(buddy: SnapBuddyRef; val: BestPivotWorkerRef) = # Private functions # ------------------------------------------------------------------------------ +proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} = + let recov = ctx.data.recovery + if recov.isNil: + return false + + let + checkpoint = + "#" & $recov.state.header.blockNumber & "(" & $recov.level & ")" + topLevel = recov.level == 0 + env = block: + let rc = ctx.data.pivotTable.eq recov.state.header.stateRoot + if rc.isErr: + error "Recovery pivot context gone", checkpoint, topLevel + return false + rc.value + + # Cosmetics: allows other processes to log etc. + await sleepAsync(1300.milliseconds) + + when extraTraceMessages: + trace "Recovery continued ...", checkpoint, topLevel, + nAccounts=recov.state.nAccounts, nDangling=recov.state.dangling.len + + # Update pivot data from recovery checkpoint + env.recoverPivotFromCheckpoint(ctx, topLevel) + + # Fetch next recovery record if there is any + if recov.state.predecessor.isZero: + trace "Recovery done", checkpoint, topLevel + return false + let rc = ctx.data.snapDb.recoverPivot(recov.state.predecessor) + if rc.isErr: + when extraTraceMessages: + trace "Recovery stopped at pivot stale checkpoint", checkpoint, topLevel + return false + + # Set up next level pivot checkpoint + ctx.data.recovery = SnapRecoveryRef( + state: rc.value, + level: recov.level + 1) + + # Push onto pivot table and continue recovery (i.e. 
do not stop it yet) + ctx.data.pivotTable.update( + ctx.data.recovery.state.header, ctx, reverse=true) + + return true # continue recovery + + proc updateSinglePivot(buddy: SnapBuddyRef): Future[bool] {.async.} = ## Helper, negotiate pivot unless present if buddy.pivot.pivotHeader.isOk: @@ -106,7 +156,20 @@ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool = else: trace "Ticker is disabled" - result = true + # Check for recovery mode + if not ctx.data.noRecovery: + let rc = ctx.data.snapDb.recoverPivot() + if rc.isOk: + ctx.data.recovery = SnapRecoveryRef(state: rc.value) + ctx.daemon = true + + # Set up early initial pivot + ctx.data.pivotTable.update(ctx.data.recovery.state.header, ctx) + trace "Recovery started", + checkpoint=("#" & $ctx.data.pivotTable.topNumber() & "(0)") + if not ctx.data.ticker.isNil: + ctx.data.ticker.startRecovery() + true proc release*(ctx: SnapCtxRef) = ## Global clean up @@ -147,10 +210,20 @@ proc stop*(buddy: SnapBuddyRef) = proc runDaemon*(ctx: SnapCtxRef) {.async.} = ## Enabled while `ctx.daemon` is `true` ## - let nPivots = ctx.data.pivotTable.len - trace "I am the mighty recovery daemon ... stopped for now", nPivots - # To be populated ... - ctx.daemon = false + if not ctx.data.recovery.isNil: + if not await ctx.recoveryStepContinue(): + # Done, stop recovery + ctx.data.recovery = nil + ctx.daemon = false + + # Update logging + if not ctx.data.ticker.isNil: + ctx.data.ticker.stopRecovery() + return + + # Update logging + if not ctx.data.ticker.isNil: + ctx.data.ticker.stopRecovery() proc runSingle*(buddy: SnapBuddyRef) {.async.} = @@ -159,9 +232,7 @@ proc runSingle*(buddy: SnapBuddyRef) {.async.} = ## * `buddy.ctrl.poolMode` is `false` ## let peer = buddy.peer - # This pivot finder one harmonises assigned difficulties of at least two - # peers. There can only be one `pivot2Exec()` instance active/unfinished - # (which is wrapped into the helper function `updateSinglePivot()`.) + # Find pivot, probably relaxed mode enabled in `setup()` if not await buddy.updateSinglePivot(): # Wait if needed, then return => repeat if not buddy.ctrl.stopped: @@ -171,13 +242,14 @@ proc runSingle*(buddy: SnapBuddyRef) {.async.} = buddy.ctrl.multiOk = true -proc runPool*(buddy: SnapBuddyRef, last: bool) = +proc runPool*(buddy: SnapBuddyRef, last: bool): bool = ## Enabled when `buddy.ctrl.poolMode` is `true` ## let ctx = buddy.ctx - if ctx.poolMode: - ctx.poolMode = false + ctx.poolMode = false + result = true + block: let rc = ctx.data.pivotTable.lastValue if rc.isOk: @@ -233,12 +305,6 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = # Full sync processsing based on current snapshot # ----------------------------------------------- if env.storageDone: - if not buddy.checkAccountsTrieIsComplete(env): - error "Ooops, all accounts fetched but DvnB still incomplete", peer, pivot - - if not buddy.checkStorageSlotsTrieIsComplete(env): - error "Ooops, all storages fetched but DB still incomplete", peer, pivot - trace "Snap full sync -- not implemented yet", peer, pivot await sleepAsync(5.seconds) return @@ -246,31 +312,30 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = # Snapshot sync processing # ------------------------ - # If this is a new pivot, the previous one can be partially cleaned up. - # There is no point in keeping some older space consuming state data any - # longer. + # If this is a new pivot, the previous one can be cleaned up. There is no + # point in keeping some older space consuming state data any longer. 
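The recovery path added above works through a chain of saved pivot checkpoints: `setup()` loads the latest one, and each `runDaemon()` pass lets `recoveryStepContinue()` step to the checkpoint named by `predecessor` until a zero predecessor (or a stale entry) ends the walk. A minimal, self-contained Nim sketch of that walk, using plain `int` keys and a `Table` as stand-ins for the real state-root keys and pivot database:

import std/[options, tables]

type
  Checkpoint = object
    stateRoot: int        # stand-in for the real state root key
    predecessor: int      # 0 => oldest checkpoint, nothing left to recover
  Recovery = object
    state: Checkpoint
    level: int            # 0 => top level (the most recent pivot)

proc recoverStep(db: Table[int, Checkpoint]; recov: var Option[Recovery]): bool =
  ## One daemon pass; returns false when recovery is finished.
  if recov.isNone:
    return false
  let cur = recov.get
  if cur.state.predecessor == 0 or cur.state.predecessor notin db:
    recov = none(Recovery)               # done, or stale checkpoint
    return false
  # Descend one level; the next daemon pass continues from there
  recov = some(Recovery(state: db[cur.state.predecessor], level: cur.level + 1))
  true

when isMainModule:
  let db = {3: Checkpoint(stateRoot: 3, predecessor: 2),
            2: Checkpoint(stateRoot: 2, predecessor: 1),
            1: Checkpoint(stateRoot: 1, predecessor: 0)}.toTable
  var recov = some(Recovery(state: db[3], level: 0))
  while recoverStep(db, recov): discard
  doAssert recov.isNone

Handling one level per daemon pass mirrors the real loop, which pauses between checkpoints so other async work can interleave.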
ctx.data.pivotTable.beforeTopMostlyClean() - # This one is the syncing work horse + # This one is the syncing work horse which downloads the database let syncActionContinue = await env.execSnapSyncAction(buddy) # Save state so sync can be partially resumed at next start up + let + nCheckNodes = env.fetchAccounts.checkNodes.len + nSickSubTries = env.fetchAccounts.sickSubTries.len + nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len + processed = env.fetchAccounts.processed.fullFactor.toPC(2) block: - let - nMissingNodes = env.fetchAccounts.missingNodes.len - nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len - - rc = env.saveSnapState(ctx) + let rc = env.saveCheckpoint(ctx) if rc.isErr: error "Failed to save recovery checkpoint", peer, pivot, nAccounts=env.nAccounts, nSlotLists=env.nSlotLists, - nMissingNodes, nStoQu, error=rc.error + processed, nStoQu, error=rc.error else: - discard when extraTraceMessages: trace "Saved recovery checkpoint", peer, pivot, nAccounts=env.nAccounts, nSlotLists=env.nSlotLists, - nMissingNodes, nStoQu, blobSize=rc.value + processed, nStoQu, blobSize=rc.value if not syncActionContinue: return diff --git a/nimbus/sync/snap/worker/db/hexary_error.nim b/nimbus/sync/snap/worker/db/hexary_error.nim index 6356eb65c..ce0dfb8c5 100644 --- a/nimbus/sync/snap/worker/db/hexary_error.nim +++ b/nimbus/sync/snap/worker/db/hexary_error.nim @@ -22,7 +22,7 @@ type SlotsNotSrictlyIncreasing TrieLoopAlert TrieIsEmpty - TooManyDanglingLinks + TooManyProcessedChunks TooManySlotAccounts # import diff --git a/nimbus/sync/snap/worker/db/snapdb_accounts.nim b/nimbus/sync/snap/worker/db/snapdb_accounts.nim index eb3092209..074edf5d8 100644 --- a/nimbus/sync/snap/worker/db/snapdb_accounts.nim +++ b/nimbus/sync/snap/worker/db/snapdb_accounts.nim @@ -507,13 +507,12 @@ proc getAccountsNodeKey*( proc getAccountsNodeKey*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer; ## For log messages, only root: Hash256; ## state root path: Blob; ## Partial node path ): Result[NodeKey,HexaryDbError] = ## Variant of `getAccountsNodeKey()` for persistent storage. SnapDbAccountsRef.init( - pv, root, peer).getAccountsNodeKey(path, persistent=true) + pv, root, Peer()).getAccountsNodeKey(path, persistent=true) proc getAccountsData*( @@ -524,7 +523,6 @@ proc getAccountsData*( ## Fetch account data. ## ## Caveat: There is no unit test yet for the non-persistent version - let peer = ps.peer var acc: Account noRlpExceptionOops("getAccountData()"): @@ -542,13 +540,12 @@ proc getAccountsData*( proc getAccountsData*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer; ## For log messages, only root: Hash256; ## State root path: NodeKey; ## Account to visit ): Result[Account,HexaryDbError] = ## Variant of `getAccountsData()` for persistent storage. 
SnapDbAccountsRef.init( - pv, root, peer).getAccountsData(path, persistent=true) + pv, root, Peer()).getAccountsData(path, persistent=true) # ------------------------------------------------------------------------------ # Public functions: additional helpers diff --git a/nimbus/sync/snap/worker/db/snapdb_check.nim b/nimbus/sync/snap/worker/db/snapdb_check.nim index c8ab4dc8d..8ed5d6a4a 100644 --- a/nimbus/sync/snap/worker/db/snapdb_check.nim +++ b/nimbus/sync/snap/worker/db/snapdb_check.nim @@ -48,7 +48,7 @@ proc accountsCtx( ("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" & ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," & "nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," & - "nMissingNodes=" & $env.fetchAccounts.missingNodes.len & "}" + "nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}" # ------------------------------------------------------------------------------ # Private helpers @@ -76,7 +76,7 @@ proc storageSlotsCtx( result &= "" & "covered=" & slots.unprocessed.emptyFactor.toPC(0) & "nCheckNodes=" & $slots.checkNodes.len & "," & - "nMissingNodes=" & $slots.missingNodes.len + "nSickSubTries=" & $slots.sickSubTries.len result &= "}" # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/snapdb_pivot.nim b/nimbus/sync/snap/worker/db/snapdb_pivot.nim index ddc4aea60..4ff473031 100644 --- a/nimbus/sync/snap/worker/db/snapdb_pivot.nim +++ b/nimbus/sync/snap/worker/db/snapdb_pivot.nim @@ -9,7 +9,6 @@ # except according to those terms. import - #chronicles, eth/[common, rlp], stew/results, ../../range_desc, @@ -17,18 +16,16 @@ import {.push raises: [Defect].} -#logScope: -# topics = "snap-db" - type SnapDbPivotRegistry* = object - predecessor*: NodeKey ## Predecessor key in chain + predecessor*: NodeKey ## Predecessor key in chain, auto filled header*: BlockHeader ## Pivot state, containg state root nAccounts*: uint64 ## Imported # of accounts nSlotLists*: uint64 ## Imported # of account storage tries dangling*: seq[Blob] ## Dangling nodes in accounts trie + processed*: seq[ + (NodeTag,NodeTag)] ## Processed acoount ranges slotAccounts*: seq[NodeKey] ## List of accounts with storage slots - coverage*: uint8 ## coverage factor, 255 => 100% const extraTraceMessages = false or true @@ -58,24 +55,6 @@ proc savePivot*( return ok(rlpData.len) # notreached -proc savePivot*( - pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - header: BlockHeader; ## Pivot state, containg state root - nAccounts: uint64; ## Imported # of accounts - nSlotLists: uint64; ## Imported # of account storage tries - dangling: seq[Blob]; ## Dangling nodes in accounts trie - slotAccounts: seq[NodeKey]; ## List of accounts with storage slots - coverage: uint8; ## coverage factor, 255 => 100% - ): Result[int,HexaryDbError] = - ## Variant of `savePivot()` - result = pv.savePivot SnapDbPivotRegistry( - header: header, - nAccounts: nAccounts, - nSlotLists: nSlotLists, - dangling: dangling, - slotAccounts: slotAccounts, - coverage: coverage) - proc recoverPivot*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` stateRoot: NodeKey; ## Check for a particular state root diff --git a/nimbus/sync/snap/worker/heal_accounts.nim b/nimbus/sync/snap/worker/heal_accounts.nim index 22d821a4c..1bc9dfa5c 100644 --- a/nimbus/sync/snap/worker/heal_accounts.nim +++ b/nimbus/sync/snap/worker/heal_accounts.nim @@ -47,7 +47,7 @@ ## ## Legend: ## * `<..>`: some action, process, etc. 
-## * `{missing-nodes}`: list implemented as `env.fetchAccounts.missingNodes` +## * `{missing-nodes}`: list implemented as `env.fetchAccounts.sickSubTries` ## * `(state-root}`: implicit argument for `getAccountNodeKey()` when ## the argument list is empty ## * `{leaf-nodes}`: list is optimised out @@ -144,7 +144,7 @@ proc healingCtx( ("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" & ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," & "nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," & - "nMissingNodes=" & $env.fetchAccounts.missingNodes.len & "}" + "nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}" # ------------------------------------------------------------------------------ # Private functions @@ -154,7 +154,7 @@ proc verifyStillMissingNodes( buddy: SnapBuddyRef; env: SnapPivotRef; ) = - ## Check whether previously missing nodes from the `missingNodes` list + ## Check whether previously missing nodes from the `sickSubTries` list ## have been magically added to the database since it was checked last ## time. These nodes will me moved to `checkNodes` for further processing. let @@ -164,7 +164,7 @@ proc verifyStillMissingNodes( stateRoot = env.stateHeader.stateRoot var delayed: seq[NodeSpecs] - for w in env.fetchAccounts.missingNodes: + for w in env.fetchAccounts.sickSubTries: if ctx.data.snapDb.nodeExists(peer, stateRoot, w): # Check nodes for dangling links below env.fetchAccounts.checkNodes.add w.partialPath @@ -173,7 +173,7 @@ proc verifyStillMissingNodes( delayed.add w # Must not modify sequence while looping over it - env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & delayed + env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & delayed proc updateMissingNodesList( @@ -190,13 +190,13 @@ proc updateMissingNodesList( peer = buddy.peer stateRoot = env.stateHeader.stateRoot - while env.fetchAccounts.missingNodes.len < snapTrieNodesFetchMax: + while env.fetchAccounts.sickSubTries.len < snapRequestTrieNodesFetchMax: # Inspect hexary trie for dangling nodes let rc = db.inspectAccountsTrie( peer, stateRoot, env.fetchAccounts.checkNodes, # start with these nodes env.fetchAccounts.resumeCtx, # resume previous attempt - healAccountsInspectionBatch) # visit no more than this many nodes + healInspectionBatch) # visit no more than this many nodes if rc.isErr: when extraTraceMessages: error logTxt "failed => stop", peer, @@ -210,15 +210,15 @@ proc updateMissingNodesList( env.fetchAccounts.checkNodes.setLen(0) # Collect result - env.fetchAccounts.missingNodes = - env.fetchAccounts.missingNodes & rc.value.dangling + env.fetchAccounts.sickSubTries = + env.fetchAccounts.sickSubTries & rc.value.dangling # Done unless there is some resumption context if rc.value.resumeCtx.isNil: break # Allow async task switch and continue. Note that some other task might - # steal some of the `env.fetchAccounts.missingNodes`. + # steal some of the `env.fetchAccounts.sickSubTries`. 
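The renamed `sickSubTries` list is refilled by `updateMissingNodesList()` in batches of `healInspectionBatch` nodes with an async pause in between. The stand-alone sketch below shows only that batching pattern; it uses `std/asyncdispatch` instead of chronos, and a trivial `mod 7` test as a stand-in for detecting a dangling link:

import std/[asyncdispatch, sequtils]

proc inspectInBatches(items: seq[int]; batchSize: int): Future[seq[int]] {.async.} =
  ## Scan `items` in slices of `batchSize`, yielding to the dispatcher in
  ## between so other async tasks (e.g. peer workers) can run.
  var dangling: seq[int]
  var pos = 0
  while pos < items.len:
    let last = min(pos + batchSize, items.len)
    for i in pos ..< last:
      if items[i] mod 7 == 0:     # stand-in for "dangling link detected"
        dangling.add items[i]
    pos = last
    await sleepAsync(1)           # pseudo task switch between batches
  return dangling

when isMainModule:
  let dangling = waitFor inspectInBatches(toSeq(1 .. 100), batchSize = 10)
  doAssert dangling.len == 14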
await sleepAsync 1.nanoseconds return true @@ -229,7 +229,7 @@ proc getMissingNodesFromNetwork( env: SnapPivotRef; ): Future[seq[NodeSpecs]] {.async.} = - ## Extract from `missingNodes` the next batch of nodes that need + ## Extract from `sickSubTries` the next batch of nodes that need ## to be merged it into the database let ctx = buddy.ctx @@ -237,13 +237,13 @@ proc getMissingNodesFromNetwork( stateRoot = env.stateHeader.stateRoot pivot = "#" & $env.stateHeader.blockNumber # for logging - nMissingNodes = env.fetchAccounts.missingNodes.len - inxLeft = max(0, nMissingNodes - snapTrieNodesFetchMax) + nSickSubTries = env.fetchAccounts.sickSubTries.len + inxLeft = max(0, nSickSubTries - snapRequestTrieNodesFetchMax) # There is no point in processing too many nodes at the same time. So leave - # the rest on the `missingNodes` queue to be handled later. - let fetchNodes = env.fetchAccounts.missingNodes[inxLeft ..< nMissingNodes] - env.fetchAccounts.missingNodes.setLen(inxLeft) + # the rest on the `sickSubTries` queue to be handled later. + let fetchNodes = env.fetchAccounts.sickSubTries[inxLeft ..< nSickSubTries] + env.fetchAccounts.sickSubTries.setLen(inxLeft) # Initalise for `getTrieNodes()` for fetching nodes from the network var @@ -253,8 +253,9 @@ proc getMissingNodesFromNetwork( pathList.add @[w.partialPath] nodeKey[w.partialPath] = w.nodeKey - # Fetch nodes from the network. Note that the remainder of the `missingNodes` - # list might be used by another process that runs semi-parallel. + # Fetch nodes from the network. Note that the remainder of the + # `sickSubTries` list might be used by another process that runs + # semi-parallel. let rc = await buddy.getTrieNodes(stateRoot, pathList, pivot) if rc.isOk: # Reset error counts for detecting repeated timeouts, network errors, etc. @@ -262,7 +263,7 @@ proc getMissingNodesFromNetwork( # Register unfetched missing nodes for the next pass for w in rc.value.leftOver: - env.fetchAccounts.missingNodes.add NodeSpecs( + env.fetchAccounts.sickSubTries.add NodeSpecs( partialPath: w[0], nodeKey: nodeKey[w[0]]) return rc.value.nodes.mapIt(NodeSpecs( @@ -271,8 +272,8 @@ proc getMissingNodesFromNetwork( data: it.data)) # Restore missing nodes list now so that a task switch in the error checker - # allows other processes to access the full `missingNodes` list. - env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & fetchNodes + # allows other processes to access the full `sickSubTries` list. + env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & fetchNodes let error = rc.error if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): @@ -364,14 +365,14 @@ proc accountsHealingImpl( buddy.verifyStillMissingNodes(env) # If `checkNodes` is empty, healing is at the very start or was - # postponed in which case `missingNodes` is non-empty. + # postponed in which case `sickSubTries` is non-empty. if env.fetchAccounts.checkNodes.len != 0 or - env.fetchAccounts.missingNodes.len == 0: + env.fetchAccounts.sickSubTries.len == 0: if not await buddy.updateMissingNodesList(env): return 0 # Check whether the trie is complete. 
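For the network request, only the tail of the `sickSubTries` queue (bounded by `snapRequestTrieNodesFetchMax`) is detached, and it is appended back if the request fails, so semi-parallel workers keep seeing the full list while this one awaits a reply. A small sketch of just that queue handling, with a hypothetical `NodeSpec` type:

type NodeSpec = object
  partialPath: seq[byte]      # hypothetical stand-in for the real `NodeSpecs`

proc takeFetchBatch(queue: var seq[NodeSpec]; fetchMax: int): seq[NodeSpec] =
  ## Detach at most `fetchMax` entries from the tail of `queue`.
  let inxLeft = max(0, queue.len - fetchMax)
  result = queue[inxLeft ..< queue.len]
  queue.setLen(inxLeft)

proc restoreFetchBatch(queue: var seq[NodeSpec]; batch: seq[NodeSpec]) =
  ## Put a batch back, e.g. after a network error.
  queue = queue & batch

when isMainModule:
  var q = newSeq[NodeSpec](10)
  let batch = q.takeFetchBatch(fetchMax = 4)
  doAssert batch.len == 4 and q.len == 6
  q.restoreFetchBatch(batch)
  doAssert q.len == 10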
- if env.fetchAccounts.missingNodes.len == 0: + if env.fetchAccounts.sickSubTries.len == 0: trace logTxt "complete", peer, ctx=buddy.healingCtx(env) return 0 # nothing to do @@ -386,7 +387,7 @@ proc accountsHealingImpl( # Storage error, just run the next lap (not much else that can be done) error logTxt "error updating persistent database", peer, ctx=buddy.healingCtx(env), nNodes=nodeSpecs.len, error=report[^1].error - env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & nodeSpecs + env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & nodeSpecs return -1 # Filter out error and leaf nodes @@ -399,7 +400,7 @@ proc accountsHealingImpl( if w.error != NothingSerious or w.kind.isNone: # error, try downloading again - env.fetchAccounts.missingNodes.add nodeSpecs[inx] + env.fetchAccounts.sickSubTries.add nodeSpecs[inx] elif w.kind.unsafeGet != Leaf: # re-check this node @@ -434,21 +435,6 @@ proc healAccounts*( ctx = buddy.ctx peer = buddy.peer - # Only start healing if there is some completion level, already. - # - # We check against the global coverage factor, i.e. a measure for how - # much of the total of all accounts have been processed. Even if the trie - # database for the current pivot state root is sparsely filled, there - # is a good chance that it can inherit some unchanged sub-trie from an - # earlier pivor state root download. The healing process then works like - # sort of glue. - # - if env.nAccounts == 0 or - ctx.data.coveredAccounts.fullFactor < healAccountsTrigger: - #when extraTraceMessages: - # trace logTxt "postponed", peer, ctx=buddy.healingCtx(env) - return - when extraTraceMessages: trace logTxt "started", peer, ctx=buddy.healingCtx(env) diff --git a/nimbus/sync/snap/worker/heal_storage_slots.nim b/nimbus/sync/snap/worker/heal_storage_slots.nim index e07242d93..7a6740959 100644 --- a/nimbus/sync/snap/worker/heal_storage_slots.nim +++ b/nimbus/sync/snap/worker/heal_storage_slots.nim @@ -62,7 +62,7 @@ proc healingCtx( "pivot=" & "#" & $env.stateHeader.blockNumber & "," & "covered=" & slots.unprocessed.emptyFactor.toPC(0) & "," & "nCheckNodes=" & $slots.checkNodes.len & "," & - "nMissingNodes=" & $slots.missingNodes.len & "}" + "nSickSubTries=" & $slots.sickSubTries.len & "}" # ------------------------------------------------------------------------------ # Private functions @@ -98,7 +98,7 @@ proc verifyStillMissingNodes( kvp: SnapSlotsQueuePair; env: SnapPivotRef; ) = - ## Check whether previously missing nodes from the `missingNodes` list + ## Check whether previously missing nodes from the `sickSubTries` list ## have been magically added to the database since it was checked last ## time. These nodes will me moved to `checkNodes` for further processing. 
let @@ -110,7 +110,7 @@ proc verifyStillMissingNodes( slots = kvp.data.slots var delayed: seq[NodeSpecs] - for w in slots.missingNodes: + for w in slots.sickSubTries: let rc = db.getStorageSlotsNodeKey(peer, accKey, storageRoot, w.partialPath) if rc.isOk: # Check nodes for dangling links @@ -120,7 +120,7 @@ proc verifyStillMissingNodes( delayed.add w # Must not modify sequence while looping over it - slots.missingNodes = slots.missingNodes & delayed + slots.sickSubTries = slots.sickSubTries & delayed proc updateMissingNodesList( @@ -140,7 +140,7 @@ proc updateMissingNodesList( storageRoot = kvp.key slots = kvp.data.slots - while slots.missingNodes.len < snapTrieNodesFetchMax: + while slots.sickSubTries.len < snapRequestTrieNodesFetchMax: # Inspect hexary trie for dangling nodes let rc = db.inspectStorageSlotsTrie( peer, accKey, storageRoot, @@ -161,15 +161,14 @@ proc updateMissingNodesList( slots.checkNodes.setLen(0) # Collect result - slots.missingNodes = - slots.missingNodes & rc.value.dangling + slots.sickSubTries = slots.sickSubTries & rc.value.dangling # Done unless there is some resumption context if rc.value.resumeCtx.isNil: break # Allow async task switch and continue. Note that some other task might - # steal some of the `env.fetchAccounts.missingNodes`. + # steal some of the `env.fetchAccounts.sickSubTries`. await sleepAsync 1.nanoseconds return true @@ -181,7 +180,7 @@ proc getMissingNodesFromNetwork( env: SnapPivotRef; ): Future[seq[NodeSpecs]] {.async.} = - ## Extract from `missingNodes` the next batch of nodes that need + ## Extract from `sickSubTries` the next batch of nodes that need ## to be merged it into the database let ctx = buddy.ctx @@ -191,13 +190,13 @@ proc getMissingNodesFromNetwork( pivot = "#" & $env.stateHeader.blockNumber # for logging slots = kvp.data.slots - nMissingNodes = slots.missingNodes.len - inxLeft = max(0, nMissingNodes - snapTrieNodesFetchMax) + nSickSubTries = slots.sickSubTries.len + inxLeft = max(0, nSickSubTries - snapRequestTrieNodesFetchMax) # There is no point in processing too many nodes at the same time. So leave - # the rest on the `missingNodes` queue to be handled later. - let fetchNodes = slots.missingNodes[inxLeft ..< nMissingNodes] - slots.missingNodes.setLen(inxLeft) + # the rest on the `sickSubTries` queue to be handled later. + let fetchNodes = slots.sickSubTries[inxLeft ..< nSickSubTries] + slots.sickSubTries.setLen(inxLeft) # Initalise for `getTrieNodes()` for fetching nodes from the network var @@ -207,7 +206,7 @@ proc getMissingNodesFromNetwork( pathList.add @[w.partialPath] nodeKey[w.partialPath] = w.nodeKey - # Fetch nodes from the network. Note that the remainder of the `missingNodes` + # Fetch nodes from the network. Note that the remainder of the `sickSubTries` # list might be used by another process that runs semi-parallel. let req = @[accKey.to(Blob)] & fetchNodes.mapIt(it.partialPath) @@ -219,7 +218,7 @@ proc getMissingNodesFromNetwork( # Register unfetched missing nodes for the next pass for w in rc.value.leftOver: for n in 1 ..< w.len: - slots.missingNodes.add NodeSpecs( + slots.sickSubTries.add NodeSpecs( partialPath: w[n], nodeKey: nodeKey[w[n]]) return rc.value.nodes.mapIt(NodeSpecs( @@ -228,8 +227,8 @@ proc getMissingNodesFromNetwork( data: it.data)) # Restore missing nodes list now so that a task switch in the error checker - # allows other processes to access the full `missingNodes` list. - slots.missingNodes = slots.missingNodes & fetchNodes + # allows other processes to access the full `sickSubTries` list. 
+ slots.sickSubTries = slots.sickSubTries & fetchNodes let error = rc.error if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): @@ -384,7 +383,7 @@ proc storageSlotsHealing( return false # Check whether the trie is complete. - if slots.missingNodes.len == 0: + if slots.sickSubTries.len == 0: trace logTxt "complete", peer, itCtx=buddy.healingCtx(kvp,env) return true @@ -401,7 +400,7 @@ proc storageSlotsHealing( error logTxt "error updating persistent database", peer, itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists, nStorageQueue, nNodes=nodeSpecs.len, error=report[^1].error - slots.missingNodes = slots.missingNodes & nodeSpecs + slots.sickSubTries = slots.sickSubTries & nodeSpecs return false when extraTraceMessages: @@ -421,7 +420,7 @@ proc storageSlotsHealing( if w.error != NothingSerious or w.kind.isNone: # error, try downloading again - slots.missingNodes.add nodeSpecs[inx] + slots.sickSubTries.add nodeSpecs[inx] elif w.kind.unsafeGet != Leaf: # re-check this node @@ -478,7 +477,7 @@ proc healingIsComplete( return true # done # Full range covered by unprocessed items - kvp.data.slots = SnapTrieRangeBatchRef(missingNodes: rc.value.dangling) + kvp.data.slots = SnapRangeBatchRef(sickSubTries: rc.value.dangling) kvp.data.slots.unprocessed.init() # Proceed with healing diff --git a/nimbus/sync/snap/worker/pivot_helper.nim b/nimbus/sync/snap/worker/pivot_helper.nim index 3f14c7964..92c0677fd 100644 --- a/nimbus/sync/snap/worker/pivot_helper.nim +++ b/nimbus/sync/snap/worker/pivot_helper.nim @@ -10,12 +10,13 @@ import std/[math, sequtils], + bearssl/rand, chronos, - eth/[common, p2p], + eth/[common, trie/trie_defs], stew/[interval_set, keyed_queue], ../../sync_desc, ".."/[constants, range_desc, worker_desc], - ./db/[hexary_error, snapdb_pivot], + ./db/[hexary_error, snapdb_accounts, snapdb_pivot], "."/[heal_accounts, heal_storage_slots, range_fetch_accounts, range_fetch_storage_slots, ticker] @@ -29,10 +30,11 @@ const # Private helpers # ------------------------------------------------------------------------------ -proc init(batch: var SnapTrieRangeBatch; ctx: SnapCtxRef) = +proc init(batch: SnapRangeBatchRef; ctx: SnapCtxRef) = ## Returns a pair of account hash range lists with the full range of hashes ## smartly spread across the mutually disjunct interval sets. batch.unprocessed.init() + batch.processed = NodeTagRangeSet.init() # Initialise accounts range fetch batch, the pair of `fetchAccounts[]` # range sets. @@ -59,13 +61,7 @@ proc init(batch: var SnapTrieRangeBatch; ctx: SnapCtxRef) = discard batch.unprocessed[1].merge iv when extraAsserts: - if batch.unprocessed[0].isEmpty: - doAssert batch.unprocessed[1].isFull - elif batch.unprocessed[1].isEmpty: - doAssert batch.unprocessed[0].isFull - else: - doAssert((batch.unprocessed[0].total - 1) + - batch.unprocessed[1].total == high(UInt256)) + doAssert batch.unprocessed.verify # ------------------------------------------------------------------------------ # Public functions: pivot table related @@ -81,12 +77,12 @@ proc beforeTopMostlyClean*(pivotTable: var SnapPivotTable) = env.fetchStorageFull.clear() env.fetchStoragePart.clear() env.fetchAccounts.checkNodes.setLen(0) - env.fetchAccounts.missingNodes.setLen(0) + env.fetchAccounts.sickSubTries.setLen(0) env.obsolete = true proc topNumber*(pivotTable: var SnapPivotTable): BlockNumber = - ## Return the block number op the top pivot entry, or zero if there is none. + ## Return the block number of the top pivot entry, or zero if there is none. 
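`init()` spreads the full hash range over the two mutually disjoint `unprocessed` sets so that peers working on different pivots are unlikely to start fetching at the same spot. A rough sketch of one plausible scheme (a random split point), with `uint64` standing in for the 256 bit `NodeTag` and plain `(min, max)` tuples instead of interval sets:

import std/random

type TodoRanges = array[2, seq[(uint64, uint64)]]   # [priority set, second set]

proc initTodo(r: var TodoRanges; rng: var Rand) =
  ## Split the full range at a random point: the priority set gets the upper
  ## part, the remainder goes to the second set.
  let pivot = rng.next()
  r[0] = @[(pivot, high(uint64))]
  if pivot > 0'u64:
    r[1] = @[(0'u64, pivot - 1)]

when isMainModule:
  var rng = initRand(42)
  var todo: TodoRanges
  todo.initTodo(rng)
  doAssert todo[0].len == 1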
let rc = pivotTable.lastValue if rc.isOk: return rc.value.stateHeader.blockNumber @@ -96,6 +92,7 @@ proc update*( pivotTable: var SnapPivotTable; ## Pivot table header: BlockHeader; ## Header to generate new pivot from ctx: SnapCtxRef; ## Some global context + reverse = false; ## Update from bottom (e.g. for recovery) ) = ## Activate environment for state root implied by `header` argument. This ## function appends a new environment unless there was any not far enough ## # Check whether the new header follows minimum depth requirement. This is # where the queue is assumed to have increasing block numbers. - if pivotTable.topNumber() + pivotBlockDistanceMin < header.blockNumber: + if reverse or + pivotTable.topNumber() + pivotBlockDistanceMin < header.blockNumber: # Ok, append a new environment - let env = SnapPivotRef(stateHeader: header) + let env = SnapPivotRef( + stateHeader: header, + fetchAccounts: SnapRangeBatchRef()) env.fetchAccounts.init(ctx) + var topEnv = env # Append per-state root environment to LRU queue - discard pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax) + if reverse: + discard pivotTable.prepend(header.stateRoot, env) + # Make sure that the LRU table does not grow too big. + if max(3, ctx.buddiesMax) < pivotTable.len: + # Delete second entry rather than the first which might currently + # be needed. + let rc = pivotTable.secondKey + if rc.isOk: + pivotTable.del rc.value + else: + discard pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax) proc tickerStats*( @@ -142,7 +153,7 @@ proc tickerStats*( aSqSum += aLen * aLen # Fill utilisation mean & variance - let fill = kvp.data.fetchAccounts.unprocessed.emptyFactor + let fill = kvp.data.fetchAccounts.processed.fullFactor uSum += fill uSqSum += fill * fill @@ -161,9 +172,9 @@ if not env.isNil: pivotBlock = some(env.stateHeader.blockNumber) stoQuLen = some(env.fetchStorageFull.len + env.fetchStoragePart.len) - accStats = (env.fetchAccounts.unprocessed[0].chunks + - env.fetchAccounts.unprocessed[1].chunks, - env.fetchAccounts.missingNodes.len) + accStats = (env.fetchAccounts.processed.chunks, + env.fetchAccounts.checkNodes.len + + env.fetchAccounts.sickSubTries.len) TickerStats( pivotBlock: pivotBlock, @@ -205,16 +216,30 @@ proc execSnapSyncAction*( if buddy.ctrl.stopped or env.obsolete: return false - # Can only run a single accounts healer instance at a time. This - # instance will clear the batch queue so there is nothing to do for - # another process. - if env.accountsState == HealerIdle: - env.accountsState = HealerRunning - await buddy.healAccounts(env) - env.accountsState = HealerIdle + if not ctx.data.accountsHealing: + # Only start healing if there is some completion level, already. + # + # We check against the global coverage factor, i.e. a measure for how + # much of the total of all accounts have been processed. Even if the + # hexary trie database for the current pivot state root is sparsely + # filled, there is a good chance that it can inherit some unchanged + # sub-trie from an earlier pivot state root download. The healing + # process then works like sort of glue. + if 0 < env.nAccounts: + if healAccountsTrigger <= ctx.data.coveredAccounts.fullFactor: + ctx.data.accountsHealing = true - if buddy.ctrl.stopped or env.obsolete: - return false + if ctx.data.accountsHealing: + # Can only run a single accounts healer instance at a time. This + # instance will clear the batch queue so there is nothing to do for + # another process.
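The healing trigger moved here from `heal_accounts.nim`: once the merged account coverage reaches `healAccountsTrigger`, the new `accountsHealing` flag is latched on for the rest of the sync. A compact sketch of that latch, with a plain float in place of the range-set `fullFactor` and a hypothetical `HealCtx` type:

const healAccountsTrigger = 0.99    # same constant name as in constants.nim

type HealCtx = object
  accountsHealing: bool    # global latch: once on, healing stays on
  coveredAccounts: float   # stand-in for `coveredAccounts.fullFactor`

proc maybeEnableHealing(ctx: var HealCtx; nAccounts: uint64) =
  if not ctx.accountsHealing and 0'u64 < nAccounts and
      healAccountsTrigger <= ctx.coveredAccounts:
    ctx.accountsHealing = true

when isMainModule:
  var ctx = HealCtx(coveredAccounts: 0.95)
  ctx.maybeEnableHealing(1_000)
  doAssert not ctx.accountsHealing
  ctx.coveredAccounts = 0.995
  ctx.maybeEnableHealing(1_000)
  doAssert ctx.accountsHealing

Because the flag never resets, later pivots skip the coverage check entirely.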
+ if env.accountsState == HealerIdle: + env.accountsState = HealerRunning + await buddy.healAccounts(env) + env.accountsState = HealerIdle + + if buddy.ctrl.stopped or env.obsolete: + return false # Some additional storage slots might have been popped up await buddy.rangeFetchStorageSlots(env) @@ -228,31 +253,74 @@ proc execSnapSyncAction*( return true -proc saveSnapState*( +proc saveCheckpoint*( env: SnapPivotRef; ## Current pivot environment ctx: SnapCtxRef; ## Some global context ): Result[int,HexaryDbError] = ## Save current sync admin data. On success, the size of the data record ## saved is returned (e.g. for logging.) - if snapAccountsSaveDanglingMax < env.fetchAccounts.missingNodes.len: - return err(TooManyDanglingLinks) + ## + let + fa = env.fetchAccounts + nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len + + if snapAccountsSaveProcessedChunksMax < fa.processed.chunks: + return err(TooManyProcessedChunks) - let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len if snapAccountsSaveStorageSlotsMax < nStoQu: return err(TooManySlotAccounts) - let - rc = ctx.data.snapDb.savePivot( - env.stateHeader, env.nAccounts, env.nSlotLists, - dangling = env.fetchAccounts.missingNodes.mapIt(it.partialPath), - slotAccounts = toSeq(env.fetchStorageFull.nextKeys).mapIt(it.to(NodeKey)) & - toSeq(env.fetchStoragePart.nextKeys).mapIt(it.to(NodeKey)), - coverage = (ctx.data.coveredAccounts.fullFactor * 255).uint8) + ctx.data.snapDb.savePivot SnapDbPivotRegistry( + header: env.stateHeader, + nAccounts: env.nAccounts, + nSlotLists: env.nSlotLists, + processed: toSeq(env.fetchAccounts.processed.increasing) + .mapIt((it.minPt,it.maxPt)), + slotAccounts: (toSeq(env.fetchStorageFull.nextKeys) & + toSeq(env.fetchStoragePart.nextKeys)).mapIt(it.to(NodeKey))) - if rc.isErr: - return err(rc.error) - ok(rc.value) +proc recoverPivotFromCheckpoint*( + env: SnapPivotRef; ## Current pivot environment + ctx: SnapCtxRef; ## Global context (containing save state) + topLevel: bool; ## Full data set on top level only + ) = + ## Recover some pivot variables and global list `coveredAccounts` from + ## checkpoint data. If the argument `toplevel` is set `true`, also the + ## `processed`, `unprocessed`, and the `fetchStorageFull` lists are + ## initialised. + ## + let recov = ctx.data.recovery + if recov.isNil: + return + + env.nAccounts = recov.state.nAccounts + env.nSlotLists = recov.state.nSlotLists + + # Import processed interval + for (minPt,maxPt) in recov.state.processed: + if topLevel: + discard env.fetchAccounts.processed.merge(minPt, maxPt) + env.fetchAccounts.unprocessed.reduce(minPt, maxPt) + discard ctx.data.coveredAccounts.merge(minPt, maxPt) + + # Handle storage slots + if topLevel: + let stateRoot = recov.state.header.stateRoot + for w in recov.state.slotAccounts: + let pt = NodeTagRange.new(w.to(NodeTag),w.to(NodeTag)) + + if 0 < env.fetchAccounts.processed.covered(pt): + # Ignoring slots that have accounts to be downloaded, anyway + let rc = ctx.data.snapDb.getAccountsData(stateRoot, w) + if rc.isErr: + # Oops, how did that account get lost? 
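`recoverPivotFromCheckpoint()` rebuilds the storage-slots queue from the saved `slotAccounts` list: accounts whose body can still be read and that carry a non-empty storage root are re-queued, the others go back on the account to-do list. A simplified sketch with string keys and a stub `Account` type (the "range actually processed" check and the interval-set bookkeeping of the real code are omitted):

import std/[tables, sets]

type Account = object
  storageRoot: string             # "" plays the role of `emptyRlpHash` here

proc requeueStorage(
    slotAccounts: seq[string];            # account keys saved in the checkpoint
    db: Table[string, Account];           # accounts database
    redoAccounts: var HashSet[string];    # accounts that must be fetched again
    storageQueue: var seq[(string, string)]) =
  for key in slotAccounts:
    if key notin db:
      redoAccounts.incl key                         # account got lost somehow
    elif db[key].storageRoot.len != 0:
      storageQueue.add (key, db[key].storageRoot)   # re-queue its storage slots

when isMainModule:
  var redo = initHashSet[string]()
  var queue: seq[(string, string)]
  let db = {"a": Account(storageRoot: "0xdead"), "b": Account()}.toTable
  requeueStorage(@["a", "b", "c"], db, redo, queue)
  doAssert queue == @[("a", "0xdead")] and "c" in redo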
+ discard env.fetchAccounts.processed.reduce pt + env.fetchAccounts.unprocessed.merge pt + elif rc.value.storageRoot != emptyRlpHash: + env.fetchStorageFull.merge AccountSlotsHeader( + accKey: w, + storageRoot: rc.value.storageRoot) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/range_fetch_accounts.nim b/nimbus/sync/snap/worker/range_fetch_accounts.nim index 6af9929ec..6f96a2e06 100644 --- a/nimbus/sync/snap/worker/range_fetch_accounts.nim +++ b/nimbus/sync/snap/worker/range_fetch_accounts.nim @@ -161,23 +161,31 @@ proc accountsRangefetchImpl( # Statistics env.nAccounts.inc(gotAccounts) - # Punch holes into the reproted range from the network if it contains holes. + # Punch holes into the reported range of received accounts from the network + # if it there are gaps (described by dangling nodes.) for w in gaps.innerGaps: discard processed.reduce( w.partialPath.min(NodeKey).to(NodeTag), w.partialPath.max(NodeKey).to(Nodetag)) - # Update dangling nodes list - var delayed: seq[NodeSpecs] - for w in env.fetchAccounts.missingNodes: - if not ctx.data.snapDb.nodeExists(peer, stateRoot, w): - delayed.add w - when extraTraceMessages: - trace logTxt "dangling nodes", peer, pivot, - nCheckNodes=env.fetchAccounts.checkNodes.len, - nMissingNodes=env.fetchAccounts.missingNodes.len, - nUpdatedMissing=delayed.len, nOutsideGaps=gaps.dangling.len - env.fetchAccounts.missingNodes = delayed & gaps.dangling + # Update dangling nodes list unless healing is activated. The problem + # with healing activated is, that a previously missing node that suddenly + # appears will not automatically translate into a full sub-trie. It might + # just be the node itself (which justified the name `sickSubTrie`). + # + # Instead of managing partial sub-tries here, this is delegated to the + # healing module. + if not ctx.data.accountsHealing: + var delayed: seq[NodeSpecs] + for w in env.fetchAccounts.sickSubTries: + if not ctx.data.snapDb.nodeExists(peer, stateRoot, w): + delayed.add w + when extraTraceMessages: + trace logTxt "dangling nodes", peer, pivot, + nCheckNodes=env.fetchAccounts.checkNodes.len, + nSickSubTries=env.fetchAccounts.sickSubTries.len, + nUpdatedMissing=delayed.len, nOutsideGaps=gaps.dangling.len + env.fetchAccounts.sickSubTries = delayed & gaps.dangling # Update book keeping for w in processed.increasing: @@ -195,7 +203,7 @@ proc accountsRangefetchImpl( # unprocessed = buddy.dumpUnprocessed(env) # trace logTxt "request done", peer, pivot, # nCheckNodes=env.fetchAccounts.checkNodes.len, - # nMissingNodes=env.fetchAccounts.missingNodes.len, + # nSickSubTries=env.fetchAccounts.sickSubTries.len, # imported, unprocessed return true diff --git a/nimbus/sync/snap/worker/ticker.nim b/nimbus/sync/snap/worker/ticker.nim index 5594c11c5..83757ca19 100644 --- a/nimbus/sync/snap/worker/ticker.nim +++ b/nimbus/sync/snap/worker/ticker.nim @@ -39,6 +39,7 @@ type TickerRef* = ref object ## Account fetching state that is shared among all peers. 
nBuddies: int + recovery: bool lastStats: TickerStats statsCb: TickerStatsUpdater logTicker: TimerCallback @@ -145,8 +146,12 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} = if data.nStorageQueue.isSome: nStoQue = $data.nStorageQueue.unsafeGet - info "Snap sync statistics", - up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem + if t.recovery: + info "Snap sync statistics (recovery)", + up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem + else: + info "Snap sync statistics", + up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem t.setLogTicker(Moment.fromNow(tickerLogInterval)) @@ -183,15 +188,29 @@ proc startBuddy*(t: TickerRef) = ## Increment buddies counter and start ticker unless running. if t.nBuddies <= 0: t.nBuddies = 1 - t.start() + if not t.recovery: + t.start() else: t.nBuddies.inc +proc startRecovery*(t: TickerRef) = + ## Ditto for recovery mode + if not t.recovery: + t.recovery = true + if t.nBuddies <= 0: + t.start() + proc stopBuddy*(t: TickerRef) = ## Decrement buddies counter and stop ticker if there are no more registered ## buddies. t.nBuddies.dec - if t.nBuddies <= 0: + if t.nBuddies <= 0 and not t.recovery: + t.stop() + +proc stopRecovery*(t: TickerRef) = + ## Ditto for recovery mode + if t.recovery: + t.recovery = false t.stop() # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index be6543b5b..3aaa57246 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -15,7 +15,7 @@ import ../../db/select_backend, ../sync_desc, ./worker/com/com_error, - ./worker/db/[hexary_desc, snapdb_desc], + ./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot], ./worker/ticker, ./range_desc @@ -37,25 +37,24 @@ type ## where the optional `subRange` interval has been replaced by an interval ## range + healing support. accKey*: NodeKey ## Owner account - slots*: SnapTrieRangeBatchRef ## slots to fetch, nil => all slots + slots*: SnapRangeBatchRef ## slots to fetch, nil => all slots inherit*: bool ## mark this trie seen already SnapTodoRanges* = array[2,NodeTagRangeSet] - ## Pair of node range lists. The first entry must be processed first. This - ## allows to coordinate peers working on different state roots to avoid - ## ovelapping accounts as long as they fetch from the first entry. + ## Pair of sets of ``unprocessed`` node ranges that need to be fetched and + ## integrated. The ranges in the first set must be handled with priority. + ## + ## This data structure is used for coordinating peers that run quasi + ## parallel. - SnapTrieRangeBatch* = object + SnapRangeBatchRef* = ref object ## `NodeTag` ranges to fetch, healing support - unprocessed*: SnapTodoRanges ## Range of slots not covered, yet + unprocessed*: SnapTodoRanges ## Range of slots to be fetched + processed*: NodeTagRangeSet ## Nodes definitely processed checkNodes*: seq[Blob] ## Nodes with prob. dangling child links - missingNodes*: seq[NodeSpecs] ## Dangling links to fetch and merge + sickSubTries*: seq[NodeSpecs] ## Top ref for sub-tries to be healed resumeCtx*: TrieNodeStatCtxRef ## State for resuming trie inpection - SnapTrieRangeBatchRef* = ref SnapTrieRangeBatch - ## Referenced object, so it can be made optional for the storage - ## batch list - SnapHealingState* = enum ## State of healing process. 
The `HealerRunning` state indicates that ## dangling and/or missing nodes have been temprarily removed from the @@ -69,8 +68,9 @@ type stateHeader*: BlockHeader ## Pivot state, containg state root # Accounts download - fetchAccounts*: SnapTrieRangeBatch ## Set of accounts ranges to fetch + fetchAccounts*: SnapRangeBatchRef ## Set of accounts ranges to fetch accountsState*: SnapHealingState ## All accounts have been processed + healThresh*: float ## Start healing when fill factor reached # Storage slots download fetchStorageFull*: SnapSlotsQueue ## Fetch storage trie for these accounts @@ -86,6 +86,11 @@ type ## LRU table, indexed by state root KeyedQueue[Hash256,SnapPivotRef] + SnapRecoveryRef* = ref object + ## Recovery context + state*: SnapDbPivotRegistry ## Saved recovery context state + level*: int ## top level is zero + BuddyData* = object ## Per-worker local descriptor data extension errors*: ComErrorStatsRef ## For error handling @@ -102,6 +107,9 @@ type pivotTable*: SnapPivotTable ## Per state root environment pivotFinderCtx*: RootRef ## Opaque object reference for sub-module coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts + accountsHealing*: bool ## Activates accounts DB healing + recovery*: SnapRecoveryRef ## Current recovery checkpoint/context + noRecovery*: bool ## Ignore recovery checkpoints # Info ticker*: TickerRef ## Ticker, logger @@ -142,7 +150,7 @@ proc init*(q: var SnapTodoRanges) = proc merge*(q: var SnapTodoRanges; iv: NodeTagRange) = - ## Unconditionally merge the node range into the account ranges list + ## Unconditionally merge the node range into the account ranges list. discard q[0].merge(iv) discard q[1].reduce(iv) @@ -187,6 +195,21 @@ proc fetch*(q: var SnapTodoRanges; maxLen: UInt256): Result[NodeTagRange,void] = discard q[0].reduce(iv) ok(iv) + +proc verify*(q: var SnapTodoRanges): bool = + ## Verify consistency, i.e. that the two sets of ranges have no overlap. + if q[0].chunks == 0 or q[1].chunks == 0: + # At least on set is empty + return true + if q[0].total == 0 or q[1].total == 0: + # At least one set is maximal and the other non-empty + return false + let (a,b) = if q[0].chunks < q[1].chunks: (0,1) else: (1,0) + for iv in q[a].increasing: + if 0 < q[b].covered(iv): + return false + true + # ------------------------------------------------------------------------------ # Public helpers: SlotsQueue # ------------------------------------------------------------------------------ @@ -196,7 +219,10 @@ proc merge*(q: var SnapSlotsQueue; kvp: SnapSlotsQueuePair) = let reqKey = kvp.key rc = q.eq(reqKey) - if rc.isOk: + if rc.isErr: + # Append to list + discard q.append(reqKey, kvp.data) + else: # Entry exists already let qData = rc.value if not qData.slots.isNil: @@ -204,23 +230,17 @@ proc merge*(q: var SnapSlotsQueue; kvp: SnapSlotsQueuePair) = if kvp.data.slots.isNil: # Remove restriction for this entry and move it to the right end qData.slots = nil - discard q.lruFetch(reqKey) + discard q.lruFetch reqKey else: # Merge argument intervals into target set for ivSet in kvp.data.slots.unprocessed: for iv in ivSet.increasing: qData.slots.unprocessed.reduce iv - else: - # Only add non-existing entries - if kvp.data.slots.isNil: - # Append full range to the right of the list - discard q.append(reqKey, kvp.data) - else: - # Partial range, add healing support and interval - discard q.unshift(reqKey, kvp.data) proc merge*(q: var SnapSlotsQueue; fetchReq: AccountSlotsHeader) = - ## Append/prepend a slot header record into the batch queue. 
+ ## Append/prepend a slot header record into the batch queue. If there is + ## a range merger, the argument range will be sortred in a way so that it + ## is processed separately with highest priority. let reqKey = fetchReq.storageRoot rc = q.eq(reqKey) @@ -232,26 +252,31 @@ proc merge*(q: var SnapSlotsQueue; fetchReq: AccountSlotsHeader) = if fetchReq.subRange.isNone: # Remove restriction for this entry and move it to the right end qData.slots = nil - discard q.lruFetch(reqKey) + discard q.lruFetch reqKey else: - # Merge argument interval into target set - let iv = fetchReq.subRange.unsafeGet - discard qData.slots.unprocessed[0].reduce(iv) - discard qData.slots.unprocessed[1].merge(iv) - else: - let reqData = SnapSlotsQueueItemRef(accKey: fetchReq.accKey) + # Merge argument interval into target separated from the already + # existing sets (note that this works only for the last set) + for iv in qData.slots.unprocessed[0].increasing: + # Move all to second set + discard qData.slots.unprocessed[1].merge iv + # Clear first set and add argument range + qData.slots.unprocessed[0].clear() + qData.slots.unprocessed.merge fetchReq.subRange.unsafeGet - # Only add non-existing entries - if fetchReq.subRange.isNone: - # Append full range to the right of the list - discard q.append(reqKey, reqData) - else: - # Partial range, add healing support and interval - reqData.slots = SnapTrieRangeBatchRef() - for n in 0 ..< reqData.slots.unprocessed.len: - reqData.slots.unprocessed[n] = NodeTagRangeSet.init() - discard reqData.slots.unprocessed[0].merge(fetchReq.subRange.unsafeGet) - discard q.unshift(reqKey, reqData) + elif fetchReq.subRange.isNone: + # Append full range to the list + discard q.append(reqKey, SnapSlotsQueueItemRef( + accKey: fetchReq.accKey)) + + else: + # Partial range, add healing support and interval + var unprocessed = [NodeTagRangeSet.init(), NodeTagRangeSet.init()] + discard unprocessed[0].merge(fetchReq.subRange.unsafeGet) + discard q.append(reqKey, SnapSlotsQueueItemRef( + accKey: fetchReq.accKey, + slots: SnapRangeBatchRef( + unprocessed: unprocessed, + processed: NodeTagRangeSet.init()))) proc merge*( q: var SnapSlotsQueue; diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim index b0dd87b81..bfa3fd38a 100644 --- a/nimbus/sync/sync_sched.nim +++ b/nimbus/sync/sync_sched.nim @@ -17,15 +17,18 @@ ## Global set up. This function will be called before any worker peer is ## started. If that function returns `false`, no worker peers will be run. ## +## Also, this function should decide whether the `runDaemon()` job will be +## started next by controlling the `ctx.daemon` flag (default is `false`.) +## ## *runRelease(ctx: CtxRef[S])* ## Global clean up, done with all the worker peers. ## ## *runDaemon(ctx: CtxRef[S]) {.async.}* ## Global background job that will be re-started as long as the variable -## `ctx.daemon` is set `true`. If that job was stopped due to setting -## `ctx.daemon` to `false`, it will be restarted when reset to `true` when -## there is some activity on the `runPool()`, `runSingle()`, or `runMulti()` -## functions. +## `ctx.daemon` is set `true`. If that job was stopped due to re-setting +## `ctx.daemon` to `false`, it will be restarted next after it was reset +## as `true` not before there is some activity on the `runPool()`, +## `runSingle()`, or `runMulti()` functions. ## ## ## *runStart(buddy: BuddyRef[S,W]): bool* @@ -35,10 +38,11 @@ ## Clean up this worker peer. 
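The reworked sub-range merge gives a freshly requested storage-slots interval priority: whatever sat in `unprocessed[0]` is demoted to `unprocessed[1]` and the new interval becomes the sole priority entry. A toy sketch of that demote-then-prioritise step, ignoring the overlap handling the real `NodeTagRangeSet` provides:

type Unprocessed = array[2, seq[(uint64, uint64)]]  # [priority set, second set]

proc mergeWithPriority(u: var Unprocessed; iv: (uint64, uint64)) =
  ## Demote whatever was prioritised before and make `iv` the next range
  ## to be processed.
  for old in u[0]:
    u[1].add old
  u[0] = @[iv]

when isMainModule:
  var u: Unprocessed
  u.mergeWithPriority((10'u64, 20'u64))
  u.mergeWithPriority((30'u64, 40'u64))
  doAssert u[0] == @[(30'u64, 40'u64)]
  doAssert u[1] == @[(10'u64, 20'u64)]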
@@ -35,10 +38,11 @@
 ##   Clean up this worker peer.
 ##
 ##
-## *runPool(buddy: BuddyRef[S,W], last: bool)*
+## *runPool(buddy: BuddyRef[S,W], last: bool): bool*
 ##   Once started, the function `runPool()` is called for all worker peers in
-##   sequence as the body of an iteration. There will be no other worker peer
-##   functions activated simultaneously.
+##   sequence as the body of an iteration as long as the function returns
+##   `false`. There will be no other worker peer functions activated
+##   simultaneously.
 ##
 ##   This procedure is started if the global flag `buddy.ctx.poolMode` is set
 ##   `true` (default is `false`.) It is the responsibility of the `runPool()`
@@ -48,7 +52,7 @@
 ##   The argument `last` is set `true` if the last entry is reached.
 ##
 ##   Note:
-##   + This function does not run in `async` mode.
+##   + This function does *not* run in `async` mode.
 ##   + The flag `buddy.ctx.poolMode` has priority over the flag
 ##     `buddy.ctrl.multiOk` which controls `runSingle()` and `runMulti()`.
 ##
@@ -110,6 +114,7 @@ type
     singleRunLock: bool           ## Some single mode runner is activated
     monitorLock: bool             ## Monitor mode is activated
     activeMulti: int              ## Number of activated runners in multi-mode
+    shutdown: bool                ## Internal shut down flag
 
   RunnerBuddyRef[S,W] = ref object
     ## Per worker peer descriptor
@@ -144,7 +149,7 @@ proc hash(peer: Peer): Hash =
 proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async.} =
   mixin runDaemon
 
-  if dsc.ctx.daemon:
+  if dsc.ctx.daemon and not dsc.shutdown:
     dsc.daemonRunning = true
 
     # Continue until stopped
@@ -179,7 +184,7 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
 
   # Continue until stopped
   block taskExecLoop:
-    while worker.ctrl.running:
+    while worker.ctrl.running and not dsc.shutdown:
       # Enforce minimum time spend on this loop
       let startMoment = Moment.now()
 
@@ -199,7 +204,8 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
           var count = dsc.buddies.len
           for w in dsc.buddies.nextValues:
             count.dec
-            worker.runPool(count == 0)
+            if worker.runPool(count == 0):
+              break # `true` => stop
           dsc.monitorLock = false
 
       else:
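Illustration (not part of the patch): a self-contained mock of the new `runPool()` return protocol; the types and names are invented. With the scheduler change above, the pool iteration stops as soon as one invocation returns `true`, so a worker that has already finished the global pass can short-circuit the remaining peers:

  type MockBuddy = object
    jobsLeft: int

  proc runPool(buddy: var MockBuddy; last: bool): bool =
    ## Return `true` to tell the scheduler the pool pass may stop early.
    if buddy.jobsLeft == 0:
      return true
    buddy.jobsLeft.dec
    false

  when isMainModule:
    var buddies = @[MockBuddy(jobsLeft: 1), MockBuddy(jobsLeft: 0)]
    var count = buddies.len
    for n in 0 ..< buddies.len:
      count.dec
      if buddies[n].runPool(count == 0):
        break                            # `true` => stop, like the scheduler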
@@ -335,7 +341,7 @@ proc initSync*[S,W](
   dsc.tickerOk = noisy
   dsc.buddies.init(dsc.ctx.buddiesMax)
 
-proc startSync*[S,W](dsc: RunnerSyncRef[S,W]; daemon = false): bool =
+proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool =
   ## Set up `PeerObserver` handlers and start syncing.
   mixin runSetup
   # Initialise sub-systems
@@ -350,8 +356,7 @@
     po.setProtocol eth
     dsc.pool.addObserver(dsc, po)
 
-    if daemon:
-      dsc.ctx.daemon = true
+    if dsc.ctx.daemon:
       asyncSpawn dsc.daemonLoop()
     return true
 
@@ -360,7 +365,8 @@ proc stopSync*[S,W](dsc: RunnerSyncRef[S,W]) =
   mixin runRelease
   dsc.pool.delObserver(dsc)
 
-  # Shut down async services
+  # Gracefully shut down async services
+  dsc.shutdown = true
   for buddy in dsc.buddies.nextValues:
     buddy.worker.ctrl.stopped = true
   dsc.ctx.daemon = false
diff --git a/tests/test_sync_snap.nim b/tests/test_sync_snap.nim
index 6dd9bf2d3..63c2eef95 100644
--- a/tests/test_sync_snap.nim
+++ b/tests/test_sync_snap.nim
@@ -15,10 +15,11 @@ import
   std/[algorithm, distros, hashes, math, os, sets,
        sequtils, strformat, strutils, tables, times],
   chronicles,
-  eth/[common, p2p, rlp, trie/db],
+  eth/[common, p2p, rlp],
+  eth/trie/[db, nibbles],
   rocksdb,
   stint,
-  stew/[byteutils, results],
+  stew/[byteutils, interval_set, results],
   unittest2,
   ../nimbus/[chain_config, config, genesis],
   ../nimbus/db/[db_chain, select_backend, storage_types],
@@ -26,7 +27,7 @@ import
   ../nimbus/sync/types,
   ../nimbus/sync/snap/range_desc,
   ../nimbus/sync/snap/worker/db/[
-    hexary_desc, hexary_error, hexary_inspect, rocky_bulk_load,
+    hexary_desc, hexary_error, hexary_inspect, hexary_paths, rocky_bulk_load,
     snapdb_accounts, snapdb_desc, snapdb_pivot, snapdb_storage_slots],
   ../nimbus/utils/prettify,
   ./replay/[pp, undump_blocks, undump_accounts, undump_storages],
@@ -301,6 +302,7 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
   var
     desc: SnapDbAccountsRef
     accKeys: seq[NodeKey]
+    accBaseTag: NodeTag
 
   test &"Snap-proofing {accountsList.len} items for state root ..{root.pp}":
     let
@@ -316,14 +318,13 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
       desc = SnapDbAccountsRef.init(dbBase, root, peer)
 
       # Load/accumulate data from several samples (needs some particular sort)
-      let
-        lowerBound = accountsList.mapIt(it.base).sortMerge
-        packed = PackedAccountRange(
-          accounts: accountsList.mapIt(it.data.accounts).sortMerge,
-          proof: accountsList.mapIt(it.data.proof).flatten)
+      accBaseTag = accountsList.mapIt(it.base).sortMerge
+      let packed = PackedAccountRange(
+        accounts: accountsList.mapIt(it.data.accounts).sortMerge,
+        proof: accountsList.mapIt(it.data.proof).flatten)
       # Merging intervals will produce gaps, so the result is expected OK but
       # different from `.isImportOk`
-      check desc.importAccounts(lowerBound, packed, true).isOk
+      check desc.importAccounts(accBaseTag, packed, true).isOk
 
       # check desc.merge(lowerBound, accounts) == OkHexDb
       desc.assignPrettyKeys() # for debugging, make sure that state root ~ "$0"
@@ -406,7 +407,8 @@
       #    added later (typically these nodes are update `Mutable` nodes.)
       #
       # Beware: dumping a large database is not recommended
-      #noisy.say "***", "database dump\n    ", desc.dumpAccDB()
+      #true.say "***", "database dump\n    ", desc.dumpHexaDB()
+
 
     test &"Storing/retrieving {accKeys.len} items " &
         "on persistent state root registry":
@@ -415,13 +417,18 @@
       else:
         let
          dbBase = SnapDbRef.init(db.cdb[0])
-          dangling = @[@[1u8],@[2u8,3u8],@[4u8,5u8,6u8],@[7u8,8u8,9u8,0u8]]
+          processed = @[(1.to(NodeTag),2.to(NodeTag)),
+                        (4.to(NodeTag),5.to(NodeTag)),
+                        (6.to(NodeTag),7.to(NodeTag))]
          slotAccounts = seq[NodeKey].default
        for n,w in accKeys:
          check dbBase.savePivot(
-            BlockHeader(
-              stateRoot: w.to(Hash256)), n.uint64, n.uint64,
-            dangling, slotAccounts, 0).isOk
+            SnapDbPivotRegistry(
+              header: BlockHeader(stateRoot: w.to(Hash256)),
+              nAccounts: n.uint64,
+              nSlotLists: n.uint64,
+              processed: processed,
+              slotAccounts: slotAccounts)).isOk
          # verify latest state root
          block:
            let rc = dbBase.recoverPivot()
@@ -429,7 +436,7 @@
            check rc.isOk
            if rc.isOk:
              check rc.value.nAccounts == n.uint64
              check rc.value.nSlotLists == n.uint64
-              check rc.value.dangling == dangling
+              check rc.value.processed == processed
 
        for n,w in accKeys:
          block:
            let rc = dbBase.recoverPivot(w)
@@ -439,15 +446,19 @@
              check rc.value.nSlotLists == n.uint64
          # Update record in place
          check dbBase.savePivot(
-            BlockHeader(
-              stateRoot: w.to(Hash256)), n.uint64, 0, @[], @[], 0).isOk
+            SnapDbPivotRegistry(
+              header: BlockHeader(stateRoot: w.to(Hash256)),
+              nAccounts: n.uint64,
+              nSlotLists: 0,
+              processed: @[],
+              slotAccounts: @[])).isOk
          block:
            let rc = dbBase.recoverPivot(w)
            check rc.isOk
            if rc.isOk:
              check rc.value.nAccounts == n.uint64
              check rc.value.nSlotLists == 0
-              check rc.value.processed == seq[Blob].default
+              check rc.value.processed == seq[(NodeTag,NodeTag)].default
 
 proc storagesRunner(
     noisy = true;
@@ -600,6 +611,7 @@ proc inspectionRunner(
         rootKey = root.to(NodeKey)
         dbBase = SnapDbRef.init(db.cdb[2+n])
         desc = SnapDbAccountsRef.init(dbBase, root, peer)
+
       for w in accList:
         check desc.importAccounts(w.base,w.data, persistent=true).isImportOk
       let rc = desc.inspectAccountsTrie(persistent=true)