From cc2c888a639f8f825cfdb758a2ba2e83e6593b3a Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Mon, 12 Dec 2022 22:00:24 +0000 Subject: [PATCH] Snap sync swap in other pivots (#1363) * Provide index to reconstruct missing storage slots why; Pivots will be changed anymore once they are officially archived. The account of the archived pivots are ready to be swapped into the active pivot. This leaves open how to treat storage slots not fetched yet. Solution: when mothballing, an `account->storage-root` index is compiled that can be used when swapping in accounts. * Implement swap-in from earlier pivots details; When most accounts are covered by the current and previous pivot sessions, swapping inthe accounts and storage slots (i.e. registering account ranges done) from earlier pivots takes place if there is a common sub-trie. * Throttle pivot change when healing state has bean reached why: There is a hope to complete the current pivot, so pivot update can be throttled. This is achieved by setting another minimum block number distance for the pivot headers. This feature is still experimental --- nimbus/sync/snap/constants.nim | 27 +- nimbus/sync/snap/range_desc.nim | 12 +- nimbus/sync/snap/worker.nim | 102 ++++- nimbus/sync/snap/worker/heal_accounts.nim | 102 +++-- nimbus/sync/snap/worker/pivot_helper.nim | 172 ++++---- .../sync/snap/worker/range_fetch_accounts.nim | 32 +- .../snap/worker/range_fetch_storage_slots.nim | 4 +- nimbus/sync/snap/worker/sub_tries_helper.nim | 99 +---- nimbus/sync/snap/worker/swap_in.nim | 392 ++++++++++++++++++ nimbus/sync/snap/worker_desc.nim | 10 +- 10 files changed, 708 insertions(+), 244 deletions(-) create mode 100644 nimbus/sync/snap/worker/swap_in.nim diff --git a/nimbus/sync/snap/constants.nim b/nimbus/sync/snap/constants.nim index e6b128777..26ed438e6 100644 --- a/nimbus/sync/snap/constants.nim +++ b/nimbus/sync/snap/constants.nim @@ -11,6 +11,9 @@ {.push raises: [Defect].} const + pivotTableLruEntriesMax* = 50 + ## Max depth of pivot table. On overflow, the oldest one will be removed. + pivotBlockDistanceMin* = 128 ## The minimal depth of two block headers needed to activate a new state ## root pivot. @@ -28,6 +31,10 @@ const ## ## Note that 128 is the magic distance for snapshots used by *Geth*. + pivotBlockDistanceThrottledPivotChangeMin* = 256 + ## Slower pivot change while healing or nearly complete accounts database + ## takes place. + pivotEnvStopChangingIfComplete* = true ## If set `true`, new peers will not change the pivot even if the ## negotiated pivot would be newer. This should be the default. @@ -48,17 +55,17 @@ const snapAccountsSaveProcessedChunksMax* = 1000 ## Recovery data are stored if the processed ranges list contains no more - ## than this many range `chunks`. + ## than this many range *chunks*. ## - ## If there are too many dangling nodes, no data will be saved and restart - ## has to perform from scratch. + ## If the range set is too much fragmented, no data will be saved and + ## restart has to perform from scratch or an earlier checkpoint. snapAccountsSaveStorageSlotsMax* = 10_000 ## Recovery data are stored if the oustanding storage slots to process do ## not amount to more than this many entries. ## ## If there are too many dangling nodes, no data will be saved and restart - ## has to perform from scratch. + ## has to perform from scratch or an earlier checkpoint. snapStorageSlotsFetchMax* = 2 * 1024 @@ -74,6 +81,17 @@ const # -------------- + swapInAccountsCoverageTrigger* = 0.30 + ## Similar to `healAccountsCoverageTrigger` below only for trying to + ## swap in from earlier pivots. + + swapInAccountsPivotsMin* = 2 + ## Require at least this man pivots available before any swap in can + ## take place (must be at least 2.) This value is typically combined + ## with `swapInAccountsCoverageTrigger`. + + # -------------- + healInspectionBatch* = 10_000 ## Number of nodes to inspect in a single batch. In between batches, a ## task/thread switch is allowed. @@ -159,6 +177,7 @@ const ## Set 0 to disable. static: + doAssert 1 < swapInAccountsPivotsMin doAssert healAccountsCoverageTrigger < 1.0 # larger values make no sense doAssert snapStorageSlotsQuPrioThresh < snapAccountsSaveStorageSlotsMax doAssert snapStorageSlotsFetchMax < healAccountsBatchFetchMax diff --git a/nimbus/sync/snap/range_desc.nim b/nimbus/sync/snap/range_desc.nim index 1f08492db..19056c294 100644 --- a/nimbus/sync/snap/range_desc.nim +++ b/nimbus/sync/snap/range_desc.nim @@ -25,10 +25,11 @@ type NodeKey* = distinct ByteArray32 ## Hash key without the hash wrapper (as opposed to `NodeTag` which is a - ## number) + ## number.) NodeTag* = distinct UInt256 - ## Trie leaf item, account hash etc. + ## Trie leaf item, account hash etc. This data type is a representation + ## for a `NodeKey` geared up for arithmetic and comparing keys. NodeTagRange* = Interval[NodeTag,UInt256] ## Interval `[minPt,maxPt]` of` NodeTag` elements, can be managed in an @@ -249,6 +250,13 @@ proc fullFactor*(lrs: NodeTagRangeSet): float = else: 1.0 # number of points in `lrs` is `2^256 + 1` +proc fullFactor*(iv: NodeTagRange): float = + ## Relative covered length of an inetrval, i.e. `#points-covered / 2^256` + if 0 < iv.len: + iv.len.u256.to(float) / (2.0^256) + else: + 1.0 # number of points in `iv` is `2^256 + 1` + # ------------------------------------------------------------------------------ # Public functions: printing & pretty printing # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 2a38c8eb7..c4e3f8eac 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -18,7 +18,8 @@ import ../../utils/prettify, ../misc/best_pivot, ".."/[protocol, sync_desc], - ./worker/[pivot_helper, ticker], + ./worker/[heal_accounts, heal_storage_slots, pivot_helper, + range_fetch_accounts, range_fetch_storage_slots, ticker], ./worker/com/com_error, ./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot], "."/[constants, range_desc, worker_desc] @@ -72,19 +73,21 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} = return false rc.value - # Cosmetics: allows other processes to log etc. - await sleepAsync(1300.milliseconds) + # Cosmetics: allow other processes (e.g. ticker) to log the current recovery + # state. There is no other intended purpose of this wait state. + await sleepAsync 1100.milliseconds - when extraTraceMessages: - trace "Recovery continued ...", checkpoint, topLevel, - nAccounts=recov.state.nAccounts, nDangling=recov.state.dangling.len + #when extraTraceMessages: + # trace "Recovery continued ...", checkpoint, topLevel, + # nAccounts=recov.state.nAccounts, nDangling=recov.state.dangling.len # Update pivot data from recovery checkpoint env.recoverPivotFromCheckpoint(ctx, topLevel) # Fetch next recovery record if there is any if recov.state.predecessor.isZero: - trace "Recovery done", checkpoint, topLevel + #when extraTraceMessages: + # trace "Recovery done", checkpoint, topLevel return false let rc = ctx.data.snapDb.recoverPivot(recov.state.predecessor) if rc.isErr: @@ -138,6 +141,47 @@ proc updateSinglePivot(buddy: SnapBuddyRef): Future[bool] {.async.} = return true +proc execSnapSyncAction( + env: SnapPivotRef; # Current pivot environment + buddy: SnapBuddyRef; # Worker peer + ) {.async.} = + ## Execute a synchronisation run. + let + ctx = buddy.ctx + + block: + # Clean up storage slots queue first it it becomes too large + let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len + if snapStorageSlotsQuPrioThresh < nStoQu: + await buddy.rangeFetchStorageSlots(env) + if buddy.ctrl.stopped or env.archived: + return + + if not env.pivotAccountsComplete(): + await buddy.rangeFetchAccounts(env) + if buddy.ctrl.stopped or env.archived: + return + + await buddy.rangeFetchStorageSlots(env) + if buddy.ctrl.stopped or env.archived: + return + + if env.pivotAccountsHealingOk(ctx): + await buddy.healAccounts(env) + if buddy.ctrl.stopped or env.archived: + return + + # Some additional storage slots might have been popped up + await buddy.rangeFetchStorageSlots(env) + if buddy.ctrl.stopped or env.archived: + return + + # Don't bother with storage slots healing before accounts healing takes + # place. This saves communication bandwidth. The pivot might change soon, + # anyway. + if env.pivotAccountsHealingOk(ctx): + await buddy.healStorageSlots(env) + # ------------------------------------------------------------------------------ # Public start/stop and admin functions # ------------------------------------------------------------------------------ @@ -248,8 +292,8 @@ proc runPool*(buddy: SnapBuddyRef, last: bool): bool = proc runMulti*(buddy: SnapBuddyRef) {.async.} = ## Enabled while - ## * `buddy.ctrl.multiOk` is `true` - ## * `buddy.ctrl.poolMode` is `false` + ## * `buddy.ctx.multiOk` is `true` + ## * `buddy.ctx.poolMode` is `false` ## let ctx = buddy.ctx @@ -280,29 +324,41 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = # point in keeping some older space consuming state data any longer. ctx.data.pivotTable.beforeTopMostlyClean() + when extraTraceMessages: + block: + let + nCheckNodes = env.fetchAccounts.checkNodes.len + nSickSubTries = env.fetchAccounts.sickSubTries.len + nAccounts = env.nAccounts + nSlotLists = env.nSlotLists + processed = env.fetchAccounts.processed.fullFactor.toPC(2) + nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len + accHealThresh = env.healThresh.toPC(2) + trace "Multi sync runner", peer, pivot, nAccounts, nSlotLists, processed, + nStoQu, accHealThresh, nCheckNodes, nSickSubTries + # This one is the syncing work horse which downloads the database await env.execSnapSyncAction(buddy) - if env.obsolete: + if env.archived: return # pivot has changed - # Save state so sync can be partially resumed at next start up - let - nCheckNodes = env.fetchAccounts.checkNodes.len - nSickSubTries = env.fetchAccounts.sickSubTries.len - nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len - processed = env.fetchAccounts.processed.fullFactor.toPC(2) block: - let rc = env.saveCheckpoint(ctx) + # Save state so sync can be partially resumed at next start up + let + nAccounts = env.nAccounts + nSlotLists = env.nSlotLists + processed = env.fetchAccounts.processed.fullFactor.toPC(2) + nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len + accHealThresh = env.healThresh.toPC(2) + rc = env.saveCheckpoint(ctx) if rc.isErr: - error "Failed to save recovery checkpoint", peer, pivot, - nAccounts=env.nAccounts, nSlotLists=env.nSlotLists, - processed, nStoQu, error=rc.error + error "Failed to save recovery checkpoint", peer, pivot, nAccounts, + nSlotLists, processed, nStoQu, error=rc.error else: when extraTraceMessages: - trace "Saved recovery checkpoint", peer, pivot, - nAccounts=env.nAccounts, nSlotLists=env.nSlotLists, - processed, nStoQu, blobSize=rc.value + trace "Saved recovery checkpoint", peer, pivot, nAccounts, nSlotLists, + processed, nStoQu, blobSize=rc.value, accHealThresh if buddy.ctrl.stopped: return # peer worker has gone diff --git a/nimbus/sync/snap/worker/heal_accounts.nim b/nimbus/sync/snap/worker/heal_accounts.nim index 9de7e4771..a0e6c25f2 100644 --- a/nimbus/sync/snap/worker/heal_accounts.nim +++ b/nimbus/sync/snap/worker/heal_accounts.nim @@ -117,7 +117,7 @@ import ".."/[constants, range_desc, worker_desc], ./com/[com_error, get_trie_nodes], ./db/[hexary_desc, hexary_error, snapdb_accounts], - ./sub_tries_helper + "."/[sub_tries_helper, swap_in] {.push raises: [Defect].} @@ -148,10 +148,61 @@ proc healingCtx( "nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," & "nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}" +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +template discardRlpError(info: static[string]; code: untyped) = + try: + code + except RlpError as e: + discard + +template noExceptionOops(info: static[string]; code: untyped) = + try: + code + except Defect as e: + raise e + except Exception as e: + raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg + # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ +proc reorgHealingState( + buddy: SnapBuddyRef; + env: SnapPivotRef; + ) = + let + ctx = buddy.ctx + rootKey = env.stateHeader.stateRoot.to(NodeKey) + getFn = ctx.data.snapDb.getAccountFn + + nCheckNodes0 = env.fetchAccounts.checkNodes.len + nSickSubTries0 = env.fetchAccounts.sickSubTries.len + nProcessed0 = env.fetchAccounts.processed.fullfactor.toPC(3) + + # Reclassify nodes into existing/allocated and dangling ones + if buddy.swapInAccounts(env) == 0: + # Nothing to swap in, so force reclassification + noExceptionOops("reorgHealingState"): + var delayed: seq[NodeSpecs] + for node in env.fetchAccounts.sickSubTries: + if node.nodeKey.ByteArray32.getFn().len == 0: + delayed.add node # still subject to healing + else: + env.fetchAccounts.checkNodes.add node + env.fetchAccounts.sickSubTries = delayed + + when extraTraceMessages: + let + nCheckNodes1 = env.fetchAccounts.checkNodes.len + nSickSubTries1 = env.fetchAccounts.sickSubTries.len + trace logTxt "sick nodes reclassified", nCheckNodes0, nSickSubTries0, + nCheckNodes1, nSickSubTries1, nProcessed0 + + proc updateMissingNodesList( buddy: SnapBuddyRef; env: SnapPivotRef; @@ -234,15 +285,14 @@ proc getMissingNodesFromNetwork( # allows other processes to access the full `sickSubTries` list. env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & fetchNodes - let error = rc.error - if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): - discard - when extraTraceMessages: + let + error = rc.error + ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors) + when extraTraceMessages: + if ok: trace logTxt "fetch nodes error => stop", peer, ctx=buddy.healingCtx(env), error - else: - discard - when extraTraceMessages: + else: trace logTxt "fetch nodes error", peer, ctx=buddy.healingCtx(env), error @@ -253,23 +303,31 @@ proc kvAccountLeaf( buddy: SnapBuddyRef; node: NodeSpecs; env: SnapPivotRef; - ): (bool,NodeKey,Account) - {.gcsafe, raises: [Defect,RlpError]} = + ): (bool,NodeKey,Account) = ## Re-read leaf node from persistent database (if any) let peer = buddy.peer + var + nNibbles = -1 - nodeRlp = rlpFromBytes node.data - (_,prefix) = hexPrefixDecode node.partialPath - (_,segment) = hexPrefixDecode nodeRlp.listElem(0).toBytes - nibbles = prefix & segment - if nibbles.len == 64: - let data = nodeRlp.listElem(1).toBytes - return (true, nibbles.getBytes.convertTo(NodeKey), rlp.decode(data,Account)) + discardRlpError("kvAccountLeaf"): + let + nodeRlp = rlpFromBytes node.data + prefix = (hexPrefixDecode node.partialPath)[1] + segment = (hexPrefixDecode nodeRlp.listElem(0).toBytes)[1] + nibbles = prefix & segment + + nNibbles = nibbles.len + if nNibbles == 64: + let + data = nodeRlp.listElem(1).toBytes + nodeKey = nibbles.getBytes.convertTo(NodeKey) + accData = rlp.decode(data,Account) + return (true, nodeKey, accData) when extraTraceMessages: - trace logTxt "non-leaf node path", peer, - ctx=buddy.healingCtx(env), nNibbles=nibbles.len + trace logTxt "non-leaf node path or corrupt data", peer, + ctx=buddy.healingCtx(env), nNibbles proc registerAccountLeaf( @@ -313,11 +371,7 @@ proc accountsHealingImpl( peer = buddy.peer # Update for changes since last visit - try: - db.getAccountFn.subTriesNodesReclassify( - env.stateHeader.stateRoot.to(NodeKey), env.fetchAccounts) - except Exception as e: - raiseAssert "Not possible @ accountsHealingImpl(" & $e.name & "):" & e.msg + buddy.reorgHealingState(env) if env.fetchAccounts.sickSubTries.len == 0: # Traverse the hexary trie for more missing nodes. This call is expensive. diff --git a/nimbus/sync/snap/worker/pivot_helper.nim b/nimbus/sync/snap/worker/pivot_helper.nim index 184cf7995..d0dceea3a 100644 --- a/nimbus/sync/snap/worker/pivot_helper.nim +++ b/nimbus/sync/snap/worker/pivot_helper.nim @@ -13,12 +13,11 @@ import bearssl/rand, chronos, eth/[common, trie/trie_defs], - stew/[interval_set, keyed_queue], + stew/[interval_set, keyed_queue, sorted_set], ../../sync_desc, ".."/[constants, range_desc, worker_desc], ./db/[hexary_error, snapdb_accounts, snapdb_pivot], - "."/[heal_accounts, heal_storage_slots, - range_fetch_accounts, range_fetch_storage_slots, ticker] + ./ticker {.push raises: [Defect].} @@ -26,6 +25,10 @@ const extraAsserts = false or true ## Enable some asserts +proc pivotAccountsHealingOk*(env: SnapPivotRef;ctx: SnapCtxRef): bool {.gcsafe.} +proc pivotAccountsComplete*(env: SnapPivotRef): bool {.gcsafe.} +proc pivotMothball*(env: SnapPivotRef) {.gcsafe.} + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -40,9 +43,14 @@ proc init( batch.unprocessed.init() batch.processed = NodeTagRangeSet.init() - # Initialise partial path the envelope of which covers the full range of - # account keys `0..high(NodeTag)`. This will trigger healing on the full - # range all possible keys. + # Once applicable when the hexary trie is non-empty, healing is started on + # the full range of all possible accounts. So the partial path batch list + # is initialised with the empty partial path encoded as `@[0]` which refers + # to the first (typically `Branch`) node. The envelope of `@[0]` covers the + # maximum range of accounts. + # + # Note that `@[]` incidentally has the same effect as `@[0]` although it + # is formally no partial path. batch.checkNodes.add NodeSpecs( partialPath: @[0.byte], nodeKey: stateRoot.to(NodeKey)) @@ -84,12 +92,7 @@ proc beforeTopMostlyClean*(pivotTable: var SnapPivotTable) = ## usable any more after cleaning but might be useful as historic record. let rc = pivotTable.beforeLastValue if rc.isOk: - let env = rc.value - env.fetchStorageFull.clear() - env.fetchStoragePart.clear() - env.fetchAccounts.checkNodes.setLen(0) - env.fetchAccounts.sickSubTries.setLen(0) - env.obsolete = true + rc.value.pivotMothball proc topNumber*(pivotTable: var SnapPivotTable): BlockNumber = @@ -100,10 +103,10 @@ proc topNumber*(pivotTable: var SnapPivotTable): BlockNumber = proc update*( - pivotTable: var SnapPivotTable; ## Pivot table - header: BlockHeader; ## Header to generate new pivot from - ctx: SnapCtxRef; ## Some global context - reverse = false; ## Update from bottom (e.g. for recovery) + pivotTable: var SnapPivotTable; # Pivot table + header: BlockHeader; # Header to generate new pivot from + ctx: SnapCtxRef; # Some global context + reverse = false; # Update from bottom (e.g. for recovery) ) = ## Activate environment for state root implied by `header` argument. This ## function appends a new environment unless there was any not far enough @@ -112,6 +115,14 @@ proc update*( ## Note that the pivot table is assumed to be sorted by the block numbers of ## the pivot header. ## + # Calculate minimum block distance. + let minBlockDistance = block: + let rc = pivotTable.lastValue + if rc.isOk and rc.value.pivotAccountsHealingOk(ctx): + pivotBlockDistanceThrottledPivotChangeMin + else: + pivotBlockDistanceMin + # Check whether the new header follows minimum depth requirement. This is # where the queue is assumed to have increasing block numbers. if reverse or @@ -122,6 +133,7 @@ proc update*( stateHeader: header, fetchAccounts: SnapRangeBatchRef()) env.fetchAccounts.init(header.stateRoot, ctx) + env.storageAccounts.init() var topEnv = env # Append per-state root environment to LRU queue @@ -139,7 +151,8 @@ proc update*( topEnv = pivotTable.lastValue.value else: - discard pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax) + discard pivotTable.lruAppend( + header.stateRoot, env, pivotTableLruEntriesMax) # Update healing threshold let @@ -149,8 +162,8 @@ proc update*( proc tickerStats*( - pivotTable: var SnapPivotTable; ## Pivot table - ctx: SnapCtxRef; ## Some global context + pivotTable: var SnapPivotTable; # Pivot table + ctx: SnapCtxRef; # Some global context ): TickerStatsUpdater = ## This function returns a function of type `TickerStatsUpdater` that prints ## out pivot table statitics. The returned fuction is supposed to drive @@ -181,7 +194,6 @@ proc tickerStats*( let sLen = kvp.data.nSlotLists.float sSum += sLen sSqSum += sLen * sLen - let env = ctx.data.pivotTable.lastValue.get(otherwise = nil) accCoverage = ctx.data.coveredAccounts.fullFactor @@ -210,16 +222,41 @@ proc tickerStats*( # Public functions: particular pivot # ------------------------------------------------------------------------------ +proc pivotMothball*(env: SnapPivotRef) = + ## Clean up most of this argument `env` pivot record and mark it `archived`. + ## Note that archived pivots will be checked for swapping in already known + ## accounts and storage slots. + env.fetchAccounts.checkNodes.setLen(0) + env.fetchAccounts.sickSubTries.setLen(0) + env.fetchAccounts.unprocessed.init() + + # Simplify storage slots queues by resolving partial slots into full list + for kvp in env.fetchStoragePart.nextPairs: + discard env.fetchStorageFull.append( + kvp.key, SnapSlotsQueueItemRef(acckey: kvp.data.accKey)) + env.fetchStoragePart.clear() + + # Provide index into `fetchStorageFull` + env.storageAccounts.clear() + for kvp in env.fetchStorageFull.nextPairs: + let rc = env.storageAccounts.insert(kvp.data.accKey.to(NodeTag)) + # Note that `rc.isErr` should not exist as accKey => storageRoot + if rc.isOk: + rc.value.data = kvp.key + + # Finally, mark that node `archived` + env.archived = true + + proc pivotAccountsComplete*( - env: SnapPivotRef; ## Current pivot environment + env: SnapPivotRef; # Current pivot environment ): bool = ## Returns `true` if accounts are fully available for this this pivot. env.fetchAccounts.processed.isFull - proc pivotAccountsHealingOk*( - env: SnapPivotRef; ## Current pivot environment - ctx: SnapCtxRef; ## Some global context + env: SnapPivotRef; # Current pivot environment + ctx: SnapCtxRef; # Some global context ): bool = ## Returns `true` if accounts healing is enabled for this pivot. ## @@ -238,47 +275,9 @@ proc pivotAccountsHealingOk*( return true -proc execSnapSyncAction*( - env: SnapPivotRef; ## Current pivot environment - buddy: SnapBuddyRef; ## Worker peer - ) {.async.} = - ## Execute a synchronisation run. - let - ctx = buddy.ctx - - block: - # Clean up storage slots queue first it it becomes too large - let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len - if snapStorageSlotsQuPrioThresh < nStoQu: - await buddy.rangeFetchStorageSlots(env) - if buddy.ctrl.stopped or env.obsolete: - return - - if not env.pivotAccountsComplete(): - await buddy.rangeFetchAccounts(env) - if buddy.ctrl.stopped or env.obsolete: - return - - await buddy.rangeFetchStorageSlots(env) - if buddy.ctrl.stopped or env.obsolete: - return - - if env.pivotAccountsHealingOk(ctx): - await buddy.healAccounts(env) - if buddy.ctrl.stopped or env.obsolete: - return - - # Some additional storage slots might have been popped up - await buddy.rangeFetchStorageSlots(env) - if buddy.ctrl.stopped or env.obsolete: - return - - await buddy.healStorageSlots(env) - - proc saveCheckpoint*( - env: SnapPivotRef; ## Current pivot environment - ctx: SnapCtxRef; ## Some global context + env: SnapPivotRef; # Current pivot environment + ctx: SnapCtxRef; # Some global context ): Result[int,HexaryError] = ## Save current sync admin data. On success, the size of the data record ## saved is returned (e.g. for logging.) @@ -304,9 +303,9 @@ proc saveCheckpoint*( proc recoverPivotFromCheckpoint*( - env: SnapPivotRef; ## Current pivot environment - ctx: SnapCtxRef; ## Global context (containing save state) - topLevel: bool; ## Full data set on top level only + env: SnapPivotRef; # Current pivot environment + ctx: SnapCtxRef; # Global context (containing save state) + topLevel: bool; # Full data set on top level only ) = ## Recover some pivot variables and global list `coveredAccounts` from ## checkpoint data. If the argument `toplevel` is set `true`, also the @@ -323,27 +322,34 @@ proc recoverPivotFromCheckpoint*( # Import processed interval for (minPt,maxPt) in recov.state.processed: if topLevel: - discard env.fetchAccounts.processed.merge(minPt, maxPt) env.fetchAccounts.unprocessed.reduce(minPt, maxPt) + discard env.fetchAccounts.processed.merge(minPt, maxPt) discard ctx.data.coveredAccounts.merge(minPt, maxPt) # Handle storage slots - if topLevel: - let stateRoot = recov.state.header.stateRoot - for w in recov.state.slotAccounts: - let pt = NodeTagRange.new(w.to(NodeTag),w.to(NodeTag)) + let stateRoot = recov.state.header.stateRoot + for w in recov.state.slotAccounts: + let pt = NodeTagRange.new(w.to(NodeTag),w.to(NodeTag)) - if 0 < env.fetchAccounts.processed.covered(pt): - # Ignoring slots that have accounts to be downloaded, anyway - let rc = ctx.data.snapDb.getAccountsData(stateRoot, w) - if rc.isErr: - # Oops, how did that account get lost? - discard env.fetchAccounts.processed.reduce pt - env.fetchAccounts.unprocessed.merge pt - elif rc.value.storageRoot != emptyRlpHash: - env.fetchStorageFull.merge AccountSlotsHeader( - accKey: w, - storageRoot: rc.value.storageRoot) + if 0 < env.fetchAccounts.processed.covered(pt): + # Ignoring slots that have accounts to be downloaded, anyway + let rc = ctx.data.snapDb.getAccountsData(stateRoot, w) + if rc.isErr: + # Oops, how did that account get lost? + discard env.fetchAccounts.processed.reduce pt + env.fetchAccounts.unprocessed.merge pt + elif rc.value.storageRoot != emptyRlpHash: + env.fetchStorageFull.merge AccountSlotsHeader( + accKey: w, + storageRoot: rc.value.storageRoot) + + # Handle mothballed pivots for swapping in (see `pivotMothball()`) + if not topLevel: + for kvp in env.fetchStorageFull.nextPairs: + let rc = env.storageAccounts.insert(kvp.data.accKey.to(NodeTag)) + if rc.isOk: + rc.value.data = kvp.key + env.archived = true # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/range_fetch_accounts.nim b/nimbus/sync/snap/worker/range_fetch_accounts.nim index 0d866a20a..c546c26d5 100644 --- a/nimbus/sync/snap/worker/range_fetch_accounts.nim +++ b/nimbus/sync/snap/worker/range_fetch_accounts.nim @@ -39,7 +39,8 @@ import ../../sync_desc, ".."/[constants, range_desc, worker_desc], ./com/[com_error, get_account_range], - ./db/[hexary_envelope, snapdb_accounts] + ./db/[hexary_envelope, snapdb_accounts], + "."/[pivot_helper, swap_in] {.push raises: [Defect].} @@ -127,7 +128,7 @@ proc accountsRangefetchImpl( buddy.data.errors.resetComError() let - gotAccounts = dd.data.accounts.len + gotAccounts = dd.data.accounts.len # comprises `gotStorage` gotStorage = dd.withStorage.len #when extraTraceMessages: @@ -179,10 +180,29 @@ proc accountsRangefetchImpl( # Register accounts with storage slots on the storage TODO list. env.fetchStorageFull.merge dd.withStorage + var nSwapInLaps = 0 + if env.archived: + # Current pivot just became outdated, rebuild storage slots index (if any) + if 0 < gotStorage: + trace logTxt "mothballing", peer, pivot, gotStorage + env.pivotMothball + + elif swapInAccountsCoverageTrigger <= ctx.data.coveredAccounts.fullFactor: + # Swap in from other pivots + when extraTraceMessages: + trace logTxt "before swap in", peer, pivot, gotAccounts, gotStorage, + coveredHere=covered.fullFactor.toPC(2), + processed=fa.processed.fullFactor.toPC(2), + nProcessedChunks=fa.processed.chunks.uint.toSI + + if swapInAccountsPivotsMin <= ctx.data.pivotTable.len: + nSwapInLaps = buddy.swapInAccounts(env) + when extraTraceMessages: - trace logTxt "request done", peer, pivot, - covered=covered.fullFactor.toPC(2), - processed=fa.processed.fullFactor.toPC(2) + trace logTxt "request done", peer, pivot, gotAccounts, gotStorage, + nSwapInLaps, coveredHere=covered.fullFactor.toPC(2), + processed=fa.processed.fullFactor.toPC(2), + nProcessedChunks=fa.processed.chunks.uint.toSI return true @@ -210,7 +230,7 @@ proc rangeFetchAccounts*( var nFetchAccounts = 0 # for logging while not fa.processed.isFull() and buddy.ctrl.running and - not env.obsolete: + not env.archived: nFetchAccounts.inc if not await buddy.accountsRangefetchImpl(env): break diff --git a/nimbus/sync/snap/worker/range_fetch_storage_slots.nim b/nimbus/sync/snap/worker/range_fetch_storage_slots.nim index 840c94133..15dcb24c5 100644 --- a/nimbus/sync/snap/worker/range_fetch_storage_slots.nim +++ b/nimbus/sync/snap/worker/range_fetch_storage_slots.nim @@ -333,7 +333,7 @@ proc rangeFetchStorageSlots*( var fullRangeItemsleft = 1 + (fullRangeLen-1) div snapStorageSlotsFetchMax while 0 < fullRangeItemsleft and buddy.ctrl.running and - not env.obsolete: + not env.archived: # Pull out the next request list from the queue let req = buddy.getNextSlotItemsFull(env) if req.len == 0: @@ -344,7 +344,7 @@ proc rangeFetchStorageSlots*( var partialRangeItemsLeft = env.fetchStoragePart.len while 0 < partialRangeItemsLeft and buddy.ctrl.running and - not env.obsolete: + not env.archived: # Pull out the next request list from the queue let req = buddy.getNextSlotItemPartial(env) if req.len == 0: diff --git a/nimbus/sync/snap/worker/sub_tries_helper.nim b/nimbus/sync/snap/worker/sub_tries_helper.nim index af133cc85..f604a1eb7 100644 --- a/nimbus/sync/snap/worker/sub_tries_helper.nim +++ b/nimbus/sync/snap/worker/sub_tries_helper.nim @@ -15,7 +15,7 @@ import eth/[common, p2p], stew/interval_set, ".."/[constants, range_desc, worker_desc], - ./db/[hexary_desc, hexary_error, hexary_envelope, hexary_inspect] + ./db/[hexary_desc, hexary_error, hexary_inspect] {.push raises: [Defect].} @@ -53,22 +53,6 @@ proc doInspect( ok(stats) - -proc getOverlapping( - batch: SnapRangeBatchRef; ## Healing data support - iv: NodeTagRange; ## Reference interval - ): Result[NodeTagRange,void] = - ## Find overlapping interval in `batch` - block: - let rc = batch.processed.ge iv.minPt - if rc.isOk and rc.value.minPt <= iv.maxPt: - return ok(rc.value) - block: - let rc = batch.processed.le iv.maxPt - if rc.isOk and iv.minPt <= rc.value.maxPt: - return ok(rc.value) - err() - # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -141,87 +125,6 @@ proc subTriesFromPartialPaths*( batch.lockTriePerusal = false return err(error) - -proc subTriesNodesReclassify*( - getFn: HexaryGetFn; ## Abstract database access - rootKey: NodeKey; ## Start node into hexary trie - batch: SnapRangeBatchRef; ## Healing data support - ) {.gcsafe, raises: [Defect,KeyError].} = - ## Check whether previously missing nodes from the `sickSubTries` list have - ## been magically added to the database since it was checked last time. These - ## nodes will me moved to `checkNodes` for further processing. Also, some - ## full sub-tries might have been added which can be checked against - ## the `processed` range set. - - # Move `sickSubTries` entries that have now an exisiting node to the - # list of partial paths to be re-checked. - block: - var delayed: seq[NodeSpecs] - for w in batch.sickSubTries: - if 0 < getFn(w.nodeKey.ByteArray32).len: - batch.checkNodes.add w - else: - delayed.add w - batch.sickSubTries = delayed - - # Remove `checkNodes` entries with complete known sub-tries. - var - doneWith: seq[NodeSpecs] # loop will not recurse on that list - count = 0 # for logging only - - # `While` loop will terminate with processed paths in `doneWith`. - block: - var delayed: seq[NodeSpecs] - while 0 < batch.checkNodes.len: - - when extraTraceMessages: - trace logTxt "reclassify", count, - nCheckNodes=batch.checkNodes.len - - for w in batch.checkNodes: - let - iv = w.hexaryEnvelope - nCov = batch.processed.covered iv - - if iv.len <= nCov: - # Fully processed envelope, no need to keep `w` any longer - when extraTraceMessages: - trace logTxt "reclassify discard", count, partialPath=w, - nDelayed=delayed.len - continue - - if 0 < nCov: - # Partially processed range, fetch an overlapping interval and - # remove that from the envelope of `w`. - try: - let paths = block: - let rc = w.partialPath.hexaryEnvelopeDecompose( - rootKey, batch.getOverlapping(iv).value, getFn) - if rc.isErr: - continue - rc.value - delayed &= paths - when extraTraceMessages: - trace logTxt "reclassify dismantled", count, partialPath=w, - nPaths=paths.len, nDelayed=delayed.len - continue - except RlpError: - discard - - # Not processed at all. So keep `w` but there is no need to look - # at it again in the next lap. - doneWith.add w - - # Prepare for next lap - batch.checkNodes.swap delayed - delayed.setLen(0) - - batch.checkNodes = doneWith.hexaryEnvelopeUniq - - when extraTraceMessages: - trace logTxt "reclassify finalise", count, - nDoneWith=doneWith.len, nCheckNodes=batch.checkNodes.len - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/swap_in.nim b/nimbus/sync/snap/worker/swap_in.nim new file mode 100644 index 000000000..a2cfc9b76 --- /dev/null +++ b/nimbus/sync/snap/worker/swap_in.nim @@ -0,0 +1,392 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +## Swap in already allocated sub-tries +## =================================== +## +## This module imports sub-tries from other pivots into the current. It does +## so by detecting the top of an existing sub-trie in the current pivot and +## searches other pivots for the part of the sub-trie that is already +## available there. So it can be marked accomplished on the current pivot. +## +## Note that the terminology hinges on *account pivots* but is implemented in +## a more general way where +## +## * the current pivot is of type `SnapRangeBatchRef` +## +## * other pivots are represented by an iterator of type `SwapInPivots` +## +## So the algorithm can be transferred to other that accounting pivot +## situations. +## +## Algorithm +## --------- +## +## * On the *current pivot*, use the `processed` ranges of accounts to find all +## the nodes the envelopes of which are disjunct to the `processed` ranges +## (see module `hexary_envelope` for supporting functions.) +## +## * Select all the non-dangling/existing nodes disjunct envelopes from the +## previous step. +## +## * For all the selected non-dangling nodes from the previous step, check +## which ones are present in other pivots. This means that for a given +## existing node in the current pivot its *partial path* can be applied +## to the *state root* key of another pivot ending up at the same node key. +## +## The portion of `processed` ranges on the other pivot that intersects with +## the envelope of the node has been downloaded already. It is equally +## applicable to the current pivot as it applies to the same sub-trie. So +## the intersection of `processed` with the node envelope can be copied to +## to the `processed` ranges of the current pivot. +## +## * Rinse and repeat. +## +import + std/[sequtils, strutils], + chronicles, + eth/[common, p2p], + stew/[byteutils, interval_set, keyed_queue, sorted_set], + ../../../utils/prettify, + ".."/[range_desc, worker_desc], + ./db/[hexary_desc, hexary_error, hexary_envelope, + hexary_paths, snapdb_accounts] + +{.push raises: [Defect].} + +logScope: + topics = "snap-swapin" + +type + SwapInPivot = object + ## Subset of `SnapPivotRef` with relevant parts, only + rootKey: NodeKey ## Storage slots & accounts + processed: NodeTagRangeSet ## Storage slots & accounts + pivot: SnapPivotRef ## Accounts only + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +when extraTraceMessages: + import std/math, ../../types + +# ------------------------------------------------------------------------------ +# Private logging helpers +# ------------------------------------------------------------------------------ + +template logTxt(info: static[string]): static[string] = + "Swap-in helper " & info + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc decompose( + node: NodeSpecs; # Contains hex encoded partial path + iv: NodeTagRange; # Proofed range of leaf paths + rootKey: NodeKey; # Start node into hexary trie + getFn: HexaryGetFn; # Abstract database access + ): Result[seq[NodeSpecs],void] = + ## Decompose, succeed only if there is a change + var error: HexaryError + + try: + let rc = node.partialPath.hexaryEnvelopeDecompose(rootKey, iv, getFn) + if rc.isErr: + error = rc.error + elif rc.value.len != 1 or rc.value[0].nodeKey != node.nodeKey: + return ok(rc.value) + else: + return err() + except RlpError: + error = RlpEncoding + + when extraTraceMessages: + trace logTxt "envelope decomposition failed", + node=node.partialPath.toHex, error + + err() + + +proc existsInTrie( + node: NodeSpecs; # Probe node to test to exist + rootKey: NodeKey; # Start node into hexary trie + getFn: HexaryGetFn; # Abstract database access + ): bool = + ## Check whether this node exists on the sub-trie starting at ` rootKey` + var error: HexaryError + + try: + let rc = node.partialPath.hexaryPathNodeKey(rootKey, getFn) + if rc.isOk: + return rc.value == node.nodeKey + except RlpError: + error = RlpEncoding + + when extraTraceMessages: + trace logTxt "check nodes failed", + partialPath=node.partialPath.toHex, error + + false + + +template noKeyErrorOops(info: static[string]; code: untyped) = + try: + code + except KeyError as e: + raiseAssert "Not possible (" & info & "): " & e.msg + +template noExceptionOops(info: static[string]; code: untyped) = + try: + code + except Defect as e: + raise e + except Exception as e: + raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc decomposeCheckNodes( + pivot: SnapRangeBatchRef; # Healing data support + rootKey: NodeKey; # Start node into hexary trie + getFn: HexaryGetFn; # Abstract database access + ): Result[seq[NodeSpecs],void] = + ## Decompose the `checkNodes` list of the argument `pivot` relative to the + ## set `processed` of processed leaf node ranges. + ## + ## The function fails if there wan no change to the `checkNodes` list. + var + delayed: seq[NodeSpecs] + didSomething = 0 + + # Remove `checkNodes` entries with known complete sub-tries. + for node in pivot.checkNodes: + var paths: seq[NodeSpecs] + + # For a Partially processed range, fetch overlapping intervals and + # sort of remove them from the envelope of `w`. + for touched in pivot.processed.hexaryEnvelopeTouchedBy(node).increasing: + let rc = node.decompose(touched, rootKey, getFn) + if rc.isOk: + paths &= rc.value + didSomething.inc + when extraTraceMessages: + trace logTxt "checkNodes decompose", nDelayed=delayed.len, + node=node.partialPath.toHex, nPaths=paths.len, + newPaths=rc.value.mapIt(it.partialPath.toHex).join(",") + # End inner for() + + delayed &= paths + # End outer for() + + if 0 < didSomething: + noKeyErrorOops("subTriesCheckNodesDecompose"): + # Remove duplicates in resulting path list + return ok(delayed.hexaryEnvelopeUniq) + + err() + + +proc otherProcessedRanges( + node: NodeSpecs; # Top node of portential sub-trie + otherPivots: seq[SwapInPivot]; # Other pivots list + rootKey: NodeKey; # Start node into hexary trie + getFn: HexaryGetFn; # Abstract database access + ): seq[NodeTagRangeSet] = + ## Collect already processed ranges from other pivots intersecting with the + ## envelope of the argument `node`. The list of other pivots is represented + ## by the argument iterator `otherPivots`. + let envelope = node.hexaryEnvelope + var count = 0 # logging & debugging + + noExceptionOops("otherProcessedRanges"): + # For the current `node` select all hexary sub-tries that contain the same + # node `node.nodeKey` for the partial path `node.partianPath`. + for rp in otherPivots.items: + # Check whether the node is shared + let haveNode = node.existsInTrie(rp.rootKey, getFn) + + var subCount = 0 # logging & debugging + count.inc # logging & debugging + + result.add NodeTagRangeSet.init() + + if not haveNode: + trace logTxt "history loop", count, node=node.partialPath.toHex, + processed=rp.processed.fullFactor.toPC(3), haveNode + + if haveNode: + when extraTraceMessages: + trace logTxt "history loop => sub start", count, + nTouched=rp.processed.hexaryEnvelopeTouchedBy(node).chunks, haveNode + + # Import already processed part of the envelope of `node` into the + # `batch.processed` set of ranges. + for iv in rp.processed.hexaryEnvelopeTouchedBy(node).increasing: + let segment = (envelope * iv).value + discard result[^1].merge segment + + subCount.inc # dlogging & ebugging + when extraTraceMessages: + trace logTxt "history loop => sub", count, subCount, + touchedLen=segment.fullFactor.toPC(3) + +# ------------------------------------------------------------------------------ +# Private functions, swap-in functionality +# ------------------------------------------------------------------------------ + +proc swapIn*( + pivot: SnapRangeBatchRef; # Healing state for target hexary trie + otherPivots: seq[SwapInPivot]; # Other pivots list + rootKey: NodeKey; # Start node into target hexary trie + getFn: HexaryGetFn; # Abstract database access + loopMax = 20; # Prevent from looping too often + ): (int,seq[NodeTagRangeSet]) = + ## Collect processed already ranges from argument `otherPivots` and register + ## it onto the argument `pivot`. This function recognises and imports + ## directly accessible sub-tries where the top-level node exists. + var + lapCount = 0 + notDoneYet = true + swappedIn = newSeq[NodeTagRangeSet](otherPivots.len) + + # Initialise return value + for n in 0 ..< swappedIn.len: + swappedIn[n] = NodeTagRangeSet.init() + + while notDoneYet and lapCount < loopMax: + var + merged = 0.u256 + nCheckNodesBefore = 0 # debugging + + # Decompose `checkNodes` into sub-tries disjunct from `processed` + let toBeReclassified = block: + let rc = pivot.decomposeCheckNodes(rootKey, getFn) + if rc.isErr: + return (lapCount,swappedIn) # nothing to do + rc.value + + lapCount.inc + notDoneYet = false + + # Reclassify nodes into existing/allocated and dangling ones + noKeyErrorOops("swapIn"): + var + checkNodes: seq[NodeSpecs] + sickNodes: seq[NodeSpecs] + for node in toBeReclassified: + # Check whether previously missing nodes from the `sickSubTries` list + # have been magically added to the database since it was checked last + # time. These nodes will me moved to `checkNodes` for further + # processing. + if node.nodeKey.ByteArray32.getFn().len == 0: + sickNodes.add node # probably subject to healing + else: + let iv = node.hexaryEnvelope + if pivot.processed.covered(iv) < iv.len: + checkNodes.add node # may be swapped in + pivot.checkNodes = checkNodes.hexaryEnvelopeUniq + pivot.sickSubTries = sickNodes.hexaryEnvelopeUniq + + nCheckNodesBefore = pivot.checkNodes.len # logging & debugging + + # Swap in node ranges from other pivots + for node in pivot.checkNodes: + for n,rangeSet in node.otherProcessedRanges(otherPivots,rootKey,getFn): + for iv in rangeSet.increasing: + discard swappedIn[n].merge iv # imported range / other pivot + merged += pivot.processed.merge iv # import this range + pivot.unprocessed.reduce iv # no need to fetch it again + notDoneYet = 0 < merged # loop control + + # Remove fully covered nodes + block: + var checkNodes: seq[NodeSpecs] + for node in toBeReclassified: + let iv = node.hexaryEnvelope + if pivot.processed.covered(iv) < iv.len: + checkNodes.add node # may be swapped in + pivot.checkNodes = checkNodes.hexaryEnvelopeUniq + + when extraTraceMessages: + let mergedFactor = merged.to(float) / (2.0^256) + trace logTxt "inherited ranges", nCheckNodesBefore, + nCheckNodes=pivot.checkNodes.len, merged=mergedFactor.toPC(3) + + # End while() + + (lapCount,swappedIn) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc swapInAccounts*( + buddy: SnapBuddyRef; # Worker peer + env: SnapPivotRef; # Current pivot environment + loopMax = 20; # Prevent from looping too often + ): int = + ## Variant of `swapIn()` for the particular case of accounts database pivots. + let + ctx = buddy.ctx + rootKey = env.stateHeader.stateRoot.to(NodeKey) + getFn = ctx.data.snapDb.getAccountFn + + others = toSeq(ctx.data.pivotTable.nextPairs) + + # Swap in from mothballed pifots different from the current one + .filterIt(it.data.archived and it.key.to(NodeKey) != rootKey) + + # Extract relevant parts + .mapIt(SwapInPivot( + rootKey: it.key.to(NodeKey), + processed: it.data.fetchAccounts.processed, + pivot: it.data)) + var + nLaps: int + swappedIn: seq[NodeTagRangeSet] + + noExceptionOops("swapInAccounts"): + (nLaps,swappedIn) = env.fetchAccounts.swapIn(others,rootKey,getFn,loopMax) + + noKeyErrorOops("swapInAccounts"): + # Update storage slots + doAssert swappedIn.len == others.len + for n in 0 ..< others.len: + + when extraTraceMessages: + trace logTxt "post-processing storage slots", inx=n, maxInx=others.len, + changes=swappedIn[n].fullFactor.toPC(3), chunks=swappedIn[n].chunks + + # Revisit all imported account key ranges + for iv in swappedIn[n].increasing: + + # The `storageAccounts` list contains indices for storage slots, mapping + # account keys => storage root + var rc = others[n].pivot.storageAccounts.ge(iv.minPt) + while rc.isOk and rc.value.key <= iv.maxPt: + + # Fetch storage slots specs from `fetchStorageFull` list + let stRoot = rc.value.data + if others[n].pivot.fetchStorageFull.hasKey(stRoot): + let accKey = others[n].pivot.fetchStorageFull[stRoot].accKey + discard env.fetchStorageFull.append( + stRoot, SnapSlotsQueueItemRef(acckey: accKey)) + + rc = others[n].pivot.storageAccounts.gt(rc.value.key) + + nLaps + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index ead55f38e..08eda903c 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -11,7 +11,7 @@ import std/hashes, eth/[common, p2p], - stew/[interval_set, keyed_queue], + stew/[interval_set, keyed_queue, sorted_set], ../../db/select_backend, ../sync_desc, ./worker/com/com_error, @@ -22,6 +22,9 @@ import {.push raises: [Defect].} type + SnapAccountsList* = SortedSet[NodeTag,Hash256] + ## Sorted pair of `(account,state-root)` entries + SnapSlotsQueue* = KeyedQueue[Hash256,SnapSlotsQueueItemRef] ## Handles list of storage slots data for fetch indexed by storage root. ## @@ -72,7 +75,10 @@ type # Info nAccounts*: uint64 ## Imported # of accounts nSlotLists*: uint64 ## Imported # of account storage tries - obsolete*: bool ## Not latest pivot, anymore + + # Mothballing, ready to be swapped into newer pivot record + storageAccounts*: SnapAccountsList ## Accounts with missing stortage slots + archived*: bool ## Not latest pivot, anymore SnapPivotTable* = KeyedQueue[Hash256,SnapPivotRef] ## LRU table, indexed by state root