diff --git a/nimbus/sync/snap/constants.nim b/nimbus/sync/snap/constants.nim
index e6b128777..26ed438e6 100644
--- a/nimbus/sync/snap/constants.nim
+++ b/nimbus/sync/snap/constants.nim
@@ -11,6 +11,9 @@
 {.push raises: [Defect].}
 
 const
+  pivotTableLruEntriesMax* = 50
+    ## Max depth of pivot table. On overflow, the oldest one will be removed.
+
   pivotBlockDistanceMin* = 128
     ## The minimal depth of two block headers needed to activate a new state
     ## root pivot.
@@ -28,6 +31,10 @@ const
     ##
     ## Note that 128 is the magic distance for snapshots used by *Geth*.
 
+  pivotBlockDistanceThrottledPivotChangeMin* = 256
+    ## Slower pivot change while healing or nearly complete accounts database
+    ## takes place.
+
   pivotEnvStopChangingIfComplete* = true
     ## If set `true`, new peers will not change the pivot even if the
     ## negotiated pivot would be newer. This should be the default.
@@ -48,17 +55,17 @@ const
   snapAccountsSaveProcessedChunksMax* = 1000
     ## Recovery data are stored if the processed ranges list contains no more
-    ## than this many range `chunks`.
+    ## than this many range *chunks*.
     ##
-    ## If there are too many dangling nodes, no data will be saved and restart
-    ## has to perform from scratch.
+    ## If the range set is too much fragmented, no data will be saved and
+    ## restart has to perform from scratch or an earlier checkpoint.
 
   snapAccountsSaveStorageSlotsMax* = 10_000
     ## Recovery data are stored if the oustanding storage slots to process do
     ## not amount to more than this many entries.
     ##
     ## If there are too many dangling nodes, no data will be saved and restart
-    ## has to perform from scratch.
+    ## has to perform from scratch or an earlier checkpoint.
 
   snapStorageSlotsFetchMax* = 2 * 1024
@@ -74,6 +81,17 @@ const
 
   # --------------
 
+  swapInAccountsCoverageTrigger* = 0.30
+    ## Similar to `healAccountsCoverageTrigger` below, only for trying to
+    ## swap in from earlier pivots.
+
+  swapInAccountsPivotsMin* = 2
+    ## Require at least this many pivots available before any swap in can
+    ## take place (must be at least 2.) This value is typically combined
+    ## with `swapInAccountsCoverageTrigger`.
+
+  # --------------
+
   healInspectionBatch* = 10_000
     ## Number of nodes to inspect in a single batch. In between batches, a
     ## task/thread switch is allowed.
@@ -159,6 +177,7 @@ const
     ## Set 0 to disable.
 
 static:
+  doAssert 1 < swapInAccountsPivotsMin
   doAssert healAccountsCoverageTrigger < 1.0 # larger values make no sense
   doAssert snapStorageSlotsQuPrioThresh < snapAccountsSaveStorageSlotsMax
   doAssert snapStorageSlotsFetchMax < healAccountsBatchFetchMax
diff --git a/nimbus/sync/snap/range_desc.nim b/nimbus/sync/snap/range_desc.nim
index 1f08492db..19056c294 100644
--- a/nimbus/sync/snap/range_desc.nim
+++ b/nimbus/sync/snap/range_desc.nim
@@ -25,10 +25,11 @@ type
   NodeKey* = distinct ByteArray32
     ## Hash key without the hash wrapper (as opposed to `NodeTag` which is a
-    ## number)
+    ## number.)
 
   NodeTag* = distinct UInt256
-    ## Trie leaf item, account hash etc.
+    ## Trie leaf item, account hash etc. This data type is a representation
+    ## for a `NodeKey` geared up for arithmetic and comparing keys.
 
   NodeTagRange* = Interval[NodeTag,UInt256]
     ## Interval `[minPt,maxPt]` of` NodeTag` elements, can be managed in an
@@ -249,6 +250,13 @@ proc fullFactor*(lrs: NodeTagRangeSet): float =
   else: 1.0 # number of points in `lrs` is `2^256 + 1`
 
+proc fullFactor*(iv: NodeTagRange): float =
+  ## Relative covered length of an interval, i.e.
`#points-covered / 2^256` + if 0 < iv.len: + iv.len.u256.to(float) / (2.0^256) + else: + 1.0 # number of points in `iv` is `2^256 + 1` + # ------------------------------------------------------------------------------ # Public functions: printing & pretty printing # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 2a38c8eb7..c4e3f8eac 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -18,7 +18,8 @@ import ../../utils/prettify, ../misc/best_pivot, ".."/[protocol, sync_desc], - ./worker/[pivot_helper, ticker], + ./worker/[heal_accounts, heal_storage_slots, pivot_helper, + range_fetch_accounts, range_fetch_storage_slots, ticker], ./worker/com/com_error, ./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot], "."/[constants, range_desc, worker_desc] @@ -72,19 +73,21 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} = return false rc.value - # Cosmetics: allows other processes to log etc. - await sleepAsync(1300.milliseconds) + # Cosmetics: allow other processes (e.g. ticker) to log the current recovery + # state. There is no other intended purpose of this wait state. + await sleepAsync 1100.milliseconds - when extraTraceMessages: - trace "Recovery continued ...", checkpoint, topLevel, - nAccounts=recov.state.nAccounts, nDangling=recov.state.dangling.len + #when extraTraceMessages: + # trace "Recovery continued ...", checkpoint, topLevel, + # nAccounts=recov.state.nAccounts, nDangling=recov.state.dangling.len # Update pivot data from recovery checkpoint env.recoverPivotFromCheckpoint(ctx, topLevel) # Fetch next recovery record if there is any if recov.state.predecessor.isZero: - trace "Recovery done", checkpoint, topLevel + #when extraTraceMessages: + # trace "Recovery done", checkpoint, topLevel return false let rc = ctx.data.snapDb.recoverPivot(recov.state.predecessor) if rc.isErr: @@ -138,6 +141,47 @@ proc updateSinglePivot(buddy: SnapBuddyRef): Future[bool] {.async.} = return true +proc execSnapSyncAction( + env: SnapPivotRef; # Current pivot environment + buddy: SnapBuddyRef; # Worker peer + ) {.async.} = + ## Execute a synchronisation run. + let + ctx = buddy.ctx + + block: + # Clean up storage slots queue first it it becomes too large + let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len + if snapStorageSlotsQuPrioThresh < nStoQu: + await buddy.rangeFetchStorageSlots(env) + if buddy.ctrl.stopped or env.archived: + return + + if not env.pivotAccountsComplete(): + await buddy.rangeFetchAccounts(env) + if buddy.ctrl.stopped or env.archived: + return + + await buddy.rangeFetchStorageSlots(env) + if buddy.ctrl.stopped or env.archived: + return + + if env.pivotAccountsHealingOk(ctx): + await buddy.healAccounts(env) + if buddy.ctrl.stopped or env.archived: + return + + # Some additional storage slots might have been popped up + await buddy.rangeFetchStorageSlots(env) + if buddy.ctrl.stopped or env.archived: + return + + # Don't bother with storage slots healing before accounts healing takes + # place. This saves communication bandwidth. The pivot might change soon, + # anyway. 
+ if env.pivotAccountsHealingOk(ctx): + await buddy.healStorageSlots(env) + # ------------------------------------------------------------------------------ # Public start/stop and admin functions # ------------------------------------------------------------------------------ @@ -248,8 +292,8 @@ proc runPool*(buddy: SnapBuddyRef, last: bool): bool = proc runMulti*(buddy: SnapBuddyRef) {.async.} = ## Enabled while - ## * `buddy.ctrl.multiOk` is `true` - ## * `buddy.ctrl.poolMode` is `false` + ## * `buddy.ctx.multiOk` is `true` + ## * `buddy.ctx.poolMode` is `false` ## let ctx = buddy.ctx @@ -280,29 +324,41 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = # point in keeping some older space consuming state data any longer. ctx.data.pivotTable.beforeTopMostlyClean() + when extraTraceMessages: + block: + let + nCheckNodes = env.fetchAccounts.checkNodes.len + nSickSubTries = env.fetchAccounts.sickSubTries.len + nAccounts = env.nAccounts + nSlotLists = env.nSlotLists + processed = env.fetchAccounts.processed.fullFactor.toPC(2) + nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len + accHealThresh = env.healThresh.toPC(2) + trace "Multi sync runner", peer, pivot, nAccounts, nSlotLists, processed, + nStoQu, accHealThresh, nCheckNodes, nSickSubTries + # This one is the syncing work horse which downloads the database await env.execSnapSyncAction(buddy) - if env.obsolete: + if env.archived: return # pivot has changed - # Save state so sync can be partially resumed at next start up - let - nCheckNodes = env.fetchAccounts.checkNodes.len - nSickSubTries = env.fetchAccounts.sickSubTries.len - nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len - processed = env.fetchAccounts.processed.fullFactor.toPC(2) block: - let rc = env.saveCheckpoint(ctx) + # Save state so sync can be partially resumed at next start up + let + nAccounts = env.nAccounts + nSlotLists = env.nSlotLists + processed = env.fetchAccounts.processed.fullFactor.toPC(2) + nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len + accHealThresh = env.healThresh.toPC(2) + rc = env.saveCheckpoint(ctx) if rc.isErr: - error "Failed to save recovery checkpoint", peer, pivot, - nAccounts=env.nAccounts, nSlotLists=env.nSlotLists, - processed, nStoQu, error=rc.error + error "Failed to save recovery checkpoint", peer, pivot, nAccounts, + nSlotLists, processed, nStoQu, error=rc.error else: when extraTraceMessages: - trace "Saved recovery checkpoint", peer, pivot, - nAccounts=env.nAccounts, nSlotLists=env.nSlotLists, - processed, nStoQu, blobSize=rc.value + trace "Saved recovery checkpoint", peer, pivot, nAccounts, nSlotLists, + processed, nStoQu, blobSize=rc.value, accHealThresh if buddy.ctrl.stopped: return # peer worker has gone diff --git a/nimbus/sync/snap/worker/heal_accounts.nim b/nimbus/sync/snap/worker/heal_accounts.nim index 9de7e4771..a0e6c25f2 100644 --- a/nimbus/sync/snap/worker/heal_accounts.nim +++ b/nimbus/sync/snap/worker/heal_accounts.nim @@ -117,7 +117,7 @@ import ".."/[constants, range_desc, worker_desc], ./com/[com_error, get_trie_nodes], ./db/[hexary_desc, hexary_error, snapdb_accounts], - ./sub_tries_helper + "."/[sub_tries_helper, swap_in] {.push raises: [Defect].} @@ -148,10 +148,61 @@ proc healingCtx( "nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," & "nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}" +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + 
+template discardRlpError(info: static[string]; code: untyped) = + try: + code + except RlpError as e: + discard + +template noExceptionOops(info: static[string]; code: untyped) = + try: + code + except Defect as e: + raise e + except Exception as e: + raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg + # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ +proc reorgHealingState( + buddy: SnapBuddyRef; + env: SnapPivotRef; + ) = + let + ctx = buddy.ctx + rootKey = env.stateHeader.stateRoot.to(NodeKey) + getFn = ctx.data.snapDb.getAccountFn + + nCheckNodes0 = env.fetchAccounts.checkNodes.len + nSickSubTries0 = env.fetchAccounts.sickSubTries.len + nProcessed0 = env.fetchAccounts.processed.fullfactor.toPC(3) + + # Reclassify nodes into existing/allocated and dangling ones + if buddy.swapInAccounts(env) == 0: + # Nothing to swap in, so force reclassification + noExceptionOops("reorgHealingState"): + var delayed: seq[NodeSpecs] + for node in env.fetchAccounts.sickSubTries: + if node.nodeKey.ByteArray32.getFn().len == 0: + delayed.add node # still subject to healing + else: + env.fetchAccounts.checkNodes.add node + env.fetchAccounts.sickSubTries = delayed + + when extraTraceMessages: + let + nCheckNodes1 = env.fetchAccounts.checkNodes.len + nSickSubTries1 = env.fetchAccounts.sickSubTries.len + trace logTxt "sick nodes reclassified", nCheckNodes0, nSickSubTries0, + nCheckNodes1, nSickSubTries1, nProcessed0 + + proc updateMissingNodesList( buddy: SnapBuddyRef; env: SnapPivotRef; @@ -234,15 +285,14 @@ proc getMissingNodesFromNetwork( # allows other processes to access the full `sickSubTries` list. env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & fetchNodes - let error = rc.error - if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): - discard - when extraTraceMessages: + let + error = rc.error + ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors) + when extraTraceMessages: + if ok: trace logTxt "fetch nodes error => stop", peer, ctx=buddy.healingCtx(env), error - else: - discard - when extraTraceMessages: + else: trace logTxt "fetch nodes error", peer, ctx=buddy.healingCtx(env), error @@ -253,23 +303,31 @@ proc kvAccountLeaf( buddy: SnapBuddyRef; node: NodeSpecs; env: SnapPivotRef; - ): (bool,NodeKey,Account) - {.gcsafe, raises: [Defect,RlpError]} = + ): (bool,NodeKey,Account) = ## Re-read leaf node from persistent database (if any) let peer = buddy.peer + var + nNibbles = -1 - nodeRlp = rlpFromBytes node.data - (_,prefix) = hexPrefixDecode node.partialPath - (_,segment) = hexPrefixDecode nodeRlp.listElem(0).toBytes - nibbles = prefix & segment - if nibbles.len == 64: - let data = nodeRlp.listElem(1).toBytes - return (true, nibbles.getBytes.convertTo(NodeKey), rlp.decode(data,Account)) + discardRlpError("kvAccountLeaf"): + let + nodeRlp = rlpFromBytes node.data + prefix = (hexPrefixDecode node.partialPath)[1] + segment = (hexPrefixDecode nodeRlp.listElem(0).toBytes)[1] + nibbles = prefix & segment + + nNibbles = nibbles.len + if nNibbles == 64: + let + data = nodeRlp.listElem(1).toBytes + nodeKey = nibbles.getBytes.convertTo(NodeKey) + accData = rlp.decode(data,Account) + return (true, nodeKey, accData) when extraTraceMessages: - trace logTxt "non-leaf node path", peer, - ctx=buddy.healingCtx(env), nNibbles=nibbles.len + trace logTxt "non-leaf node path or corrupt data", peer, + 
ctx=buddy.healingCtx(env), nNibbles proc registerAccountLeaf( @@ -313,11 +371,7 @@ proc accountsHealingImpl( peer = buddy.peer # Update for changes since last visit - try: - db.getAccountFn.subTriesNodesReclassify( - env.stateHeader.stateRoot.to(NodeKey), env.fetchAccounts) - except Exception as e: - raiseAssert "Not possible @ accountsHealingImpl(" & $e.name & "):" & e.msg + buddy.reorgHealingState(env) if env.fetchAccounts.sickSubTries.len == 0: # Traverse the hexary trie for more missing nodes. This call is expensive. diff --git a/nimbus/sync/snap/worker/pivot_helper.nim b/nimbus/sync/snap/worker/pivot_helper.nim index 184cf7995..d0dceea3a 100644 --- a/nimbus/sync/snap/worker/pivot_helper.nim +++ b/nimbus/sync/snap/worker/pivot_helper.nim @@ -13,12 +13,11 @@ import bearssl/rand, chronos, eth/[common, trie/trie_defs], - stew/[interval_set, keyed_queue], + stew/[interval_set, keyed_queue, sorted_set], ../../sync_desc, ".."/[constants, range_desc, worker_desc], ./db/[hexary_error, snapdb_accounts, snapdb_pivot], - "."/[heal_accounts, heal_storage_slots, - range_fetch_accounts, range_fetch_storage_slots, ticker] + ./ticker {.push raises: [Defect].} @@ -26,6 +25,10 @@ const extraAsserts = false or true ## Enable some asserts +proc pivotAccountsHealingOk*(env: SnapPivotRef;ctx: SnapCtxRef): bool {.gcsafe.} +proc pivotAccountsComplete*(env: SnapPivotRef): bool {.gcsafe.} +proc pivotMothball*(env: SnapPivotRef) {.gcsafe.} + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -40,9 +43,14 @@ proc init( batch.unprocessed.init() batch.processed = NodeTagRangeSet.init() - # Initialise partial path the envelope of which covers the full range of - # account keys `0..high(NodeTag)`. This will trigger healing on the full - # range all possible keys. + # Once applicable when the hexary trie is non-empty, healing is started on + # the full range of all possible accounts. So the partial path batch list + # is initialised with the empty partial path encoded as `@[0]` which refers + # to the first (typically `Branch`) node. The envelope of `@[0]` covers the + # maximum range of accounts. + # + # Note that `@[]` incidentally has the same effect as `@[0]` although it + # is formally no partial path. batch.checkNodes.add NodeSpecs( partialPath: @[0.byte], nodeKey: stateRoot.to(NodeKey)) @@ -84,12 +92,7 @@ proc beforeTopMostlyClean*(pivotTable: var SnapPivotTable) = ## usable any more after cleaning but might be useful as historic record. let rc = pivotTable.beforeLastValue if rc.isOk: - let env = rc.value - env.fetchStorageFull.clear() - env.fetchStoragePart.clear() - env.fetchAccounts.checkNodes.setLen(0) - env.fetchAccounts.sickSubTries.setLen(0) - env.obsolete = true + rc.value.pivotMothball proc topNumber*(pivotTable: var SnapPivotTable): BlockNumber = @@ -100,10 +103,10 @@ proc topNumber*(pivotTable: var SnapPivotTable): BlockNumber = proc update*( - pivotTable: var SnapPivotTable; ## Pivot table - header: BlockHeader; ## Header to generate new pivot from - ctx: SnapCtxRef; ## Some global context - reverse = false; ## Update from bottom (e.g. for recovery) + pivotTable: var SnapPivotTable; # Pivot table + header: BlockHeader; # Header to generate new pivot from + ctx: SnapCtxRef; # Some global context + reverse = false; # Update from bottom (e.g. for recovery) ) = ## Activate environment for state root implied by `header` argument. 
This ## function appends a new environment unless there was any not far enough @@ -112,6 +115,14 @@ proc update*( ## Note that the pivot table is assumed to be sorted by the block numbers of ## the pivot header. ## + # Calculate minimum block distance. + let minBlockDistance = block: + let rc = pivotTable.lastValue + if rc.isOk and rc.value.pivotAccountsHealingOk(ctx): + pivotBlockDistanceThrottledPivotChangeMin + else: + pivotBlockDistanceMin + # Check whether the new header follows minimum depth requirement. This is # where the queue is assumed to have increasing block numbers. if reverse or @@ -122,6 +133,7 @@ proc update*( stateHeader: header, fetchAccounts: SnapRangeBatchRef()) env.fetchAccounts.init(header.stateRoot, ctx) + env.storageAccounts.init() var topEnv = env # Append per-state root environment to LRU queue @@ -139,7 +151,8 @@ proc update*( topEnv = pivotTable.lastValue.value else: - discard pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax) + discard pivotTable.lruAppend( + header.stateRoot, env, pivotTableLruEntriesMax) # Update healing threshold let @@ -149,8 +162,8 @@ proc update*( proc tickerStats*( - pivotTable: var SnapPivotTable; ## Pivot table - ctx: SnapCtxRef; ## Some global context + pivotTable: var SnapPivotTable; # Pivot table + ctx: SnapCtxRef; # Some global context ): TickerStatsUpdater = ## This function returns a function of type `TickerStatsUpdater` that prints ## out pivot table statitics. The returned fuction is supposed to drive @@ -181,7 +194,6 @@ proc tickerStats*( let sLen = kvp.data.nSlotLists.float sSum += sLen sSqSum += sLen * sLen - let env = ctx.data.pivotTable.lastValue.get(otherwise = nil) accCoverage = ctx.data.coveredAccounts.fullFactor @@ -210,16 +222,41 @@ proc tickerStats*( # Public functions: particular pivot # ------------------------------------------------------------------------------ +proc pivotMothball*(env: SnapPivotRef) = + ## Clean up most of this argument `env` pivot record and mark it `archived`. + ## Note that archived pivots will be checked for swapping in already known + ## accounts and storage slots. + env.fetchAccounts.checkNodes.setLen(0) + env.fetchAccounts.sickSubTries.setLen(0) + env.fetchAccounts.unprocessed.init() + + # Simplify storage slots queues by resolving partial slots into full list + for kvp in env.fetchStoragePart.nextPairs: + discard env.fetchStorageFull.append( + kvp.key, SnapSlotsQueueItemRef(acckey: kvp.data.accKey)) + env.fetchStoragePart.clear() + + # Provide index into `fetchStorageFull` + env.storageAccounts.clear() + for kvp in env.fetchStorageFull.nextPairs: + let rc = env.storageAccounts.insert(kvp.data.accKey.to(NodeTag)) + # Note that `rc.isErr` should not exist as accKey => storageRoot + if rc.isOk: + rc.value.data = kvp.key + + # Finally, mark that node `archived` + env.archived = true + + proc pivotAccountsComplete*( - env: SnapPivotRef; ## Current pivot environment + env: SnapPivotRef; # Current pivot environment ): bool = ## Returns `true` if accounts are fully available for this this pivot. env.fetchAccounts.processed.isFull - proc pivotAccountsHealingOk*( - env: SnapPivotRef; ## Current pivot environment - ctx: SnapCtxRef; ## Some global context + env: SnapPivotRef; # Current pivot environment + ctx: SnapCtxRef; # Some global context ): bool = ## Returns `true` if accounts healing is enabled for this pivot. 
## @@ -238,47 +275,9 @@ proc pivotAccountsHealingOk*( return true -proc execSnapSyncAction*( - env: SnapPivotRef; ## Current pivot environment - buddy: SnapBuddyRef; ## Worker peer - ) {.async.} = - ## Execute a synchronisation run. - let - ctx = buddy.ctx - - block: - # Clean up storage slots queue first it it becomes too large - let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len - if snapStorageSlotsQuPrioThresh < nStoQu: - await buddy.rangeFetchStorageSlots(env) - if buddy.ctrl.stopped or env.obsolete: - return - - if not env.pivotAccountsComplete(): - await buddy.rangeFetchAccounts(env) - if buddy.ctrl.stopped or env.obsolete: - return - - await buddy.rangeFetchStorageSlots(env) - if buddy.ctrl.stopped or env.obsolete: - return - - if env.pivotAccountsHealingOk(ctx): - await buddy.healAccounts(env) - if buddy.ctrl.stopped or env.obsolete: - return - - # Some additional storage slots might have been popped up - await buddy.rangeFetchStorageSlots(env) - if buddy.ctrl.stopped or env.obsolete: - return - - await buddy.healStorageSlots(env) - - proc saveCheckpoint*( - env: SnapPivotRef; ## Current pivot environment - ctx: SnapCtxRef; ## Some global context + env: SnapPivotRef; # Current pivot environment + ctx: SnapCtxRef; # Some global context ): Result[int,HexaryError] = ## Save current sync admin data. On success, the size of the data record ## saved is returned (e.g. for logging.) @@ -304,9 +303,9 @@ proc saveCheckpoint*( proc recoverPivotFromCheckpoint*( - env: SnapPivotRef; ## Current pivot environment - ctx: SnapCtxRef; ## Global context (containing save state) - topLevel: bool; ## Full data set on top level only + env: SnapPivotRef; # Current pivot environment + ctx: SnapCtxRef; # Global context (containing save state) + topLevel: bool; # Full data set on top level only ) = ## Recover some pivot variables and global list `coveredAccounts` from ## checkpoint data. If the argument `toplevel` is set `true`, also the @@ -323,27 +322,34 @@ proc recoverPivotFromCheckpoint*( # Import processed interval for (minPt,maxPt) in recov.state.processed: if topLevel: - discard env.fetchAccounts.processed.merge(minPt, maxPt) env.fetchAccounts.unprocessed.reduce(minPt, maxPt) + discard env.fetchAccounts.processed.merge(minPt, maxPt) discard ctx.data.coveredAccounts.merge(minPt, maxPt) # Handle storage slots - if topLevel: - let stateRoot = recov.state.header.stateRoot - for w in recov.state.slotAccounts: - let pt = NodeTagRange.new(w.to(NodeTag),w.to(NodeTag)) + let stateRoot = recov.state.header.stateRoot + for w in recov.state.slotAccounts: + let pt = NodeTagRange.new(w.to(NodeTag),w.to(NodeTag)) - if 0 < env.fetchAccounts.processed.covered(pt): - # Ignoring slots that have accounts to be downloaded, anyway - let rc = ctx.data.snapDb.getAccountsData(stateRoot, w) - if rc.isErr: - # Oops, how did that account get lost? - discard env.fetchAccounts.processed.reduce pt - env.fetchAccounts.unprocessed.merge pt - elif rc.value.storageRoot != emptyRlpHash: - env.fetchStorageFull.merge AccountSlotsHeader( - accKey: w, - storageRoot: rc.value.storageRoot) + if 0 < env.fetchAccounts.processed.covered(pt): + # Ignoring slots that have accounts to be downloaded, anyway + let rc = ctx.data.snapDb.getAccountsData(stateRoot, w) + if rc.isErr: + # Oops, how did that account get lost? 
+ discard env.fetchAccounts.processed.reduce pt + env.fetchAccounts.unprocessed.merge pt + elif rc.value.storageRoot != emptyRlpHash: + env.fetchStorageFull.merge AccountSlotsHeader( + accKey: w, + storageRoot: rc.value.storageRoot) + + # Handle mothballed pivots for swapping in (see `pivotMothball()`) + if not topLevel: + for kvp in env.fetchStorageFull.nextPairs: + let rc = env.storageAccounts.insert(kvp.data.accKey.to(NodeTag)) + if rc.isOk: + rc.value.data = kvp.key + env.archived = true # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/range_fetch_accounts.nim b/nimbus/sync/snap/worker/range_fetch_accounts.nim index 0d866a20a..c546c26d5 100644 --- a/nimbus/sync/snap/worker/range_fetch_accounts.nim +++ b/nimbus/sync/snap/worker/range_fetch_accounts.nim @@ -39,7 +39,8 @@ import ../../sync_desc, ".."/[constants, range_desc, worker_desc], ./com/[com_error, get_account_range], - ./db/[hexary_envelope, snapdb_accounts] + ./db/[hexary_envelope, snapdb_accounts], + "."/[pivot_helper, swap_in] {.push raises: [Defect].} @@ -127,7 +128,7 @@ proc accountsRangefetchImpl( buddy.data.errors.resetComError() let - gotAccounts = dd.data.accounts.len + gotAccounts = dd.data.accounts.len # comprises `gotStorage` gotStorage = dd.withStorage.len #when extraTraceMessages: @@ -179,10 +180,29 @@ proc accountsRangefetchImpl( # Register accounts with storage slots on the storage TODO list. env.fetchStorageFull.merge dd.withStorage + var nSwapInLaps = 0 + if env.archived: + # Current pivot just became outdated, rebuild storage slots index (if any) + if 0 < gotStorage: + trace logTxt "mothballing", peer, pivot, gotStorage + env.pivotMothball + + elif swapInAccountsCoverageTrigger <= ctx.data.coveredAccounts.fullFactor: + # Swap in from other pivots + when extraTraceMessages: + trace logTxt "before swap in", peer, pivot, gotAccounts, gotStorage, + coveredHere=covered.fullFactor.toPC(2), + processed=fa.processed.fullFactor.toPC(2), + nProcessedChunks=fa.processed.chunks.uint.toSI + + if swapInAccountsPivotsMin <= ctx.data.pivotTable.len: + nSwapInLaps = buddy.swapInAccounts(env) + when extraTraceMessages: - trace logTxt "request done", peer, pivot, - covered=covered.fullFactor.toPC(2), - processed=fa.processed.fullFactor.toPC(2) + trace logTxt "request done", peer, pivot, gotAccounts, gotStorage, + nSwapInLaps, coveredHere=covered.fullFactor.toPC(2), + processed=fa.processed.fullFactor.toPC(2), + nProcessedChunks=fa.processed.chunks.uint.toSI return true @@ -210,7 +230,7 @@ proc rangeFetchAccounts*( var nFetchAccounts = 0 # for logging while not fa.processed.isFull() and buddy.ctrl.running and - not env.obsolete: + not env.archived: nFetchAccounts.inc if not await buddy.accountsRangefetchImpl(env): break diff --git a/nimbus/sync/snap/worker/range_fetch_storage_slots.nim b/nimbus/sync/snap/worker/range_fetch_storage_slots.nim index 840c94133..15dcb24c5 100644 --- a/nimbus/sync/snap/worker/range_fetch_storage_slots.nim +++ b/nimbus/sync/snap/worker/range_fetch_storage_slots.nim @@ -333,7 +333,7 @@ proc rangeFetchStorageSlots*( var fullRangeItemsleft = 1 + (fullRangeLen-1) div snapStorageSlotsFetchMax while 0 < fullRangeItemsleft and buddy.ctrl.running and - not env.obsolete: + not env.archived: # Pull out the next request list from the queue let req = buddy.getNextSlotItemsFull(env) if req.len == 0: @@ -344,7 +344,7 @@ proc rangeFetchStorageSlots*( var partialRangeItemsLeft = env.fetchStoragePart.len while 0 < partialRangeItemsLeft and 
buddy.ctrl.running and - not env.obsolete: + not env.archived: # Pull out the next request list from the queue let req = buddy.getNextSlotItemPartial(env) if req.len == 0: diff --git a/nimbus/sync/snap/worker/sub_tries_helper.nim b/nimbus/sync/snap/worker/sub_tries_helper.nim index af133cc85..f604a1eb7 100644 --- a/nimbus/sync/snap/worker/sub_tries_helper.nim +++ b/nimbus/sync/snap/worker/sub_tries_helper.nim @@ -15,7 +15,7 @@ import eth/[common, p2p], stew/interval_set, ".."/[constants, range_desc, worker_desc], - ./db/[hexary_desc, hexary_error, hexary_envelope, hexary_inspect] + ./db/[hexary_desc, hexary_error, hexary_inspect] {.push raises: [Defect].} @@ -53,22 +53,6 @@ proc doInspect( ok(stats) - -proc getOverlapping( - batch: SnapRangeBatchRef; ## Healing data support - iv: NodeTagRange; ## Reference interval - ): Result[NodeTagRange,void] = - ## Find overlapping interval in `batch` - block: - let rc = batch.processed.ge iv.minPt - if rc.isOk and rc.value.minPt <= iv.maxPt: - return ok(rc.value) - block: - let rc = batch.processed.le iv.maxPt - if rc.isOk and iv.minPt <= rc.value.maxPt: - return ok(rc.value) - err() - # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -141,87 +125,6 @@ proc subTriesFromPartialPaths*( batch.lockTriePerusal = false return err(error) - -proc subTriesNodesReclassify*( - getFn: HexaryGetFn; ## Abstract database access - rootKey: NodeKey; ## Start node into hexary trie - batch: SnapRangeBatchRef; ## Healing data support - ) {.gcsafe, raises: [Defect,KeyError].} = - ## Check whether previously missing nodes from the `sickSubTries` list have - ## been magically added to the database since it was checked last time. These - ## nodes will me moved to `checkNodes` for further processing. Also, some - ## full sub-tries might have been added which can be checked against - ## the `processed` range set. - - # Move `sickSubTries` entries that have now an exisiting node to the - # list of partial paths to be re-checked. - block: - var delayed: seq[NodeSpecs] - for w in batch.sickSubTries: - if 0 < getFn(w.nodeKey.ByteArray32).len: - batch.checkNodes.add w - else: - delayed.add w - batch.sickSubTries = delayed - - # Remove `checkNodes` entries with complete known sub-tries. - var - doneWith: seq[NodeSpecs] # loop will not recurse on that list - count = 0 # for logging only - - # `While` loop will terminate with processed paths in `doneWith`. - block: - var delayed: seq[NodeSpecs] - while 0 < batch.checkNodes.len: - - when extraTraceMessages: - trace logTxt "reclassify", count, - nCheckNodes=batch.checkNodes.len - - for w in batch.checkNodes: - let - iv = w.hexaryEnvelope - nCov = batch.processed.covered iv - - if iv.len <= nCov: - # Fully processed envelope, no need to keep `w` any longer - when extraTraceMessages: - trace logTxt "reclassify discard", count, partialPath=w, - nDelayed=delayed.len - continue - - if 0 < nCov: - # Partially processed range, fetch an overlapping interval and - # remove that from the envelope of `w`. - try: - let paths = block: - let rc = w.partialPath.hexaryEnvelopeDecompose( - rootKey, batch.getOverlapping(iv).value, getFn) - if rc.isErr: - continue - rc.value - delayed &= paths - when extraTraceMessages: - trace logTxt "reclassify dismantled", count, partialPath=w, - nPaths=paths.len, nDelayed=delayed.len - continue - except RlpError: - discard - - # Not processed at all. 
So keep `w` but there is no need to look - # at it again in the next lap. - doneWith.add w - - # Prepare for next lap - batch.checkNodes.swap delayed - delayed.setLen(0) - - batch.checkNodes = doneWith.hexaryEnvelopeUniq - - when extraTraceMessages: - trace logTxt "reclassify finalise", count, - nDoneWith=doneWith.len, nCheckNodes=batch.checkNodes.len - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/swap_in.nim b/nimbus/sync/snap/worker/swap_in.nim new file mode 100644 index 000000000..a2cfc9b76 --- /dev/null +++ b/nimbus/sync/snap/worker/swap_in.nim @@ -0,0 +1,392 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +## Swap in already allocated sub-tries +## =================================== +## +## This module imports sub-tries from other pivots into the current. It does +## so by detecting the top of an existing sub-trie in the current pivot and +## searches other pivots for the part of the sub-trie that is already +## available there. So it can be marked accomplished on the current pivot. +## +## Note that the terminology hinges on *account pivots* but is implemented in +## a more general way where +## +## * the current pivot is of type `SnapRangeBatchRef` +## +## * other pivots are represented by an iterator of type `SwapInPivots` +## +## So the algorithm can be transferred to other that accounting pivot +## situations. +## +## Algorithm +## --------- +## +## * On the *current pivot*, use the `processed` ranges of accounts to find all +## the nodes the envelopes of which are disjunct to the `processed` ranges +## (see module `hexary_envelope` for supporting functions.) +## +## * Select all the non-dangling/existing nodes disjunct envelopes from the +## previous step. +## +## * For all the selected non-dangling nodes from the previous step, check +## which ones are present in other pivots. This means that for a given +## existing node in the current pivot its *partial path* can be applied +## to the *state root* key of another pivot ending up at the same node key. +## +## The portion of `processed` ranges on the other pivot that intersects with +## the envelope of the node has been downloaded already. It is equally +## applicable to the current pivot as it applies to the same sub-trie. So +## the intersection of `processed` with the node envelope can be copied to +## to the `processed` ranges of the current pivot. +## +## * Rinse and repeat. 
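+##
+## Illustration
+## ------------
+##
+## The fragment below is an editorial sketch only (it is not called anywhere);
+## it condenses the inner loops of `swapIn()` and `otherProcessedRanges()`
+## defined further down, for a single existing node `node` of the current
+## pivot::
+##
+##   let envelope = node.hexaryEnvelope
+##   for other in otherPivots:
+##     if node.existsInTrie(other.rootKey, getFn):
+##       # `other` has proved leaves below `node`, so adopt that work here
+##       for iv in other.processed.hexaryEnvelopeTouchedBy(node).increasing:
+##         let segment = (envelope * iv).value
+##         discard pivot.processed.merge segment
+##         pivot.unprocessed.reduce segment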
+## +import + std/[sequtils, strutils], + chronicles, + eth/[common, p2p], + stew/[byteutils, interval_set, keyed_queue, sorted_set], + ../../../utils/prettify, + ".."/[range_desc, worker_desc], + ./db/[hexary_desc, hexary_error, hexary_envelope, + hexary_paths, snapdb_accounts] + +{.push raises: [Defect].} + +logScope: + topics = "snap-swapin" + +type + SwapInPivot = object + ## Subset of `SnapPivotRef` with relevant parts, only + rootKey: NodeKey ## Storage slots & accounts + processed: NodeTagRangeSet ## Storage slots & accounts + pivot: SnapPivotRef ## Accounts only + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +when extraTraceMessages: + import std/math, ../../types + +# ------------------------------------------------------------------------------ +# Private logging helpers +# ------------------------------------------------------------------------------ + +template logTxt(info: static[string]): static[string] = + "Swap-in helper " & info + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc decompose( + node: NodeSpecs; # Contains hex encoded partial path + iv: NodeTagRange; # Proofed range of leaf paths + rootKey: NodeKey; # Start node into hexary trie + getFn: HexaryGetFn; # Abstract database access + ): Result[seq[NodeSpecs],void] = + ## Decompose, succeed only if there is a change + var error: HexaryError + + try: + let rc = node.partialPath.hexaryEnvelopeDecompose(rootKey, iv, getFn) + if rc.isErr: + error = rc.error + elif rc.value.len != 1 or rc.value[0].nodeKey != node.nodeKey: + return ok(rc.value) + else: + return err() + except RlpError: + error = RlpEncoding + + when extraTraceMessages: + trace logTxt "envelope decomposition failed", + node=node.partialPath.toHex, error + + err() + + +proc existsInTrie( + node: NodeSpecs; # Probe node to test to exist + rootKey: NodeKey; # Start node into hexary trie + getFn: HexaryGetFn; # Abstract database access + ): bool = + ## Check whether this node exists on the sub-trie starting at ` rootKey` + var error: HexaryError + + try: + let rc = node.partialPath.hexaryPathNodeKey(rootKey, getFn) + if rc.isOk: + return rc.value == node.nodeKey + except RlpError: + error = RlpEncoding + + when extraTraceMessages: + trace logTxt "check nodes failed", + partialPath=node.partialPath.toHex, error + + false + + +template noKeyErrorOops(info: static[string]; code: untyped) = + try: + code + except KeyError as e: + raiseAssert "Not possible (" & info & "): " & e.msg + +template noExceptionOops(info: static[string]; code: untyped) = + try: + code + except Defect as e: + raise e + except Exception as e: + raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc decomposeCheckNodes( + pivot: SnapRangeBatchRef; # Healing data support + rootKey: NodeKey; # Start node into hexary trie + getFn: HexaryGetFn; # Abstract database access + ): Result[seq[NodeSpecs],void] = + ## Decompose the `checkNodes` list of the argument `pivot` relative to the + ## set `processed` of processed leaf node ranges. + ## + ## The function fails if there wan no change to the `checkNodes` list. 
+ var + delayed: seq[NodeSpecs] + didSomething = 0 + + # Remove `checkNodes` entries with known complete sub-tries. + for node in pivot.checkNodes: + var paths: seq[NodeSpecs] + + # For a Partially processed range, fetch overlapping intervals and + # sort of remove them from the envelope of `w`. + for touched in pivot.processed.hexaryEnvelopeTouchedBy(node).increasing: + let rc = node.decompose(touched, rootKey, getFn) + if rc.isOk: + paths &= rc.value + didSomething.inc + when extraTraceMessages: + trace logTxt "checkNodes decompose", nDelayed=delayed.len, + node=node.partialPath.toHex, nPaths=paths.len, + newPaths=rc.value.mapIt(it.partialPath.toHex).join(",") + # End inner for() + + delayed &= paths + # End outer for() + + if 0 < didSomething: + noKeyErrorOops("subTriesCheckNodesDecompose"): + # Remove duplicates in resulting path list + return ok(delayed.hexaryEnvelopeUniq) + + err() + + +proc otherProcessedRanges( + node: NodeSpecs; # Top node of portential sub-trie + otherPivots: seq[SwapInPivot]; # Other pivots list + rootKey: NodeKey; # Start node into hexary trie + getFn: HexaryGetFn; # Abstract database access + ): seq[NodeTagRangeSet] = + ## Collect already processed ranges from other pivots intersecting with the + ## envelope of the argument `node`. The list of other pivots is represented + ## by the argument iterator `otherPivots`. + let envelope = node.hexaryEnvelope + var count = 0 # logging & debugging + + noExceptionOops("otherProcessedRanges"): + # For the current `node` select all hexary sub-tries that contain the same + # node `node.nodeKey` for the partial path `node.partianPath`. + for rp in otherPivots.items: + # Check whether the node is shared + let haveNode = node.existsInTrie(rp.rootKey, getFn) + + var subCount = 0 # logging & debugging + count.inc # logging & debugging + + result.add NodeTagRangeSet.init() + + if not haveNode: + trace logTxt "history loop", count, node=node.partialPath.toHex, + processed=rp.processed.fullFactor.toPC(3), haveNode + + if haveNode: + when extraTraceMessages: + trace logTxt "history loop => sub start", count, + nTouched=rp.processed.hexaryEnvelopeTouchedBy(node).chunks, haveNode + + # Import already processed part of the envelope of `node` into the + # `batch.processed` set of ranges. + for iv in rp.processed.hexaryEnvelopeTouchedBy(node).increasing: + let segment = (envelope * iv).value + discard result[^1].merge segment + + subCount.inc # dlogging & ebugging + when extraTraceMessages: + trace logTxt "history loop => sub", count, subCount, + touchedLen=segment.fullFactor.toPC(3) + +# ------------------------------------------------------------------------------ +# Private functions, swap-in functionality +# ------------------------------------------------------------------------------ + +proc swapIn*( + pivot: SnapRangeBatchRef; # Healing state for target hexary trie + otherPivots: seq[SwapInPivot]; # Other pivots list + rootKey: NodeKey; # Start node into target hexary trie + getFn: HexaryGetFn; # Abstract database access + loopMax = 20; # Prevent from looping too often + ): (int,seq[NodeTagRangeSet]) = + ## Collect processed already ranges from argument `otherPivots` and register + ## it onto the argument `pivot`. This function recognises and imports + ## directly accessible sub-tries where the top-level node exists. 
+ var + lapCount = 0 + notDoneYet = true + swappedIn = newSeq[NodeTagRangeSet](otherPivots.len) + + # Initialise return value + for n in 0 ..< swappedIn.len: + swappedIn[n] = NodeTagRangeSet.init() + + while notDoneYet and lapCount < loopMax: + var + merged = 0.u256 + nCheckNodesBefore = 0 # debugging + + # Decompose `checkNodes` into sub-tries disjunct from `processed` + let toBeReclassified = block: + let rc = pivot.decomposeCheckNodes(rootKey, getFn) + if rc.isErr: + return (lapCount,swappedIn) # nothing to do + rc.value + + lapCount.inc + notDoneYet = false + + # Reclassify nodes into existing/allocated and dangling ones + noKeyErrorOops("swapIn"): + var + checkNodes: seq[NodeSpecs] + sickNodes: seq[NodeSpecs] + for node in toBeReclassified: + # Check whether previously missing nodes from the `sickSubTries` list + # have been magically added to the database since it was checked last + # time. These nodes will me moved to `checkNodes` for further + # processing. + if node.nodeKey.ByteArray32.getFn().len == 0: + sickNodes.add node # probably subject to healing + else: + let iv = node.hexaryEnvelope + if pivot.processed.covered(iv) < iv.len: + checkNodes.add node # may be swapped in + pivot.checkNodes = checkNodes.hexaryEnvelopeUniq + pivot.sickSubTries = sickNodes.hexaryEnvelopeUniq + + nCheckNodesBefore = pivot.checkNodes.len # logging & debugging + + # Swap in node ranges from other pivots + for node in pivot.checkNodes: + for n,rangeSet in node.otherProcessedRanges(otherPivots,rootKey,getFn): + for iv in rangeSet.increasing: + discard swappedIn[n].merge iv # imported range / other pivot + merged += pivot.processed.merge iv # import this range + pivot.unprocessed.reduce iv # no need to fetch it again + notDoneYet = 0 < merged # loop control + + # Remove fully covered nodes + block: + var checkNodes: seq[NodeSpecs] + for node in toBeReclassified: + let iv = node.hexaryEnvelope + if pivot.processed.covered(iv) < iv.len: + checkNodes.add node # may be swapped in + pivot.checkNodes = checkNodes.hexaryEnvelopeUniq + + when extraTraceMessages: + let mergedFactor = merged.to(float) / (2.0^256) + trace logTxt "inherited ranges", nCheckNodesBefore, + nCheckNodes=pivot.checkNodes.len, merged=mergedFactor.toPC(3) + + # End while() + + (lapCount,swappedIn) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc swapInAccounts*( + buddy: SnapBuddyRef; # Worker peer + env: SnapPivotRef; # Current pivot environment + loopMax = 20; # Prevent from looping too often + ): int = + ## Variant of `swapIn()` for the particular case of accounts database pivots. 
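+  ##
+  ## Typical use from the accounts range fetch code, guarded by the new
+  ## constants (editorial note, see `range_fetch_accounts`)::
+  ##
+  ##   if swapInAccountsCoverageTrigger <= ctx.data.coveredAccounts.fullFactor:
+  ##     if swapInAccountsPivotsMin <= ctx.data.pivotTable.len:
+  ##       discard buddy.swapInAccounts(env)
+  ##
+  ## The healing code calls it unconditionally, see `reorgHealingState()` in
+  ## `heal_accounts`.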
+ let + ctx = buddy.ctx + rootKey = env.stateHeader.stateRoot.to(NodeKey) + getFn = ctx.data.snapDb.getAccountFn + + others = toSeq(ctx.data.pivotTable.nextPairs) + + # Swap in from mothballed pifots different from the current one + .filterIt(it.data.archived and it.key.to(NodeKey) != rootKey) + + # Extract relevant parts + .mapIt(SwapInPivot( + rootKey: it.key.to(NodeKey), + processed: it.data.fetchAccounts.processed, + pivot: it.data)) + var + nLaps: int + swappedIn: seq[NodeTagRangeSet] + + noExceptionOops("swapInAccounts"): + (nLaps,swappedIn) = env.fetchAccounts.swapIn(others,rootKey,getFn,loopMax) + + noKeyErrorOops("swapInAccounts"): + # Update storage slots + doAssert swappedIn.len == others.len + for n in 0 ..< others.len: + + when extraTraceMessages: + trace logTxt "post-processing storage slots", inx=n, maxInx=others.len, + changes=swappedIn[n].fullFactor.toPC(3), chunks=swappedIn[n].chunks + + # Revisit all imported account key ranges + for iv in swappedIn[n].increasing: + + # The `storageAccounts` list contains indices for storage slots, mapping + # account keys => storage root + var rc = others[n].pivot.storageAccounts.ge(iv.minPt) + while rc.isOk and rc.value.key <= iv.maxPt: + + # Fetch storage slots specs from `fetchStorageFull` list + let stRoot = rc.value.data + if others[n].pivot.fetchStorageFull.hasKey(stRoot): + let accKey = others[n].pivot.fetchStorageFull[stRoot].accKey + discard env.fetchStorageFull.append( + stRoot, SnapSlotsQueueItemRef(acckey: accKey)) + + rc = others[n].pivot.storageAccounts.gt(rc.value.key) + + nLaps + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index ead55f38e..08eda903c 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -11,7 +11,7 @@ import std/hashes, eth/[common, p2p], - stew/[interval_set, keyed_queue], + stew/[interval_set, keyed_queue, sorted_set], ../../db/select_backend, ../sync_desc, ./worker/com/com_error, @@ -22,6 +22,9 @@ import {.push raises: [Defect].} type + SnapAccountsList* = SortedSet[NodeTag,Hash256] + ## Sorted pair of `(account,state-root)` entries + SnapSlotsQueue* = KeyedQueue[Hash256,SnapSlotsQueueItemRef] ## Handles list of storage slots data for fetch indexed by storage root. ## @@ -72,7 +75,10 @@ type # Info nAccounts*: uint64 ## Imported # of accounts nSlotLists*: uint64 ## Imported # of account storage tries - obsolete*: bool ## Not latest pivot, anymore + + # Mothballing, ready to be swapped into newer pivot record + storageAccounts*: SnapAccountsList ## Accounts with missing stortage slots + archived*: bool ## Not latest pivot, anymore SnapPivotTable* = KeyedQueue[Hash256,SnapPivotRef] ## LRU table, indexed by state root
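Editor's note on the new `worker_desc` type: `SnapAccountsList` (a
`SortedSet[NodeTag,Hash256]`) backs the `storageAccounts` index which
`pivotMothball()` fills and `swapInAccounts()` walks in order to map
swapped-in account ranges back to their storage roots. The stand-alone sketch
below shows that `insert()`/`ge()`/`gt()` walk reduced to plain `int` keys and
`string` payloads; the sample values are made up and the snippet merely
assumes `stew/sorted_set` behaves as it is used elsewhere in this patch.

  import stew/sorted_set

  var index: SortedSet[int,string]
  index.init()

  # Populate the index (cf. `pivotMothball()`): key => payload.
  let samples = [(key: 10, root: "rootA"),
                 (key: 25, root: "rootB"),
                 (key: 40, root: "rootC")]
  for w in samples:
    let rc = index.insert(w.key)
    if rc.isOk:
      rc.value.data = w.root

  # Visit every entry whose key lies in the closed interval [minPt,maxPt],
  # mirroring the `storageAccounts.ge()`/`gt()` loop in `swapInAccounts()`.
  let (minPt, maxPt) = (12, 40)
  var walk = index.ge(minPt)
  while walk.isOk and walk.value.key <= maxPt:
    echo walk.value.key, " -> ", walk.value.data   # 25 -> rootB, 40 -> rootC
    walk = index.gt(walk.value.key)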