diff --git a/nimbus/sync/snap/constants.nim b/nimbus/sync/snap/constants.nim index f3f429788..b923bb24a 100644 --- a/nimbus/sync/snap/constants.nim +++ b/nimbus/sync/snap/constants.nim @@ -41,26 +41,30 @@ const # -------------- - snapRequestBytesLimit* = 2 * 1024 * 1024 - ## Soft bytes limit to request in `snap` protocol calls. + fetchRequestBytesLimit* = 2 * 1024 * 1024 + ## Soft bytes limit to request in `snap/1` protocol calls. - snapRequestTrieNodesFetchMax* = 1024 - ## Informal maximal number of trie nodes to fetch at once in `snap` + fetchRequestTrieNodesMax* = 1024 + ## Informal maximal number of trie nodes to fetch at once in `snap/1` ## protocol calls. This is not an official limit but found with several ## implementations (e.g. Geth.) ## - ## Resticting the fetch list length early allows to better paralellise + ## Resticting the fetch list length early allows to better parallelise ## healing. + fetchRequestStorageSlotsMax* = 2 * 1024 + ## Maximal number of storage tries to fetch with a single request message. - snapAccountsSaveProcessedChunksMax* = 1000 + # -------------- + + accountsSaveProcessedChunksMax* = 1000 ## Recovery data are stored if the processed ranges list contains no more ## than this many range *chunks*. ## ## If the range set is too much fragmented, no data will be saved and ## restart has to perform from scratch or an earlier checkpoint. - snapAccountsSaveStorageSlotsMax* = 20_000 + accountsSaveStorageSlotsMax* = 20_000 ## Recovery data are stored if the oustanding storage slots to process do ## not amount to more than this many entries. ## @@ -68,10 +72,12 @@ const ## has to perform from scratch or an earlier checkpoint. - snapStorageSlotsFetchMax* = 2 * 1024 - ## Maximal number of storage tries to fetch with a single message. + storageSlotsTrieInheritPerusalMax* = 30_000 + ## Maximal number of nodes to visit in order to find out whether this + ## storage slots trie is complete. This allows to *inherit* the full trie + ## for an existing root node if the trie is small enough. - snapStorageSlotsQuPrioThresh* = 5_000 + storageSlotsQuPrioThresh* = 5_000 ## For a new worker, prioritise processing the storage slots queue over ## processing accounts if the queue has more than this many items. ## @@ -81,68 +87,33 @@ const # -------------- - swapInAccountsCoverageTrigger* = 0.30 - ## Similar to `healAccountsCoverageTrigger` below only for trying to - ## swap in from earlier pivots. - - swapInAccountsPivotsMin* = 2 - ## Require at least this man pivots available before any swap in can - ## take place (must be at least 2.) This value is typically combined - ## with `swapInAccountsCoverageTrigger`. - - # -------------- - - healInspectionBatch* = 10_000 - ## Number of nodes to inspect in a single batch. In between batches, a - ## task/thread switch is allowed. - - healInspectionBatchWaitNanoSecs* = 500 - ## Wait some time asynchroneously after processing `healInspectionBatch` - ## nodes to allow for a pseudo -task switch. - - - healAccountsCoverageTrigger* = 1.3 + healAccountsCoverageTrigger* = 1.01 ## Apply accounts healing if the global snap download coverage factor ## exceeds this setting. The global coverage factor is derived by merging ## all account ranges retrieved for all pivot state roots (see - ## `coveredAccounts` in `CtxData`.) - ## - ## A small value of this constant leads to early healing. This produces - ## stray leaf account records so fragmenting larger intervals of missing - ## account ranges. 
This in turn leads to smaller but more range requests
- ## over the network. More requests might be a disadvantage if peers only
- ## serve a maximum number requests (rather than data.)
+ ## `coveredAccounts` in the object `CtxData`.) Note that a coverage factor
+ ## greater than 100% is not exact but rather a lower bound estimate.
- healAccountsPivotTriggerMinFactor* = 0.17
- ## Additional condition to meet before starting healing. The current
- ## pivot must have at least this much processed as recorded in the
- ## `processed` ranges set. This is the minimim value (see below.)
-
- healAccountsPivotTriggerWeight* = 0.01
- healAccountsPivotTriggerNMax* = 10
- ## Enable healing not before the `processed` ranges set fill factor has
- ## at least the following value.
- ## ::
- ## MinFactor + max(0, NMax - pivotTable.len) * Weight
+ healAccountsInspectionPlanBLevel* = 4
+ ## Search this level deep for missing nodes if `hexaryEnvelopeDecompose()`
+ ## only produces existing nodes.
##
- ## (the `healAccountsPivotTrigger` prefix of the constant names is ommited.)
- ##
- ## This effects in favouring late healing when more pivots have been
- ## downloaded.
+ ## The maximal number of nodes visited at level 3 is roughly *4k* and at
+ ## level 4 it is roughly *64k*.
- healAccountsBatchFetchMax* = 10 * 1024
+ healAccountsBatchMax* = 10 * 1024
## Keep on gloing in healing task up until this many nodes have been
## fetched from the network or some error contition terminates the task.
##
- ## This constant should be larger than `snapStorageSlotsFetchMax`
+ ## This constant should be larger than `fetchRequestStorageSlotsMax`
healSlorageSlotsTrigger* = 0.70
## Consider per account storage slost healing if a per-account hexary
## sub-trie has reached this factor of completeness.
- healStorageSlotsInspectionBatch* = 10_000
- ## Similar to `healAccountsInspectionBatch` but for storage slots.
+ healStorageSlotsInspectionPlanBLevel* = 4
+ ## Similar to `healAccountsInspectionPlanBLevel` but for storage slots.
healStorageSlotsBatchMax* = 32
## Maximal number of storage tries to to heal in a single batch run. Only
@@ -177,9 +148,19 @@ const
## Set 0 to disable.
static:
- doAssert 1 < swapInAccountsPivotsMin
- doAssert snapStorageSlotsQuPrioThresh < snapAccountsSaveStorageSlotsMax
- doAssert snapStorageSlotsFetchMax < healAccountsBatchFetchMax
+ doAssert storageSlotsQuPrioThresh < accountsSaveStorageSlotsMax
+ doAssert fetchRequestTrieNodesMax < healAccountsBatchMax
+
+
+# Deprecated, to be expired
+const
+ healInspectionBatch* = 10_000
+ ## Number of nodes to inspect in a single batch. In between batches, a
+ ## task/thread switch is allowed.
+
+ healInspectionBatchWaitNanoSecs* = 500
+ ## Wait some time asynchronously after processing `healInspectionBatch`
+ ## nodes to allow for a pseudo-task switch.
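For orientation, the *4k* and *64k* figures quoted for the plan-B inspection levels follow from the fan-out of a hexary trie, where every branch node has 16 child slots. A small illustrative calculation of that worst-case bound, assuming a fully populated trie (sketch only, not part of this patch)::

  # Illustrative arithmetic only: worst case node count for a width-first
  # inspection that stops at `level` in a fully populated hexary trie.
  func maxVisitedNodes(level: Natural): int =
    result = 1                  # the root node
    var width = 1
    for _ in 1 .. level:
      width *= 16               # each branch node fans out into 16 slots
      result += width

  when isMainModule:
    doAssert maxVisitedNodes(3) == 4369      # in the order of 4k nodes
    doAssert maxVisitedNodes(4) == 69905     # in the order of 64k nodes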
# ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index fc8f2aa3f..f3bb5da05 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -289,43 +289,35 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = nSlotLists = env.nSlotLists processed = env.fetchAccounts.processed.fullFactor.toPC(2) nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len - accHealThresh = env.healThresh.toPC(2) trace "Multi sync runner", peer, pivot, nAccounts, nSlotLists, processed, - nStoQu, accHealThresh + nStoQu # This one is the syncing work horse which downloads the database await env.execSnapSyncAction(buddy) - if env.archived: - let - peer = buddy.peer - nAccounts = env.nAccounts - nSlotLists = env.nSlotLists - when extraTraceMessages: - trace "Mothballing", peer, pivot=("#" & $env.stateHeader.blockNumber), - nAccounts=env.nAccounts, nSlotLists=env.nSlotLists - env.pivotMothball() - return # pivot has changed + # Various logging entries (after accounts and storage slots download) + let + nAccounts = env.nAccounts + nSlotLists = env.nSlotLists + processed = env.fetchAccounts.processed.fullFactor.toPC(2) + nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len - block: + if env.archived: + # Archive pivot if it became stale + when extraTraceMessages: + trace "Mothballing", peer, pivot, nAccounts, nSlotLists + env.pivotMothball() + + else: # Save state so sync can be partially resumed at next start up - let - nAccounts = env.nAccounts - nSlotLists = env.nSlotLists - processed = env.fetchAccounts.processed.fullFactor.toPC(2) - nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len - accHealThresh = env.healThresh.toPC(2) - rc = env.saveCheckpoint(ctx) + let rc = env.saveCheckpoint(ctx) if rc.isErr: error "Failed to save recovery checkpoint", peer, pivot, nAccounts, nSlotLists, processed, nStoQu, error=rc.error else: when extraTraceMessages: trace "Saved recovery checkpoint", peer, pivot, nAccounts, nSlotLists, - processed, nStoQu, blobSize=rc.value, accHealThresh - - if buddy.ctrl.stopped: - return # peer worker has gone + processed, nStoQu, blobSize=rc.value # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/com/get_account_range.nim b/nimbus/sync/snap/worker/com/get_account_range.nim index 581a7cf66..ed7a0e3ff 100644 --- a/nimbus/sync/snap/worker/com/get_account_range.nim +++ b/nimbus/sync/snap/worker/com/get_account_range.nim @@ -45,7 +45,7 @@ proc getAccountRangeReq( peer = buddy.peer try: let reply = await peer.getAccountRange( - root, iv.minPt.to(Hash256), iv.maxPt.to(Hash256), snapRequestBytesLimit) + root, iv.minPt.to(Hash256), iv.maxPt.to(Hash256), fetchRequestBytesLimit) return ok(reply) except CatchableError as e: trace trSnapRecvError & "waiting for GetAccountRange reply", peer, pivot, @@ -66,8 +66,7 @@ proc getAccountRange*( let peer = buddy.peer if trSnapTracePacketsOk: - trace trSnapSendSending & "GetAccountRange", peer, pivot, - accRange=iv, bytesLimit=snapRequestBytesLimit + trace trSnapSendSending & "GetAccountRange", peer, pivot, accRange=iv var dd = block: let rc = await buddy.getAccountRangeReq(stateRoot, iv, pivot) diff --git a/nimbus/sync/snap/worker/com/get_storage_ranges.nim b/nimbus/sync/snap/worker/com/get_storage_ranges.nim index 1302a9d31..11f07a545 100644 --- a/nimbus/sync/snap/worker/com/get_storage_ranges.nim +++ b/nimbus/sync/snap/worker/com/get_storage_ranges.nim @@ 
-61,13 +61,13 @@ proc getStorageRangesReq( root, accounts, # here the interval bounds are an `array[32,byte]` iv.get.minPt.to(Hash256).data, iv.get.maxPt.to(Hash256).data, - snapRequestBytesLimit) + fetchRequestBytesLimit) else: reply = await peer.getStorageRanges( root, accounts, # here the interval bounds are of empty `Blob` type emptyBlob, emptyBlob, - snapRequestBytesLimit) + fetchRequestBytesLimit) return ok(reply) except CatchableError as e: @@ -101,8 +101,7 @@ proc getStorageRanges*( return err(ComEmptyAccountsArguments) if trSnapTracePacketsOk: - trace trSnapSendSending & "GetStorageRanges", peer, pivot, - nAccounts, bytesLimit=snapRequestBytesLimit + trace trSnapSendSending & "GetStorageRanges", peer, pivot, nAccounts let iv = accounts[0].subRange diff --git a/nimbus/sync/snap/worker/com/get_trie_nodes.nim b/nimbus/sync/snap/worker/com/get_trie_nodes.nim index 3cf886034..df0a1eb39 100644 --- a/nimbus/sync/snap/worker/com/get_trie_nodes.nim +++ b/nimbus/sync/snap/worker/com/get_trie_nodes.nim @@ -43,7 +43,8 @@ proc getTrieNodesReq( let peer = buddy.peer try: - let reply = await peer.getTrieNodes(stateRoot, paths, snapRequestBytesLimit) + let reply = await peer.getTrieNodes( + stateRoot, paths, fetchRequestBytesLimit) return ok(reply) except CatchableError as e: @@ -74,8 +75,7 @@ proc getTrieNodes*( let nTotal = paths.mapIt(it.len).foldl(a+b, 0) if trSnapTracePacketsOk: - trace trSnapSendSending & "GetTrieNodes", peer, pivot, - nPaths, nTotal, bytesLimit=snapRequestBytesLimit + trace trSnapSendSending & "GetTrieNodes", peer, pivot, nPaths, nTotal let trieNodes = block: let rc = await buddy.getTrieNodesReq(stateRoot, paths, pivot) diff --git a/nimbus/sync/snap/worker/db/hexary_desc.nim b/nimbus/sync/snap/worker/db/hexary_desc.nim index 1660b2011..08d4f5f34 100644 --- a/nimbus/sync/snap/worker/db/hexary_desc.nim +++ b/nimbus/sync/snap/worker/db/hexary_desc.nim @@ -134,21 +134,6 @@ type nodeKey*: RepairKey ## Leaf hash into hexary repair table payload*: Blob ## Data payload - TrieNodeStatCtxRef* = ref object - ## Context to resume searching for dangling links - case persistent*: bool - of true: - hddCtx*: seq[(NodeKey,NibblesSeq)] - else: - memCtx*: seq[(RepairKey,NibblesSeq)] - - TrieNodeStat* = object - ## Trie inspection report - dangling*: seq[NodeSpecs] ## Referes to nodes with incomplete refs - level*: int ## Maximum nesting depth of dangling nodes - stopped*: bool ## Potential loop detected if `true` - resumeCtx*: TrieNodeStatCtxRef ## Context for resuming inspection - HexaryTreeDbRef* = ref object ## Hexary trie plus helper structures tab*: Table[RepairKey,RNodeRef] ## key-value trie table, in-memory db @@ -294,14 +279,6 @@ proc ppImpl(db: HexaryTreeDbRef; root: NodeKey): seq[string] = except Exception as e: result &= " ! Ooops ppImpl(): name=" & $e.name & " msg=" & e.msg -proc ppDangling(a: seq[NodeSpecs]; maxItems = 30): string = - proc ppBlob(w: Blob): string = - w.mapIt(it.toHex(2)).join.toLowerAscii - let - q = a.mapIt(it.partialPath.ppBlob)[0 ..< min(maxItems,a.len)] - andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." 
else: "" - "{" & q.join(",") & andMore & "}" - # ------------------------------------------------------------------------------ # Public debugging helpers # ------------------------------------------------------------------------------ @@ -347,13 +324,6 @@ proc pp*(db: HexaryTreeDbRef; indent=4): string = ## varinat of `pp()` above db.ppImpl(NodeKey.default).join(indent.toPfx) -proc pp*(a: TrieNodeStat; db: HexaryTreeDbRef; maxItems = 30): string = - result = "(" & $a.level - if a.stopped: - result &= "stopped," - result &= $a.dangling.len & "," & - a.dangling.ppDangling(maxItems) & ")" - # ------------------------------------------------------------------------------ # Public constructor (or similar) # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/hexary_inspect.nim b/nimbus/sync/snap/worker/db/hexary_inspect.nim index c6cd97492..2e9f6b6fc 100644 --- a/nimbus/sync/snap/worker/db/hexary_inspect.nim +++ b/nimbus/sync/snap/worker/db/hexary_inspect.nim @@ -9,7 +9,7 @@ # except according to those terms. import - std/tables, + std/[sequtils, strutils, tables], chronicles, eth/[common, trie/nibbles], stew/results, @@ -21,12 +21,41 @@ import logScope: topics = "snap-db" +type + TrieNodeStatCtxRef* = ref object + ## Context to resume searching for dangling links + case persistent*: bool + of true: + hddCtx*: seq[(NodeKey,NibblesSeq)] + else: + memCtx*: seq[(RepairKey,NibblesSeq)] + + TrieNodeStat* = object + ## Trie inspection report + dangling*: seq[NodeSpecs] ## Referes to nodes with incomplete refs + count*: uint64 ## Number of nodes visited + level*: uint8 ## Maximum nesting depth of dangling nodes + stopped*: bool ## Potential loop detected if `true` + resumeCtx*: TrieNodeStatCtxRef ## Context for resuming inspection + const extraTraceMessages = false # or true when extraTraceMessages: import stew/byteutils +# ------------------------------------------------------------------------------ +# Private helpers, debugging +# ------------------------------------------------------------------------------ + +proc ppDangling(a: seq[NodeSpecs]; maxItems = 30): string = + proc ppBlob(w: Blob): string = + w.mapIt(it.toHex(2)).join.toLowerAscii + let + q = a.mapIt(it.partialPath.ppBlob)[0 ..< min(maxItems,a.len)] + andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." 
else: "" + "{" & q.join(",") & andMore & "}" + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -114,34 +143,45 @@ proc to*(resumeCtx: TrieNodeStatCtxRef; T: type seq[NodeSpecs]): T = proc hexaryInspectTrie*( - db: HexaryTreeDbRef; ## Database - root: NodeKey; ## State root - paths: seq[Blob] = @[]; ## Starting paths for search - resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection - suspendAfter = high(uint64); ## To be resumed - stopAtLevel = 64; ## Instead of loop detector + db: HexaryTreeDbRef; # Database + root: NodeKey; # State root + partialPaths: seq[Blob] = @[]; # Starting paths for search + resumeCtx: TrieNodeStatCtxRef = nil; # Context for resuming inspection + suspendAfter = high(uint64); # To be resumed + stopAtLevel = 64u8; # Width-first depth level + maxDangling = high(int); # Maximal number of dangling results ): TrieNodeStat {.gcsafe, raises: [Defect,KeyError]} = ## Starting with the argument list `paths`, find all the non-leaf nodes in ## the hexary trie which have at least one node key reference missing in ## the trie database. The references for these nodes are collected and ## returned. - ## * Search list `paths` argument entries that do not refer to a hexary node - ## are ignored. - ## * For any search list `paths` argument entry, this function stops if - ## the search depth exceeds `stopAtLevel` levels of linked sub-nodes. - ## * Argument `paths` list entries and partial paths on the way that do not - ## refer to a valid extension or branch type node are silently ignored. ## - ## Trie inspection can be automatically suspended after having visited - ## `suspendAfter` nodes to be resumed at the last state. An application of - ## this feature would look like - ## :: - ## var ctx = TrieNodeStatCtxRef() - ## while not ctx.isNil: - ## let state = hexaryInspectTrie(db, root, paths, resumeCtx=ctx, 1024) - ## ... - ## ctx = state.resumeCtx + ## * Argument `partialPaths` list entries that do not refer to an existing + ## and allocated hexary trie node are silently ignored. So are enytries + ## that not refer to either a valid extension or a branch type node. + ## + ## * This function traverses the hexary trie in *width-first* mode + ## simultaneously for any entry of the argument `partialPaths` list. Abart + ## from completing the search there are three conditions when the search + ## pauses to return the current state (via `resumeCtx`, see next bullet + ## point): + ## + The depth level of the running algorithm exceeds `stopAtLevel`. + ## + The number of visited nodes exceeds `suspendAfter`. + ## + Te number of cunnently collected dangling nodes exceeds `maxDangling`. + ## If the function pauses because the current depth exceeds `stopAtLevel` + ## then the `stopped` flag of the result object will be set, as well. + ## + ## * When paused for some of the reasons listed above, the `resumeCtx` field + ## of the result object contains the current state so that the function + ## can resume searching from where is paused. An application using this + ## feature could look like: + ## :: + ## var ctx = TrieNodeStatCtxRef() + ## while not ctx.isNil: + ## let state = hexaryInspectTrie(db, root, paths, resumeCtx=ctx, 1024) + ## ... 
+ ## ctx = state.resumeCtx ## let rootKey = root.to(RepairKey) if not db.tab.hasKey(rootKey): @@ -150,7 +190,6 @@ proc hexaryInspectTrie*( var reVisit: seq[(RepairKey,NibblesSeq)] again: seq[(RepairKey,NibblesSeq)] - numActions = 0u64 resumeOk = false # Initialise lists from previous session @@ -160,30 +199,32 @@ proc hexaryInspectTrie*( resumeOk = true reVisit = resumeCtx.memCtx - if paths.len == 0 and not resumeOk: + if partialPaths.len == 0 and not resumeOk: reVisit.add (rootKey,EmptyNibbleRange) else: # Add argument paths - for w in paths: + for w in partialPaths: let (isLeaf,nibbles) = hexPrefixDecode w if not isLeaf: let rc = nibbles.hexaryPathNodeKey(rootKey, db, missingOk=false) if rc.isOk: reVisit.add (rc.value.to(RepairKey), nibbles) - while 0 < reVisit.len and numActions <= suspendAfter: + # Stopping on `suspendAfter` has precedence over `stopAtLevel` + while 0 < reVisit.len and result.count <= suspendAfter: if stopAtLevel < result.level: result.stopped = true break for n in 0 ..< reVisit.len: - let (rKey,parentTrail) = reVisit[n] - if suspendAfter < numActions: + if suspendAfter < result.count or + maxDangling <= result.dangling.len: # Swallow rest - again = again & reVisit[n ..< reVisit.len] + again &= reVisit[n ..< reVisit.len] break let + (rKey, parentTrail) = reVisit[n] node = db.tab[rKey] parent = rKey.convertTo(NodeKey) @@ -203,7 +244,7 @@ proc hexaryInspectTrie*( # Ooops, forget node and key discard - numActions.inc + result.count.inc # End `for` result.level.inc @@ -218,12 +259,13 @@ proc hexaryInspectTrie*( proc hexaryInspectTrie*( - getFn: HexaryGetFn; ## Database abstraction - rootKey: NodeKey; ## State root - paths: seq[Blob] = @[]; ## Starting paths for search - resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection - suspendAfter = high(uint64); ## To be resumed - stopAtLevel = 64; ## Instead of loop detector + getFn: HexaryGetFn; # Database abstraction + rootKey: NodeKey; # State root + partialPaths: seq[Blob] = @[]; # Starting paths for search + resumeCtx: TrieNodeStatCtxRef = nil; # Context for resuming inspection + suspendAfter = high(uint64); # To be resumed + stopAtLevel = 64u8; # Width-first depth level + maxDangling = high(int); # Maximal number of dangling results ): TrieNodeStat {.gcsafe, raises: [Defect,RlpError]} = ## Variant of `hexaryInspectTrie()` for persistent database. 
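A sketch of the suspend/resume calling pattern for the in-memory variant above; the wrapper proc, its name, the import paths and the batch limits are illustrative assumptions, not part of this patch::

  # Illustrative wrapper (not from the patch); module paths are indicative only.
  import ./hexary_desc, ./hexary_inspect

  proc collectDangling(db: HexaryTreeDbRef; rootKey: NodeKey): seq[NodeSpecs] =
    ## Inspect the trie in slices of at most 1024 visited nodes per call.
    var ctx: TrieNodeStatCtxRef            # nil => start from scratch
    while true:
      let stat = db.hexaryInspectTrie(rootKey,
        resumeCtx = ctx,                   # continue where the last call paused
        suspendAfter = 1024,               # pause after visiting 1024 nodes
        maxDangling = 256)                 # or after collecting 256 dangling refs
      result.add stat.dangling
      ctx = stat.resumeCtx                 # nil once the trie is fully inspected
      if ctx.isNil or stat.stopped:
        break

The persistent variant below is driven the same way, only that it takes a `HexaryGetFn` database abstraction and raises `RlpError` instead of `KeyError`.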
@@ -240,7 +282,6 @@ proc hexaryInspectTrie*( var reVisit: seq[(NodeKey,NibblesSeq)] again: seq[(NodeKey,NibblesSeq)] - numActions = 0u64 resumeOk = false # Initialise lists from previous session @@ -250,18 +291,19 @@ proc hexaryInspectTrie*( resumeOk = true reVisit = resumeCtx.hddCtx - if paths.len == 0 and not resumeOk: + if partialPaths.len == 0 and not resumeOk: reVisit.add (rootKey,EmptyNibbleRange) else: # Add argument paths - for w in paths: + for w in partialPaths: let (isLeaf,nibbles) = hexPrefixDecode w if not isLeaf: let rc = nibbles.hexaryPathNodeKey(rootKey, getFn, missingOk=false) if rc.isOk: reVisit.add (rc.value, nibbles) - while 0 < reVisit.len and numActions <= suspendAfter: + # Stopping on `suspendAfter` has precedence over `stopAtLevel` + while 0 < reVisit.len and result.count <= suspendAfter: when extraTraceMessages: trace "Hexary inspect processing", nPaths, maxLeafPaths, level=result.level, nReVisit=reVisit.len, nDangling=result.dangling.len @@ -271,13 +313,15 @@ proc hexaryInspectTrie*( break for n in 0 ..< reVisit.len: - let (parent,parentTrail) = reVisit[n] - if suspendAfter < numActions: + if suspendAfter < result.count or + maxDangling <= result.dangling.len: # Swallow rest again = again & reVisit[n ..< reVisit.len] break - let parentBlob = parent.to(Blob).getFn() + let + (parent, parentTrail) = reVisit[n] + parentBlob = parent.to(Blob).getFn() if parentBlob.len == 0: # Ooops, forget node and key continue @@ -301,7 +345,7 @@ proc hexaryInspectTrie*( # Ooops, forget node and key discard - numActions.inc + result.count.inc # End `for` result.level.inc @@ -319,6 +363,17 @@ proc hexaryInspectTrie*( level=result.level, nResumeCtx=reVisit.len, nDangling=result.dangling.len, maxLevel=stopAtLevel, stopped=result.stopped +# ------------------------------------------------------------------------------ +# Public functions, debugging +# ------------------------------------------------------------------------------ + +proc pp*(a: TrieNodeStat; db: HexaryTreeDbRef; maxItems = 30): string = + result = "(" & $a.level + if a.stopped: + result &= "stopped," + result &= $a.dangling.len & "," & + a.dangling.ppDangling(maxItems) & ")" + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/hexary_interpolate.nim b/nimbus/sync/snap/worker/db/hexary_interpolate.nim index 96bc76c0b..f172c2474 100644 --- a/nimbus/sync/snap/worker/db/hexary_interpolate.nim +++ b/nimbus/sync/snap/worker/db/hexary_interpolate.nim @@ -42,9 +42,6 @@ proc pp(w: seq[RPathXStep]; db: HexaryTreeDbRef; indent = 4): string = let pfx = "\n" & " ".repeat(indent) w.mapIt(it.pp(db)).join(pfx) -proc pp(rc: Result[TrieNodeStat, HexaryError]; db: HexaryTreeDbRef): string = - if rc.isErr: $rc.error else: rc.value.pp(db) - # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/hexary_nearby.nim b/nimbus/sync/snap/worker/db/hexary_nearby.nim index 357e73e7e..a4eddadbd 100644 --- a/nimbus/sync/snap/worker/db/hexary_nearby.nim +++ b/nimbus/sync/snap/worker/db/hexary_nearby.nim @@ -355,10 +355,10 @@ proc hexaryNearbyRight*( ## with non-decreasing path value). 
This is similar to the ## `hexary_path.next()` function, only that this algorithm does not ## backtrack if there are dangling links in between and rather returns - ## a error. + ## an error. ## - ## This code is intended be used for verifying a left-bound proof to verify - ## that there is no leaf node. + ## This code is intended to be used for verifying a left-bound proof to + ## verify that there is no leaf node *right* of a boundary path value. # Some easy cases if path.path.len == 0: @@ -543,8 +543,8 @@ proc hexaryNearbyLeft*( {.gcsafe, raises: [Defect,KeyError]} = ## Similar to `hexaryNearbyRight()`. ## - ## This code is intended be used for verifying a right-bound proof to verify - ## that there is no leaf node. + ## This code is intended to be used for verifying a right-bound proof to + ## verify that there is no leaf node *left* to a boundary path value. # Some easy cases if path.path.len == 0: diff --git a/nimbus/sync/snap/worker/db/notused/snapdb_check.nim b/nimbus/sync/snap/worker/db/notused/snapdb_check.nim deleted file mode 100644 index 69d3dd0f3..000000000 --- a/nimbus/sync/snap/worker/db/notused/snapdb_check.nim +++ /dev/null @@ -1,283 +0,0 @@ -# Nimbus -# Copyright (c) 2021 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed -# except according to those terms. - -## Check/analyse DB completeness -## ============================= - -import - chronicles, - eth/[common, p2p, trie/trie_defs], - stew/keyed_queue, - ../../../../utils/prettify, - ../../../sync_desc, - "../.."/[range_desc, worker_desc], - "."/[hexary_desc, hexary_error, hexary_inspect, - snapdb_accounts, snapdb_storage_slots] - -{.push raises: [Defect].} - -logScope: - topics = "snap-db" - -const - extraTraceMessages = false or true - ## Enabled additional logging noise - -# ------------------------------------------------------------------------------ -# Private logging helpers -# ------------------------------------------------------------------------------ - -template logTxt(info: static[string]): static[string] = - "Check DB " & info - -proc accountsCtx( - buddy: SnapBuddyRef; - env: SnapPivotRef; - ): string = - let - ctx = buddy.ctx - "{" & - "pivot=" & "#" & $env.stateHeader.blockNumber & "," & - "nAccounts=" & $env.nAccounts & "," & - ("covered=" & env.fetchAccounts.processed.fullFactor.toPC(0) & "/" & - ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," & - "nNodesCheck=" & $env.fetchAccounts.nodes.check.len & "," & - "nNodesMissing=" & $env.fetchAccounts.nodes.missing.len & "}" - -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc storageSlotsCtx( - buddy: SnapBuddyRef; - storageRoot: Hash256; - env: SnapPivotRef; - ): string = - let - ctx = buddy.ctx - rc = block: - let rcx = env.fetchStorageFull.eq(storageRoot) - if rcx.isOk: rcx - else: env.fetchStoragePart.eq(storageRoot) - if rc.isErr: - return "n/a" - let - data = rc.value - slots = data.slots - result = "{" & - "inherit=" & (if data.inherit: "t" else: "f") & "," - if not slots.isNil: - result &= "" & - "covered=" & slots.processed.fullFactor.toPC(0) & - "nNodesCheck=" & $slots.nodes.check.len & "," & - "nNodesMissing=" & 
$slots.nodes.missing.len - result &= "}" - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -proc checkStorageSlotsTrie( - buddy: SnapBuddyRef; - accKey: NodeKey; - storageRoot: Hash256; - env: SnapPivotRef; - ): Result[bool,HexaryError] = - ## Check whether a storage slots hexary trie is complete. - let - ctx = buddy.ctx - db = ctx.data.snapDb - peer = buddy.peer - - rc = db.inspectStorageSlotsTrie(peer, accKey, storageRoot) - - if rc.isErr: - return err(rc.error) - - ok(rc.value.dangling.len == 0) - - -iterator accountsWalk( - buddy: SnapBuddyRef; - env: SnapPivotRef; - ): (NodeKey,Account,HexaryError) = - let - ctx = buddy.ctx - db = ctx.data.snapDb - peer = buddy.peer - stateRoot = env.stateHeader.stateRoot - walk = SnapDbAccountsRef.init(db, stateRoot, peer) - - var - accKey = NodeKey.default - count = 0 - runOk = true - - while runOk: - count.inc - - let nextKey = block: - let rc = walk.nextAccountsChainDbKey(accKey) - if rc.isErr: - if rc.error != AccountNotFound: - error logTxt "accounts walk stopped", peer, - account=accKey.to(NodeTag), - ctx=buddy.accountsCtx(env), count, reason=rc.error - runOk = false - continue - rc.value - - accKey = nextKey - - let accData = block: - let rc = walk.getAccountsData(accKey, persistent = true) - if rc.isErr: - error logTxt "accounts walk error", peer, account=accKey, - ctx=buddy.accountsCtx(env), count, error=rc.error - runOk = false - continue - rc.value - - yield (accKey, accData, NothingSerious) - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc checkAccountsTrieIsComplete*( - buddy: SnapBuddyRef; - env: SnapPivotRef; - ): bool = - ## Check whether accounts hexary trie is complete - let - ctx = buddy.ctx - peer = buddy.peer - db = ctx.data.snapDb - rootKey = env.stateHeader.stateRoot.to(NodeKey) - var - error: HexaryError - - try: - let stats = db.getAccountFn.hexaryInspectTrie(rootKey) - if not stats.stopped: - return stats.dangling.len == 0 - - error = TrieLoopAlert - except RlpError: - error = RlpEncoding - except KeyError as e: - raiseAssert "Not possible @ importRawAccountNodes: " & e.msg - except Exception as e: - raiseAssert "Ooops checkAccountsTrieIsComplete(): name=" & - $e.name & " msg=" & e.msg - - error logTxt "accounts health check failed", peer, - ctx=buddy.accountsCtx(env), error - return false - - -proc checkAccountsListOk*( - buddy: SnapBuddyRef; - env: SnapPivotRef; - noisy = false; - ): bool = - ## Loop over accounts, returns `false` for some error. 
- let - ctx = buddy.ctx - peer = buddy.peer - var - accounts = 0 - storage = 0 - nextMsgThresh = 1 - - for (key,accData,error) in buddy.accountsWalk(env): - - if error != NothingSerious: - error logTxt "accounts loop stopped", peer, ctx=buddy.accountsCtx(env), - accounts, storage, error - return false - - accounts.inc - if accData.storageRoot != emptyRlpHash: - storage.inc - - when extraTraceMessages: - if noisy and nextMsgThresh <= accounts: - debug logTxt "accounts loop check point", peer, - ctx=buddy.accountsCtx(env), accounts, storage - nextMsgThresh *= 2 - - when extraTraceMessages: - let isComplete = buddy.checkAccountsTrieIsComplete(env) - debug logTxt "accounts list report", peer, ctx=buddy.accountsCtx(env), - accounts, storage, isComplete - - true - - -proc checkStorageSlotsTrieIsComplete*( - buddy: SnapBuddyRef; - accKey: NodeKey; - storageRoot: Hash256; - env: SnapPivotRef; - ): bool = - ## Check whether a storage slots hexary trie is complete. - let - peer = buddy.peer - rc = buddy.checkStorageSlotsTrie(accKey, storageRoot, env) - if rc.isOk: - return rc.value - - when extraTraceMessages: - let nStoQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - debug logTxt "atorage slots health check failed", peer, nStoQueue, - ctx=buddy.storageSlotsCtx(storageRoot, env), error=rc.error - -proc checkStorageSlotsTrieIsComplete*( - buddy: SnapBuddyRef; - env: SnapPivotRef; - ): bool = - ## Check for all accounts thye whether storage slot hexary tries are complete. - let - ctx = buddy.ctx - peer = buddy.peer - var - accounts = 0 - incomplete = 0 - complete = 0 - - for (accKey,accData,error) in buddy.accountsWalk(env): - if error != NothingSerious: - let nStoQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - error logTxt "atorage slots accounts loop stopped", peer, nStoQueue, - accounts, incomplete, complete, error - return false - - accounts.inc - let storageRoot = accData.storageRoot - if storageRoot == emptyRlpHash: - continue - - let rc = buddy.checkStorageSlotsTrie(accKey, storageRoot, env) - if rc.isOk and rc.value: - complete.inc - else: - incomplete.inc - - when extraTraceMessages: - let nStoQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - debug logTxt "storage slots report", peer, ctx=buddy.accountsCtx(env), - nStoQueue, accounts, incomplete, complete - - 0 < accounts and incomplete == 0 - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim b/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim index f6bcdd1c5..523830ced 100644 --- a/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim +++ b/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim @@ -490,34 +490,6 @@ proc inspectStorageSlotsTrie*( pathList, resumeCtx, suspendAfter, persistent=true, ignoreError) -proc getStorageSlotsNodeKey*( - ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor - path: Blob; ## Partial node path - persistent = false; ## Read data from disk - ): Result[NodeKey,HexaryError] = - ## For a partial node path argument `path`, return the raw node key. 
- var rc: Result[NodeKey,void] - noRlpExceptionOops("getStorageSlotsNodeKey()"): - if persistent: - rc = path.hexarypathNodeKey(ps.root, ps.getStorageSlotsFn) - else: - rc = path.hexarypathNodeKey(ps.root, ps.hexaDb) - if rc.isOk: - return ok(rc.value) - err(NodeNotFound) - -proc getStorageSlotsNodeKey*( - pv: SnapDbRef; ## Base descriptor on `ChainDBRef` - peer: Peer; ## For log messages, only - accKey: NodeKey; ## Account key - root: Hash256; ## state root - path: Blob; ## Partial node path - ): Result[NodeKey,HexaryError] = - ## Variant of `getStorageSlotsNodeKey()` for persistent storage. - SnapDbStorageSlotsRef.init( - pv, accKey, root, peer).getStorageSlotsNodeKey(path, persistent=true) - - proc getStorageSlotsData*( ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor path: NodeKey; ## Account to visit @@ -553,31 +525,6 @@ proc getStorageSlotsData*( SnapDbStorageSlotsRef.init( pv, accKey, root, peer).getStorageSlotsData(path, persistent=true) - -proc haveStorageSlotsData*( - ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor - persistent = false; ## Read data from disk - ): bool = - ## Return `true` if there is at least one intermediate hexary node for this - ## accounts storage slots trie. - ## - ## Caveat: There is no unit test yet - noGenericExOrKeyError("haveStorageSlotsData()"): - if persistent: - return 0 < ps.getStorageSlotsFn()(ps.root.ByteArray32).len - else: - return ps.hexaDb.tab.hasKey(ps.root.to(RepairKey)) - -proc haveStorageSlotsData*( - pv: SnapDbRef; ## Base descriptor on `ChainDBRef` - peer: Peer, ## For log messages, only - accKey: NodeKey; ## Account key - root: Hash256; ## state root - ): bool = - ## Variant of `haveStorageSlotsData()` for persistent storage. - SnapDbStorageSlotsRef.init( - pv, accKey, root, peer).haveStorageSlotsData(persistent=true) - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/pivot.nim b/nimbus/sync/snap/worker/pivot.nim index a9d87e766..a6534d05f 100644 --- a/nimbus/sync/snap/worker/pivot.nim +++ b/nimbus/sync/snap/worker/pivot.nim @@ -9,8 +9,7 @@ # except according to those terms. import - std/[math, sequtils], - bearssl/rand, + std/[math, sets, sequtils], chronos, eth/[common, trie/trie_defs], stew/[interval_set, keyed_queue, sorted_set], @@ -18,7 +17,8 @@ import ".."/[constants, range_desc, worker_desc], ./db/[hexary_error, snapdb_accounts, snapdb_pivot], ./pivot/[heal_accounts, heal_storage_slots, - range_fetch_accounts, range_fetch_storage_slots], + range_fetch_accounts, range_fetch_storage_slots, + storage_queue_helper], ./ticker {.push raises: [Defect].} @@ -27,13 +27,32 @@ const extraAsserts = false or true ## Enable some asserts -proc pivotAccountsHealingOk*(env: SnapPivotRef;ctx: SnapCtxRef): bool {.gcsafe.} proc pivotMothball*(env: SnapPivotRef) {.gcsafe.} # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ +proc accountsHealingOk( + env: SnapPivotRef; # Current pivot environment + ctx: SnapCtxRef; # Some global context + ): bool = + ## Returns `true` if accounts healing is enabled for this pivot. + not env.fetchAccounts.processed.isEmpty and + healAccountsCoverageTrigger <= ctx.pivotAccountsCoverage() + + +proc coveredAccounts100PcRollOver( + ctx: SnapCtxRef; + ) = + ## Roll over `coveredAccounts` registry when it reaches 100%. 
+ if ctx.data.coveredAccounts.isFull: + # All of accounts hashes are covered by completed range fetch processes + # for all pivot environments. So reset covering and record full-ness level. + ctx.data.covAccTimesFull.inc + ctx.data.coveredAccounts.clear() + + proc init( batch: SnapRangeBatchRef; stateRoot: Hash256; @@ -46,11 +65,7 @@ proc init( # Initialise accounts range fetch batch, the pair of `fetchAccounts[]` # range sets. - if ctx.data.coveredAccounts.isFull: - # All of accounts hashes are covered by completed range fetch processes - # for all pivot environments. So reset covering and record full-ness level. - ctx.data.covAccTimesFull.inc - ctx.data.coveredAccounts.clear() + ctx.coveredAccounts100PcRollOver() # Deprioritise already processed ranges by moving it to the second set. for iv in ctx.data.coveredAccounts.increasing: @@ -96,7 +111,7 @@ proc update*( # Calculate minimum block distance. let minBlockDistance = block: let rc = pivotTable.lastValue - if rc.isOk and rc.value.pivotAccountsHealingOk(ctx): + if rc.isOk and rc.value.accountsHealingOk(ctx): pivotBlockDistanceThrottledPivotChangeMin else: pivotBlockDistanceMin @@ -112,7 +127,6 @@ proc update*( fetchAccounts: SnapRangeBatchRef()) env.fetchAccounts.init(header.stateRoot, ctx) env.storageAccounts.init() - var topEnv = env # Append per-state root environment to LRU queue if reverse: @@ -124,20 +138,10 @@ proc update*( let rc = pivotTable.secondKey if rc.isOk: pivotTable.del rc.value - - # Update healing threshold for top pivot entry - topEnv = pivotTable.lastValue.value - else: discard pivotTable.lruAppend( header.stateRoot, env, pivotTableLruEntriesMax) - # Update healing threshold - let - slots = max(0, healAccountsPivotTriggerNMax - pivotTable.len) - delta = slots.float * healAccountsPivotTriggerWeight - topEnv.healThresh = healAccountsPivotTriggerMinFactor + delta - proc tickerStats*( pivotTable: var SnapPivotTable; # Pivot table @@ -183,8 +187,8 @@ proc tickerStats*( procChunks = 0 if not env.isNil: pivotBlock = some(env.stateHeader.blockNumber) - stoQuLen = some(env.fetchStorageFull.len + env.fetchStoragePart.len) procChunks = env.fetchAccounts.processed.chunks + stoQuLen = some(env.storageQueueTotal()) TickerStats( pivotBlock: pivotBlock, @@ -223,25 +227,6 @@ proc pivotMothball*(env: SnapPivotRef) = env.archived = true -proc pivotAccountsHealingOk*( - env: SnapPivotRef; # Current pivot environment - ctx: SnapCtxRef; # Some global context - ): bool = - ## Returns `true` if accounts healing is enabled for this pivot. - ## - # Only start accounts healing if there is some completion level, already. - # - # We check against the global coverage factor, i.e. a measure for how much - # of the total of all accounts have been processed. Even if the hexary trie - # database for the current pivot state root is sparsely filled, there is a - # good chance that it can inherit some unchanged sub-trie from an earlier - # pivot state root download. The healing process then works like sort of - # glue. 
- if healAccountsCoverageTrigger <= ctx.pivotAccountsCoverage(): - if env.healThresh <= env.fetchAccounts.processed.fullFactor: - return true - - proc execSnapSyncAction*( env: SnapPivotRef; # Current pivot environment buddy: SnapBuddyRef; # Worker peer @@ -253,7 +238,7 @@ proc execSnapSyncAction*( block: # Clean up storage slots queue first it it becomes too large let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len - if snapStorageSlotsQuPrioThresh < nStoQu: + if storageSlotsQuPrioThresh < nStoQu: await buddy.rangeFetchStorageSlots(env) if buddy.ctrl.stopped or env.archived: return @@ -261,6 +246,9 @@ proc execSnapSyncAction*( if not env.fetchAccounts.processed.isFull: await buddy.rangeFetchAccounts(env) + # Update 100% accounting + ctx.coveredAccounts100PcRollOver() + # Run at least one round fetching storage slosts even if the `archived` # flag is set in order to keep the batch queue small. if not buddy.ctrl.stopped: @@ -269,7 +257,7 @@ proc execSnapSyncAction*( if buddy.ctrl.stopped or env.archived: return - if env.pivotAccountsHealingOk(ctx): + if env.accountsHealingOk(ctx): await buddy.healAccounts(env) if buddy.ctrl.stopped or env.archived: return @@ -282,7 +270,7 @@ proc execSnapSyncAction*( # Don't bother with storage slots healing before accounts healing takes # place. This saves communication bandwidth. The pivot might change soon, # anyway. - if env.pivotAccountsHealingOk(ctx): + if env.accountsHealingOk(ctx): await buddy.healStorageSlots(env) @@ -295,12 +283,12 @@ proc saveCheckpoint*( ## let fa = env.fetchAccounts - nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len + nStoQu = env.storageQueueTotal() - if snapAccountsSaveProcessedChunksMax < fa.processed.chunks: + if accountsSaveProcessedChunksMax < fa.processed.chunks: return err(TooManyProcessedChunks) - if snapAccountsSaveStorageSlotsMax < nStoQu: + if accountsSaveStorageSlotsMax < nStoQu: return err(TooManySlotAccounts) ctx.data.snapDb.savePivot SnapDbPivotRegistry( @@ -310,7 +298,8 @@ proc saveCheckpoint*( processed: toSeq(env.fetchAccounts.processed.increasing) .mapIt((it.minPt,it.maxPt)), slotAccounts: (toSeq(env.fetchStorageFull.nextKeys) & - toSeq(env.fetchStoragePart.nextKeys)).mapIt(it.to(NodeKey))) + toSeq(env.fetchStoragePart.nextKeys)).mapIt(it.to(NodeKey)) & + toSeq(env.parkedStorage.items)) proc recoverPivotFromCheckpoint*( @@ -350,9 +339,7 @@ proc recoverPivotFromCheckpoint*( discard env.fetchAccounts.processed.reduce pt env.fetchAccounts.unprocessed.merge pt elif rc.value.storageRoot != emptyRlpHash: - env.fetchStorageFull.merge AccountSlotsHeader( - accKey: w, - storageRoot: rc.value.storageRoot) + env.storageQueueAppendFull(rc.value.storageRoot, w) # Handle mothballed pivots for swapping in (see `pivotMothball()`) if not topLevel: diff --git a/nimbus/sync/snap/worker/pivot/heal_accounts.nim b/nimbus/sync/snap/worker/pivot/heal_accounts.nim index 37fb712e8..51ca47dae 100644 --- a/nimbus/sync/snap/worker/pivot/heal_accounts.nim +++ b/nimbus/sync/snap/worker/pivot/heal_accounts.nim @@ -8,8 +8,8 @@ # at your option. This file may not be copied, modified, or distributed # except according to those terms. -## Heal accounts DB: -## ================= +## Heal accounts DB +## ================ ## ## This module is a variation of the `swap-in` module in the sense that it ## searches for missing nodes in the database (which means that links to @@ -22,32 +22,34 @@ ## * Run `swapInAccounts()` so that inheritable sub-tries are imported from ## previous pivots. 
## -## * Find nodes with envelopes that have no account in common with any range -## interval of the `processed` set of the current pivot. Stop if there are -## no such nodes. +## * Find dangling nodes in the current account trie by trying plan A, and +## continuing with plan B only if A fails. ## -## * Extract the missing nodes from the previous step, i.e. the nodes that -## are known to exist but are not allocated. If all nodes are allocated, -## employ the `hexaryInspect()` function in a limited mode do find dangling -## (i.e. missing) sub-nodes of these allocated nodes. Stop if this function -## fails to find any such nodes. +## A. Try to find nodes with envelopes that have no account in common with +## any range interval of the `processed` set of the accounts trie. This +## action will +## + either determine that there are no such envelopes implying that the +## accounts trie is complete (then stop here) +## + or result in envelopes related to nodes that are all allocated on the +## accounts trie (fail, use *plan B* below) +## + or result in some envelopes related to dangling nodes. ## -## * From the nodes of the previous step, extract non-allocated nodes and -## fetch them from the network. +## B. Employ the `hexaryInspect()` trie perusal function in a limited mode +## for finding dangling (i.e. missing) sub-nodes below the allocated nodes. ## ## * Install that nodes from the network. ## ## * Rinse and repeat ## -## Discussion: -## ----------- +## Discussion +## ---------- ## -## The worst case scenario in the third step might also be solved by allocating -## more accounts and running this healing algorith again. +## A worst case scenario of a failing *plan B* must be solved by fetching and +## storing more accounts and running this healing algorithm again. ## -## Due to its potentially poor performance there is no way to recursively -## search the whole database hexary trie for more dangling nodes using the -## `hexaryInspect()` function. +## Due to the potentially poor performance using `hexaryInspect()`.there is no +## general solution for *plan B* by recursively searching the whole accounts +## hexary trie database for more dangling nodes. ## import std/[math, sequtils, tables], @@ -61,7 +63,7 @@ import ../com/[com_error, get_trie_nodes], ../db/[hexary_desc, hexary_envelope, hexary_error, hexary_inspect, snapdb_accounts], - ./swap_in + "."/[storage_queue_helper, swap_in] {.push raises: [Defect].} @@ -129,42 +131,55 @@ proc compileMissingNodesList( buddy: SnapBuddyRef; env: SnapPivotRef; ): seq[NodeSpecs] = - ## Find some missing glue nodes in current database to be fetched - ## individually. + ## Find some missing glue nodes in accounts database to be fetched. 
let ctx = buddy.ctx + peer = buddy.peer rootKey = env.stateHeader.stateRoot.to(NodeKey) getFn = ctx.data.snapDb.getAccountFn fa = env.fetchAccounts # Import from earlier run - while buddy.swapInAccounts(env) != 0: - discard + while buddy.ctx.swapInAccounts(env) != 0: + if buddy.ctrl.stopped: + return var nodes: seq[NodeSpecs] - noExceptionOops("getMissingNodesList"): + noExceptionOops("compileMissingNodesList"): # Get unallocated nodes to be fetched let rc = fa.processed.hexaryEnvelopeDecompose(rootKey, getFn) if rc.isOk: nodes = rc.value + # Check whether the hexary trie is complete + if nodes.len == 0: + # Fill gaps + discard fa.processed.merge(low(NodeTag),high(NodeTag)) + fa.unprocessed.clear() + return + # Remove allocated nodes - let missingNodes = nodes.filterIt(it.nodeKey.ByteArray32.getFn().len == 0) - if 0 < missingNodes.len: + let missing = nodes.filterIt(it.nodeKey.ByteArray32.getFn().len == 0) + if 0 < missing.len: when extraTraceMessages: - trace logTxt "missing nodes", ctx=buddy.healingCtx(env), - nResult=missingNodes.len, result=missingNodes.toPC - return missingNodes + trace logTxt "missing nodes", peer, ctx=buddy.healingCtx(env), + nResult=missing.len, result=missing.toPC + return missing # Plan B, carefully employ `hexaryInspect()` if 0 < nodes.len: try: - let stats = getFn.hexaryInspectTrie( - rootKey, nodes.mapIt(it.partialPath), suspendAfter=healInspectionBatch) - if 0 < stats.dangling.len: - trace logTxt "missing nodes (plan B)", ctx=buddy.healingCtx(env), + let + paths = nodes.mapIt it.partialPath + stats = getFn.hexaryInspectTrie(rootKey, paths, + stopAtLevel = healAccountsInspectionPlanBLevel, + maxDangling = fetchRequestTrieNodesMax) + result = stats.dangling + + when extraTraceMessages: + trace logTxt "missing nodes (plan B)", peer, ctx=buddy.healingCtx(env), + nLevel=stats.level, nVisited=stats.count, nResult=stats.dangling.len, result=stats.dangling.toPC - return stats.dangling except: discard @@ -184,7 +199,7 @@ proc fetchMissingNodes( pivot = "#" & $env.stateHeader.blockNumber # for logging nMissingNodes= missingNodes.len - nFetchNodes = max(0, nMissingNodes - snapRequestTrieNodesFetchMax) + nFetchNodes = max(0, nMissingNodes - fetchRequestTrieNodesMax) # There is no point in fetching too many nodes as it will be rejected. So # rest of the `missingNodes` list is ignored to be picked up later. 
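The capping described above keeps a single `GetTrieNodes` request below `fetchRequestTrieNodesMax` entries; whatever does not fit stays on the missing-nodes list for a later healing round. The effect can be pictured with a small sketch using a hypothetical helper (names are illustrative, not from this patch)::

  # Hypothetical helper: cap one request at `maxPerRequest` entries and
  # keep the remainder queued for a later pass.
  proc splitFetchBatch[T](missing: seq[T]; maxPerRequest: Positive): tuple[now, later: seq[T]] =
    let n = min(missing.len, maxPerRequest)
    (now: missing[0 ..< n], later: missing[n ..< missing.len])

  when isMainModule:
    let (now, later) = splitFetchBatch(@[1, 2, 3, 4, 5], 3)
    doAssert now == @[1, 2, 3] and later == @[4, 5]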
@@ -276,9 +291,7 @@ proc registerAccountLeaf( # Update storage slots batch if acc.storageRoot != emptyRlpHash: - env.fetchStorageFull.merge AccountSlotsHeader( - acckey: accKey, - storageRoot: acc.storageRoot) + env.storageQueueAppendFull(acc.storageRoot, accKey) # ------------------------------------------------------------------------------ # Private functions: do the healing for one round @@ -297,9 +310,12 @@ proc accountsHealingImpl( peer = buddy.peer fa = env.fetchAccounts - # Update for changes since last visit - missingNodes = buddy.compileMissingNodesList(env) + # Import from earlier runs (if any) + while ctx.swapInAccounts(env) != 0: + discard + # Update for changes since last visit + let missingNodes = buddy.compileMissingNodesList(env) if missingNodes.len == 0: # Nothing to do trace logTxt "nothing to do", peer, ctx=buddy.healingCtx(env) @@ -370,8 +386,8 @@ proc healAccounts*( var nNodesFetched = 0 nFetchLoop = 0 - # Stop after `healAccountsBatchFetchMax` nodes have been fetched - while nNodesFetched < healAccountsBatchFetchMax: + # Stop after `healAccountsBatchMax` nodes have been fetched + while nNodesFetched < healAccountsBatchMax: var nNodes = await buddy.accountsHealingImpl(env) if nNodes <= 0: break diff --git a/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim b/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim index c15643206..f66af1761 100644 --- a/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim +++ b/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim @@ -8,34 +8,63 @@ # at your option. This file may not be copied, modified, or distributed # except according to those terms. -## Heal storage DB: -## ================ +## Heal storage slots DB +## ===================== ## ## This module works similar to `heal_accounts` applied to each per-account ## storage slots hexary trie. These per-account trie work items are stored in -## the pair of queues `env.fetchStorageFull` and `env.fetchStoragePart`. +## the queue `env.fetchStoragePart`. ## -## There is one additional short cut for speeding up processing. If a -## per-account storage slots hexary trie is marked inheritable, it will be -## checked whether it is complete and can be used wholesale. +## Theere is another such queue `env.fetchStorageFull` which is not used here. ## -## Inheritable tries appear after a pivot state root change. Typically, not all -## account data have changed and so the same per-account storage slots are -## valid. +## In order to be able to checkpoint the current list of storage accounts (by +## a parallel running process), unfinished storage accounts are temporarily +## held in the set `env.parkedStorage`. +## +## Algorithm applied to each entry of `env.fetchStoragePart` +## -------------------------------------------------------- +## +## * Find dangling nodes in the current slot trie by trying plan A, and +## continuing with plan B only if A fails. +## +## A. Try to find nodes with envelopes that have no slot in common with +## any range interval of the `processed` set of the current slot trie. This +## action will +## + either determine that there are no such envelopes implying that the +## current slot trie is complete (then stop here) +## + or result in envelopes related to nodes that are all allocated on the +## current slot trie (fail, use *plan B* below) +## + or result in some envelopes related to dangling nodes. +## +## B. Employ the `hexaryInspect()` trie perusal function in a limited mode +## for finding dangling (i.e. missing) sub-nodes below the allocated nodes. 
+## +## * Install that nodes from the network. +## +## * Rinse and repeat +## +## Discussion +## ---------- +## +## A worst case scenario of a failing *plan B* must be solved by fetching +## and storing more slots and running this healing algorithm again. +## +## Due to the potentially poor performance using `hexaryInspect()`.there is +## no general solution for *plan B* by recursively searching the whole slot +## hexary trie database for more dangling nodes. ## - import - std/[sequtils, tables], + std/[math, sequtils, tables], chronicles, chronos, eth/[common, p2p, trie/nibbles, trie/trie_defs, rlp], - stew/[interval_set, keyed_queue], + stew/[byteutils, interval_set, keyed_queue], ../../../../utils/prettify, - ../../../sync_desc, + "../../.."/[sync_desc, types], "../.."/[constants, range_desc, worker_desc], ../com/[com_error, get_trie_nodes], - ../db/[hexary_desc, hexary_error, snapdb_storage_slots], - ./sub_tries_helper + ../db/[hexary_desc, hexary_envelope, hexary_inspect, snapdb_storage_slots], + ./storage_queue_helper {.push raises: [Defect].} @@ -53,136 +82,132 @@ const template logTxt(info: static[string]): static[string] = "Storage slots healing " & info +proc `$`(node: NodeSpecs): string = + node.partialPath.toHex + +proc `$`(rs: NodeTagRangeSet): string = + rs.fullFactor.toPC(0) + +proc `$`(iv: NodeTagRange): string = + iv.fullFactor.toPC(3) + +proc toPC(w: openArray[NodeSpecs]; n: static[int] = 3): string = + let sumUp = w.mapIt(it.hexaryEnvelope.len).foldl(a+b, 0.u256) + (sumUp.to(float) / (2.0^256)).toPC(n) + +proc healingCtx( + buddy: SnapBuddyRef; + env: SnapPivotRef; + ): string = + "{" & + "pivot=" & "#" & $env.stateHeader.blockNumber & "," & + "runState=" & $buddy.ctrl.state & "," & + "nStoQu=" & $env.storageQueueTotal() & "," & + "nSlotLists=" & $env.nSlotLists & "}" + proc healingCtx( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair; env: SnapPivotRef; ): string = - let slots = kvp.data.slots "{" & "pivot=" & "#" & $env.stateHeader.blockNumber & "," & - "covered=" & slots.unprocessed.emptyFactor.toPC(0) & "," & - "nNodesCheck=" & $slots.nodes.check.len & "," & - "nNodesMissing=" & $slots.nodes.missing.len & "}" + "runState=" & $buddy.ctrl.state & "," & + "covered=" & $kvp.data.slots.processed & "," & + "nStoQu=" & $env.storageQueueTotal() & "," & + "nSlotLists=" & $env.nSlotLists & "}" + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +#template discardRlpError(info: static[string]; code: untyped) = +# try: +# code +# except RlpError as e: +# discard + +template noExceptionOops(info: static[string]; code: untyped) = + try: + code + except Defect as e: + raise e + except Exception as e: + raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc acceptWorkItemAsIs( - buddy: SnapBuddyRef; - kvp: SnapSlotsQueuePair; - ): Result[bool,HexaryError] = - ## Check whether this work item is done and the corresponding storage trie - ## can be completely inherited. 
- if kvp.data.inherit: - let - ctx = buddy.ctx - peer = buddy.peer - db = ctx.data.snapDb - accKey = kvp.data.accKey - storageRoot = kvp.key - - rc = db.inspectStorageSlotsTrie(peer, accKey, storageRoot) - - # Check whether the hexary trie is complete - if rc.isOk: - return ok(rc.value.dangling.len == 0) - - return err(rc.error) - - ok(false) - - -proc verifyStillMissingNodes( +proc compileMissingNodesList( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair; env: SnapPivotRef; - ) = - ## Check whether previously missing nodes from the `nodes.missing` list - ## have been magically added to the database since it was checked last - ## time. These nodes will me moved to `nodes.check` for further processing. + ): seq[NodeSpecs] = + ## Find some missing glue nodes in storage slots database to be fetched. let ctx = buddy.ctx - db = ctx.data.snapDb peer = buddy.peer - accKey = kvp.data.accKey - storageRoot = kvp.key slots = kvp.data.slots + rootKey = kvp.key.to(NodeKey) + getFn = ctx.data.snapDb.getStorageSlotsFn(kvp.data.accKey) - var delayed: seq[NodeSpecs] - for w in slots.nodes.missing: - let rc = db.getStorageSlotsNodeKey(peer, accKey, storageRoot, w.partialPath) - if rc.isOk: - # Check nodes for dangling links - slots.nodes.check.add w - else: - # Node is still missing - delayed.add w + var nodes: seq[NodeSpecs] + noExceptionOops("compileMissingNodesList"): + if not slots.processed.isEmpty: + # Get unallocated nodes to be fetched + let rc = slots.processed.hexaryEnvelopeDecompose(rootKey, getFn) + if rc.isOk: + nodes = rc.value - # Must not modify sequence while looping over it - slots.nodes.missing = slots.nodes.missing & delayed + # Check whether the hexary trie is complete + if nodes.len == 0: + # Fill gaps + discard slots.processed.merge(low(NodeTag),high(NodeTag)) + slots.unprocessed.clear() + return + + # Remove allocated nodes + let missing = nodes.filterIt(it.nodeKey.ByteArray32.getFn().len == 0) + if 0 < missing.len: + when extraTraceMessages: + trace logTxt "missing nodes", peer, ctx=buddy.healingCtx(kvp,env), + nResult=missing.len, result=missing.toPC + return missing + + # Plan B, carefully employ `hexaryInspect()` + if 0 < nodes.len: + try: + let + paths = nodes.mapIt it.partialPath + stats = getFn.hexaryInspectTrie(rootKey, paths, + stopAtLevel = healStorageSlotsInspectionPlanBLevel, + maxDangling = fetchRequestTrieNodesMax) + result = stats.dangling + + when extraTraceMessages: + trace logTxt "missing nodes (plan B)", peer, + ctx=buddy.healingCtx(kvp,env), nLevel=stats.level, + nVisited=stats.count, nResult=stats.dangling.len + except: + discard -proc updateMissingNodesList( - buddy: SnapBuddyRef; - kvp: SnapSlotsQueuePair; - env: SnapPivotRef; - ): Future[bool] - {.async.} = - ## Starting with a given set of potentially dangling intermediate trie nodes - ## `nodes.check`, this set is filtered and processed. The outcome is fed back - ## to the vey same list `nodes.check`. 
- let - ctx = buddy.ctx - db = ctx.data.snapDb - peer = buddy.peer - accKey = kvp.data.accKey - storageRoot = kvp.key - slots = kvp.data.slots - - let rc = await db.getStorageSlotsFn(accKey).subTriesFromPartialPaths( - storageRoot, # State root related to storage slots - slots, # Storage slots download specs - snapRequestTrieNodesFetchMax) # Maxinmal datagram request size - if rc.isErr: - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - if rc.error == TrieIsLockedForPerusal: - trace logTxt "failed", peer, itCtx=buddy.healingCtx(kvp,env), - nSlotLists=env.nSlotLists, nStorageQueue, error=rc.error - else: - error logTxt "failed => stop", peer, itCtx=buddy.healingCtx(kvp,env), - nSlotLists=env.nSlotLists, nStorageQueue, error=rc.error - # Attempt to switch pivot, there is not much else one can do here - buddy.ctrl.zombie = true - return false - - return true - - -proc getMissingNodesFromNetwork( +proc getNodesFromNetwork( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair; + missing: seq[NodeSpecs]; env: SnapPivotRef; ): Future[seq[NodeSpecs]] {.async.} = - ## Extract from `nodes.missing` the next batch of nodes that need + ## Extract from `missing` the next batch of nodes that need ## to be merged it into the database let ctx = buddy.ctx peer = buddy.peer - accKey = kvp.data.accKey + accPath = kvp.data.accKey.to(Blob) storageRoot = kvp.key - pivot = "#" & $env.stateHeader.blockNumber # for logging - slots = kvp.data.slots - - nSickSubTries = slots.nodes.missing.len - inxLeft = max(0, nSickSubTries - snapRequestTrieNodesFetchMax) - - # There is no point in processing too many nodes at the same time. So leave - # the rest on the `nodes.missing` queue to be handled later. - let fetchNodes = slots.nodes.missing[inxLeft ..< nSickSubTries] - slots.nodes.missing.setLen(inxLeft) + fetchNodes = missing[0 ..< fetchRequestTrieNodesMax] # Initalise for `getTrieNodes()` for fetching nodes from the network var @@ -192,148 +217,39 @@ proc getMissingNodesFromNetwork( pathList.add @[w.partialPath] nodeKey[w.partialPath] = w.nodeKey - # Fetch nodes from the network. Note that the remainder of the `nodes.missing` - # list might be used by another process that runs semi-parallel. + # Fetch nodes from the network. let - req = @[accKey.to(Blob)] & fetchNodes.mapIt(it.partialPath) + pivot = "#" & $env.stateHeader.blockNumber # for logging + req = @[accPath] & fetchNodes.mapIt(it.partialPath) rc = await buddy.getTrieNodes(storageRoot, @[req], pivot) if rc.isOk: # Reset error counts for detecting repeated timeouts, network errors, etc. buddy.data.errors.resetComError() - # Register unfetched missing nodes for the next pass - for w in rc.value.leftOver: - for n in 1 ..< w.len: - slots.nodes.missing.add NodeSpecs( - partialPath: w[n], - nodeKey: nodeKey[w[n]]) return rc.value.nodes.mapIt(NodeSpecs( partialPath: it.partialPath, nodeKey: nodeKey[it.partialPath], data: it.data)) - # Restore missing nodes list now so that a task switch in the error checker - # allows other processes to access the full `nodes.missing` list. 
- slots.nodes.missing = slots.nodes.missing & fetchNodes - let error = rc.error if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): - discard when extraTraceMessages: - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len trace logTxt "fetch nodes error => stop", peer, - itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists, - nStorageQueue, error - else: - discard - when extraTraceMessages: - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - trace logTxt "fetch nodes error", peer, - itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists, - nStorageQueue, error - - return @[] + ctx=buddy.healingCtx(kvp,env), error -proc kvStorageSlotsLeaf( - buddy: SnapBuddyRef; - kvp: SnapSlotsQueuePair; - node: NodeSpecs; - env: SnapPivotRef; - ): (bool,NodeKey) - {.gcsafe, raises: [Defect,RlpError]} = +proc slotKey(node: NodeSpecs): (bool,NodeKey) = ## Read leaf node from persistent database (if any) - let - peer = buddy.peer - - nodeRlp = rlpFromBytes node.data - (_,prefix) = hexPrefixDecode node.partialPath - (_,segment) = hexPrefixDecode nodeRlp.listElem(0).toBytes - nibbles = prefix & segment - if nibbles.len == 64: - return (true, nibbles.getBytes.convertTo(NodeKey)) - - -proc registerStorageSlotsLeaf( - buddy: SnapBuddyRef; - kvp: SnapSlotsQueuePair; - slotKey: NodeKey; - env: SnapPivotRef; - ) = - ## Process single trie node as would be done with an interval by - ## the `storeStorageSlots()` function - let - peer = buddy.peer - slots = kvp.data.slots - pt = slotKey.to(NodeTag) - - # Find range set (from list) containing `pt` - var ivSet: NodeTagRangeSet - block foundCoveringRange: - for w in slots.unprocessed: - if 0 < w.covered(pt,pt): - ivSet = w - break foundCoveringRange - return # already processed, forget this account leaf - - # Register this isolated leaf node that was added - discard ivSet.reduce(pt,pt) - - when extraTraceMessages: - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - trace logTxt "single node", peer, - itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists, - nStorageQueue, slotKey=pt - - -proc assembleWorkItemsQueue( - buddy: SnapBuddyRef; - env: SnapPivotRef; - ): (seq[SnapSlotsQueuePair],int) = - ## .. - var - toBeHealed: seq[SnapSlotsQueuePair] - nAcceptedAsIs = 0 - - # Search the current slot item batch list for items to complete via healing - for kvp in env.fetchStoragePart.nextPairs: - # Marked items indicate that a partial sub-trie existsts which might have - # been inherited from an earlier storage root. - if kvp.data.inherit: - - # Remove `kvp` work item from the queue object (which is allowed within a - # `for` loop over a `KeyedQueue` object type.) - env.fetchStorageFull.del(kvp.key) - - # With some luck, the `kvp` work item refers to a complete storage trie - # that can be be accepted as-is in wich case `kvp` can be just dropped. 
- let rc = buddy.acceptWorkItemAsIs(kvp) - if rc.isOk and rc.value: - env.nSlotLists.inc - nAcceptedAsIs.inc # for logging - continue # dropping `kvp` - - toBeHealed.add kvp - if healStorageSlotsBatchMax <= toBeHealed.len: - return (toBeHealed, nAcceptedAsIs) - - # Ditto for partial items queue - for kvp in env.fetchStoragePart.nextPairs: - if healSlorageSlotsTrigger <= kvp.data.slots.unprocessed.emptyFactor: - env.fetchStoragePart.del(kvp.key) - - let rc = buddy.acceptWorkItemAsIs(kvp) - if rc.isOk and rc.value: - env.nSlotLists.inc - nAcceptedAsIs.inc # for logging - continue # dropping `kvp` - - # Add to local batch to be processed, below - toBeHealed.add kvp - if healStorageSlotsBatchMax <= toBeHealed.len: - break - - (toBeHealed, nAcceptedAsIs) + try: + let + nodeRlp = rlpFromBytes node.data + (_,prefix) = hexPrefixDecode node.partialPath + (_,segment) = hexPrefixDecode nodeRlp.listElem(0).toBytes + nibbles = prefix & segment + if nibbles.len == 64: + return (true, nibbles.getBytes.convertTo(NodeKey)) + except: + discard # ------------------------------------------------------------------------------ # Private functions: do the healing for one work item (sub-trie) @@ -343,131 +259,62 @@ proc storageSlotsHealing( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair; env: SnapPivotRef; - ): Future[bool] - {.async.} = + ) {.async.} = ## Returns `true` is the sub-trie is complete (probably inherited), and ## `false` if there are nodes left to be completed. let ctx = buddy.ctx db = ctx.data.snapDb peer = buddy.peer - accKey = kvp.data.accKey - slots = kvp.data.slots + missing = buddy.compileMissingNodesList(kvp, env) + + if missing.len == 0: + trace logTxt "nothing to do", peer, ctx=buddy.healingCtx(kvp,env) + return when extraTraceMessages: - block: - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - trace logTxt "started", peer, itCtx=buddy.healingCtx(kvp,env), - nSlotLists=env.nSlotLists, nStorageQueue - - # Update for changes since last visit - buddy.verifyStillMissingNodes(kvp, env) - - # ??? - if slots.nodes.check.len != 0: - if not await buddy.updateMissingNodesList(kvp,env): - return false - - # Check whether the trie is complete. 
- if slots.nodes.missing.len == 0: - trace logTxt "complete", peer, itCtx=buddy.healingCtx(kvp,env) - return true + trace logTxt "started", peer, ctx=buddy.healingCtx(kvp,env) # Get next batch of nodes that need to be merged it into the database - let nodeSpecs = await buddy.getMissingNodesFromNetwork(kvp,env) + let nodeSpecs = await buddy.getNodesFromNetwork(kvp, missing, env) if nodeSpecs.len == 0: return # Store nodes onto disk - let report = db.importRawStorageSlotsNodes(peer, accKey, nodeSpecs) + let report = db.importRawStorageSlotsNodes(peer, kvp.data.accKey, nodeSpecs) if 0 < report.len and report[^1].slot.isNone: # Storage error, just run the next lap (not much else that can be done) - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - error logTxt "error updating persistent database", peer, - itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists, - nStorageQueue, nNodes=nodeSpecs.len, error=report[^1].error - slots.nodes.missing = slots.nodes.missing & nodeSpecs - return false + error logTxt "database error", peer, ctx=buddy.healingCtx(kvp,env), + nNodes=nodeSpecs.len, error=report[^1].error + return when extraTraceMessages: - block: - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - trace logTxt "nodes merged into database", peer, - itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists, - nStorageQueue, nNodes=nodeSpecs.len + trace logTxt "nodes merged into database", peer, + ctx=buddy.healingCtx(kvp,env), nNodes=nodeSpecs.len - # Filter out error and leaf nodes + # Filter out leaf nodes var nLeafNodes = 0 # for logging for w in report: - if w.slot.isSome: # non-indexed entries appear typically at the end, though - let inx = w.slot.unsafeGet + if w.slot.isSome and w.kind.get(otherwise = Branch) == Leaf: - if w.error != NothingSerious or w.kind.isNone: - # error, try downloading again - slots.nodes.missing.add nodeSpecs[inx] + # Leaf Node has been stored, so register it + let + inx = w.slot.unsafeGet + (isLeaf, slotKey) = nodeSpecs[inx].slotKey + if isLeaf: + let + slotTag = slotKey.to(NodeTag) + iv = NodeTagRange.new(slotTag,slotTag) + kvp.data.slots.unprocessed.reduce iv + discard kvp.data.slots.processed.merge iv + nLeafNodes.inc - elif w.kind.unsafeGet != Leaf: - # re-check this node - slots.nodes.check.add nodeSpecs[inx] - - else: - # Node has been stored, double check - let (isLeaf, slotKey) = - buddy.kvStorageSlotsLeaf(kvp, nodeSpecs[inx], env) - if isLeaf: - # Update `uprocessed` registry, collect storage roots (if any) - buddy.registerStorageSlotsLeaf(kvp, slotKey, env) - nLeafNodes.inc - else: - slots.nodes.check.add nodeSpecs[inx] + when extraTraceMessages: + trace logTxt "stored slot", peer, + ctx=buddy.healingCtx(kvp,env), slotKey=slotTag when extraTraceMessages: - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - trace logTxt "job done", peer, itCtx=buddy.healingCtx(kvp,env), - nSlotLists=env.nSlotLists, nStorageQueue, nLeafNodes - - -proc healingIsComplete( - buddy: SnapBuddyRef; - kvp: SnapSlotsQueuePair; - env: SnapPivotRef; - ): Future[bool] - {.async.} = - ## Check whether the storage trie can be completely inherited and prepare for - ## healing if not. - ## - ## Returns `true` is the sub-trie is complete (probably inherited), and - ## `false` if there are nodes left to be completed. 
-  let
-    ctx = buddy.ctx
-    db = ctx.data.snapDb
-    peer = buddy.peer
-    accKey = kvp.data.accKey
-    storageRoot = kvp.key
-
-  # Check whether this work item can be completely inherited
-  if kvp.data.inherit:
-    let rc = db.inspectStorageSlotsTrie(peer, accKey, storageRoot)
-
-    if rc.isErr:
-      # Oops, not much we can do here (looping trie?)
-      let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
-      error logTxt "problem inspecting storage trie", peer,
-        nSlotLists=env.nSlotLists, nStorageQueue, storageRoot, error=rc.error
-      return false
-
-    # Check whether the hexary trie can be inherited as-is.
-    if rc.value.dangling.len == 0:
-      return true # done
-
-    # Full range covered by unprocessed items
-    kvp.data.slots = SnapRangeBatchRef(
-      nodes: SnapTodoNodes(
-        missing: rc.value.dangling))
-    kvp.data.slots.unprocessed.init()
-
-  # Proceed with healing
-  return await buddy.storageSlotsHealing(kvp, env)
+    trace logTxt "job done", peer, ctx=buddy.healingCtx(kvp,env), nLeafNodes
 
 # ------------------------------------------------------------------------------
 # Public functions
 # ------------------------------------------------------------------------------
@@ -480,43 +327,46 @@ proc healStorageSlots*(
   ## Fetching and merging missing slorage slots trie database nodes.
   let
     ctx = buddy.ctx
-    db = ctx.data.snapDb
     peer = buddy.peer
-  var
-    (toBeHealed, nAcceptedAsIs) = buddy.assembleWorkItemsQueue(env)
+
+  # Extract healing slot items from partial slots list
+  var toBeHealed: seq[SnapSlotsQueuePair]
+  for kvp in env.fetchStoragePart.nextPairs:
+    # Delete from queue and process this entry
+    env.fetchStoragePart.del kvp.key
+
+    # Move to returned list unless duplicated in full slots list
+    if env.fetchStorageFull.eq(kvp.key).isErr:
+      toBeHealed.add kvp
+      env.parkedStorage.incl kvp.data.accKey # temporarily parked
+      if healStorageSlotsBatchMax <= toBeHealed.len:
+        break
 
   # Run against local batch
   let nHealerQueue = toBeHealed.len
   if 0 < nHealerQueue:
     when extraTraceMessages:
-      block:
-        let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
-        trace logTxt "processing", peer,
-          nSlotLists=env.nSlotLists, nStoQu, nHealerQueue, nAcceptedAsIs
+      trace logTxt "processing", peer, ctx=buddy.healingCtx(env), nHealerQueue
 
     for n in 0 ..< toBeHealed.len:
+      # Stop processing, hand back the rest
+      if buddy.ctrl.stopped:
+        for m in n ..< toBeHealed.len:
+          let kvp = toBeHealed[m]
+          discard env.fetchStoragePart.append(kvp.key, kvp.data)
+          env.parkedStorage.excl kvp.data.accKey
+        break
+
       let kvp = toBeHealed[n]
+      await buddy.storageSlotsHealing(kvp, env)
 
-      if buddy.ctrl.running:
-        if await buddy.healingIsComplete(kvp,env):
-          env.nSlotLists.inc
-          nAcceptedAsIs.inc
-          continue
+      # Re-queue again unless ready
+      env.parkedStorage.excl kvp.data.accKey # un-register
+      if not kvp.data.slots.processed.isFull:
+        discard env.fetchStoragePart.append(kvp.key, kvp.data)
 
-      if kvp.data.slots.isNil:
-        env.fetchStorageFull.merge kvp # should be the exception
-      else:
-        env.fetchStoragePart.merge kvp
-
-    when extraTraceMessages:
-      let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
-      trace logTxt "done", peer, nSlotLists=env.nSlotLists, nStoQu,
-        nHealerQueue, nAcceptedAsIs, runState=buddy.ctrl.state
-
-  elif 0 < nAcceptedAsIs:
-    let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
-    trace logTxt "work items", peer, nSlotLists=env.nSlotLists,
-      nStoQu, nHealerQueue, nAcceptedAsIs, runState=buddy.ctrl.state
+    when extraTraceMessages:
+      trace logTxt "done", peer, ctx=buddy.healingCtx(env), nHealerQueue
 
 # ------------------------------------------------------------------------------
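
The *plan B* branch in `compileMissingNodesList()` above bounds `hexaryInspectTrie()` by `stopAtLevel` and `maxDangling`, so a worst-case walk over a densely populated sub-trie stays small (a hexary trie exposes at most 16^n nodes at level n). Below is a rough stand-alone sketch of such a bounded search, with a toy table-backed node store instead of the real `HexaryGetFn` interface; all names in the sketch are illustrative only.

import std/tables

type
  ToyNode = ref object
    children: array[16, string]        # child node keys, "" if unused

proc boundedDanglingScan(
    db: Table[string, ToyNode];        # toy stand-in for the node database
    rootKey: string;
    stopAtLevel: int;                  # cf. healStorageSlotsInspectionPlanBLevel
    maxDangling: int;                  # cf. fetchRequestTrieNodesMax
      ): seq[string] =
  ## Collect referenced-but-missing node keys, descending at most
  ## `stopAtLevel` levels and stopping early after `maxDangling` hits.
  var frontier = @[(rootKey, 0)]
  while 0 < frontier.len and result.len < maxDangling:
    let (key, level) = frontier.pop()
    if not db.hasKey(key):
      result.add key                   # dangling link => fetch it from the network
    elif level < stopAtLevel:
      for child in db[key].children:
        if 0 < child.len:
          frontier.add((child, level + 1))

when isMainModule:
  # Root node with one stored child and one dangling reference
  var db: Table[string, ToyNode]
  db["root"] = ToyNode(children: ["a", "b", "", "", "", "", "", "",
                                  "", "", "", "", "", "", "", ""])
  db["a"] = ToyNode()
  doAssert db.boundedDanglingScan("root", stopAtLevel = 4, maxDangling = 1024) == @["b"]
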
# End
diff --git a/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim b/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim
index 02c535c13..70c60d2e0 100644
--- a/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim
+++ b/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim
@@ -8,26 +8,38 @@
 # at your option. This file may not be copied, modified, or distributed
 # except according to those terms.
 
-## Fetch account ranges
-## ====================
+## Fetch accounts DB ranges
+## ========================
 ##
-## Acccount ranges not on the database yet are organised in the set
-## `env.fetchAccounts.unprocessed` of intervals (of account hashes.)
+## Account ranges allocated on the database are organised in the set
+## `env.fetchAccounts.processed` and the ranges that can be fetched are in
+## the pair of range sets `env.fetchAccounts.unprocessed`. The ranges of these
+## sets are mutually disjoint, yet the union of all ranges does not fully
+## comprise the complete `[0,2^256]` range. The missing parts are the ranges
+## currently processed by worker peers.
 ##
-## When processing, the followin happens.
+## Algorithm
+## ---------
 ##
 ## * Some interval `iv` is removed from the `env.fetchAccounts.unprocessed`
-##   set. This interval set might then be safely accessed and manipulated by
-##   other worker instances.
+##   pair of sets (so the interval `iv` is protected from other worker
+##   instances and can be safely accessed and manipulated by this function.)
+##   Stop if there are no more intervals.
 ##
-## * The data points in the interval `iv` (aka ccount hashes) are fetched from
-##   another peer over the network.
+## * The accounts data points in the interval `iv` (aka account hashes) are
+##   fetched from the network. This results in *key-value* pairs for accounts.
 ##
-## * The received data points of the interval `iv` are verified and merged
-##   into the persistent database.
+## * The received *key-value* pairs from the previous step are verified and
+##   merged into the accounts hexary trie persistent database.
 ##
-## * Data points in `iv` that were invalid or not recevied from the network
-##   are merged back it the set `env.fetchAccounts.unprocessed`.
+## * *Key-value* pairs that were invalid or were not received from the network
+##   are merged back into the range set `env.fetchAccounts.unprocessed`. The
+##   remaining successfully added ranges (and verified key gaps) are merged
+##   into `env.fetchAccounts.processed` (see the sketch after this list.)
+##
+## * For *key-value* pairs that have an active account storage slot sub-trie,
+##   the account including administrative data is queued in
+##   `env.fetchStorageFull`.
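
A minimal sketch of the range bookkeeping described in the list above, using 64-bit stand-ins for the 256-bit `NodeTag` values and naive interval lists instead of the `stew/interval_set` based sets; `fetchInterval` and `accountRangeDone` are hypothetical names used for illustration only.

type
  TagRange = tuple[lo, hi: uint64]     # closed interval, lo <= hi

var
  unprocessed = @[(lo: 0'u64, hi: high(uint64))]  # still to be fetched
  processed: seq[TagRange]                        # verified on the database

proc fetchInterval(): TagRange =
  ## Take the next interval off the pool; it is now private to this worker.
  result = unprocessed[0]
  unprocessed.delete(0)

proc accountRangeDone(iv: TagRange; coveredHi: uint64) =
  ## A snap/1 peer may answer the request for `iv` only up to `coveredHi`.
  ## Book the covered part as processed and give the remainder back so that
  ## this or another worker can pick it up later.
  processed.add((lo: iv.lo, hi: coveredHi))
  if coveredHi < iv.hi:
    unprocessed.add((lo: coveredHi + 1, hi: iv.hi))

when isMainModule:
  let iv = fetchInterval()
  accountRangeDone(iv, high(uint64) div 4)   # peer covered the first quarter
  doAssert processed.len == 1 and unprocessed.len == 1
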
## import chronicles, @@ -40,7 +52,7 @@ import "../.."/[constants, range_desc, worker_desc], ../com/[com_error, get_account_range], ../db/[hexary_envelope, snapdb_accounts], - ./swap_in + "."/[storage_queue_helper, swap_in] {.push raises: [Defect].} @@ -58,6 +70,22 @@ const template logTxt(info: static[string]): static[string] = "Accounts range " & info +proc `$`(rs: NodeTagRangeSet): string = + rs.fullFactor.toPC(0) + +proc `$`(iv: NodeTagRange): string = + iv.fullFactor.toPC(3) + +proc fetchCtx( + buddy: SnapBuddyRef; + env: SnapPivotRef; + ): string = + "{" & + "pivot=" & "#" & $env.stateHeader.blockNumber & "," & + "runState=" & $buddy.ctrl.state & "," & + "nStoQu=" & $env.storageQueueTotal() & "," & + "nSlotLists=" & $env.nSlotLists & "}" + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -88,26 +116,28 @@ proc accountsRangefetchImpl( db = ctx.data.snapDb fa = env.fetchAccounts stateRoot = env.stateHeader.stateRoot - pivot = "#" & $env.stateHeader.blockNumber # for logging # Get a range of accounts to fetch from let iv = block: let rc = buddy.getUnprocessed(env) if rc.isErr: when extraTraceMessages: - trace logTxt "currently all processed", peer, pivot + trace logTxt "currently all processed", peer, ctx=buddy.fetchCtx(env) return rc.value # Process received accounts and stash storage slots to fetch later let dd = block: - let rc = await buddy.getAccountRange(stateRoot, iv, pivot) + let + pivot = "#" & $env.stateHeader.blockNumber + rc = await buddy.getAccountRange(stateRoot, iv, pivot) if rc.isErr: fa.unprocessed.merge iv # fail => interval back to pool let error = rc.error if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): when extraTraceMessages: - trace logTxt "fetch error => stop", peer, pivot, reqLen=iv.len, error + trace logTxt "fetch error", peer, ctx=buddy.fetchCtx(env), + reqLen=iv.len, error return rc.value @@ -118,10 +148,6 @@ proc accountsRangefetchImpl( gotAccounts = dd.data.accounts.len # comprises `gotStorage` gotStorage = dd.withStorage.len - #when extraTraceMessages: - # trace logTxt "fetched", peer, gotAccounts, gotStorage, - # pivot, reqLen=iv.len, gotLen=dd.consumed.len - # Now, we fully own the scheduler. The original interval will savely be placed # back for a moment (the `unprocessed` range set to be corrected below.) fa.unprocessed.merge iv @@ -143,8 +169,8 @@ proc accountsRangefetchImpl( # Bad data, just try another peer buddy.ctrl.zombie = true when extraTraceMessages: - trace logTxt "import failed => stop", peer, gotAccounts, gotStorage, - pivot, reqLen=iv.len, gotLen=covered.total, error=rc.error + trace logTxt "import failed", peer, ctx=buddy.fetchCtx(env), + gotAccounts, gotStorage, reqLen=iv.len, covered, error=rc.error return rc.value @@ -165,25 +191,21 @@ proc accountsRangefetchImpl( discard fa.processed.merge w # Register accounts with storage slots on the storage TODO list. 
- env.fetchStorageFull.merge dd.withStorage + env.storageQueueAppend dd.withStorage + # Swap in from other pivots unless mothballed, already var nSwapInLaps = 0 - if not env.archived and - swapInAccountsCoverageTrigger <= ctx.pivotAccountsCoverage(): - # Swap in from other pivots + if not env.archived: when extraTraceMessages: - trace logTxt "before swap in", peer, pivot, gotAccounts, gotStorage, - coveredHere=covered.fullFactor.toPC(2), - processed=fa.processed.fullFactor.toPC(2), + trace logTxt "before swap in", peer, ctx=buddy.fetchCtx(env), covered, + gotAccounts, gotStorage, processed=fa.processed, nProcessedChunks=fa.processed.chunks.uint.toSI - if swapInAccountsPivotsMin <= ctx.data.pivotTable.len: - nSwapInLaps = buddy.swapInAccounts(env) + nSwapInLaps = ctx.swapInAccounts env when extraTraceMessages: - trace logTxt "request done", peer, pivot, gotAccounts, gotStorage, - nSwapInLaps, coveredHere=covered.fullFactor.toPC(2), - processed=fa.processed.fullFactor.toPC(2), + trace logTxt "request done", peer, ctx=buddy.fetchCtx(env), gotAccounts, + gotStorage, nSwapInLaps, covered, processed=fa.processed, nProcessedChunks=fa.processed.chunks.uint.toSI return true @@ -204,10 +226,9 @@ proc rangeFetchAccounts*( let ctx = buddy.ctx peer = buddy.peer - pivot = "#" & $env.stateHeader.blockNumber # for logging when extraTraceMessages: - trace logTxt "start", peer, pivot + trace logTxt "start", peer, ctx=buddy.fetchCtx(env) var nFetchAccounts = 0 # for logging while not fa.processed.isFull() and @@ -219,12 +240,11 @@ proc rangeFetchAccounts*( # Clean up storage slots queue first it it becomes too large let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len - if snapStorageSlotsQuPrioThresh < nStoQu: + if storageSlotsQuPrioThresh < nStoQu: break when extraTraceMessages: - trace logTxt "done", peer, pivot, nFetchAccounts, - runState=buddy.ctrl.state + trace logTxt "done", peer, ctx=buddy.fetchCtx(env), nFetchAccounts # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim b/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim index 9756d4c66..0d2df7eb4 100644 --- a/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim +++ b/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim @@ -8,50 +8,59 @@ # at your option. This file may not be copied, modified, or distributed # except according to those terms. -## Fetch storage slots -## =================== +## Fetch storage slots DB ranges +## ============================= ## -## Flow chart for storage slots download -## ------------------------------------- -## :: -## {missing-storage-slots} <-----------------+ -## | | -## v | -## | -## | | -## v | -## {storage-slots} | -## | | -## v | -## | -## | | | -## v v | -## {completed} {partial} | -## | | | -## | +------------------------+ -## v -## +## In principle, this algorithm is a generalised version of the one for +## installing on the accounts hexary trie database. The difference is that +## there are many such storage slots hexary trie database which are typically +## quite small. So the normal action is to download and install a full hexary +## trie rather than merging a partial one. ## -## Legend: -## * `<..>`: some action, process, etc. 
-## * `{missing-storage-slots}`: list implemented as pair of queues -## `env.fetchStorageFull` and `env.fetchStoragePart` -## * `(storage-slots}`: list is optimised out -## * `{completed}`: list is optimised out -## * `{partial}`: list is optimised out +## Algorithm +## --------- +## +## * Handle full storage slot hexary trie entries +## +## + Remove a list of full storage slot hexary trie entries from the queue of +## full requests `env.fetchStorageFull`. +## +## The *full* adjective indicates that a complete trie will be installed +## rather an a partial one merged. Stop if there are no more full entries +## and proceed with handling partial entries. +## +## + Fetch and install the full trie entries of that list from the network. +## +## + For a list entry that was partially received (there is only one per +## reply message), store the remaining parts to install on the queue of +## partial storage slot hexary trie entries `env.fetchStoragePart`. +## +## + Rinse and repeat +## +## * Handle partial storage slot hexary trie entries +## +## + Remove a single partial storage slot hexary trie entry from the queue +## of partial requests `env.fetchStoragePart`. +## +## The detailed handling of this entry resembles the algorithm described +## for fetching accounts regarding sets of ranges `processed` and +## `unprocessed`. Stop if there are no more entries. +## +## + Fetch and install the partial trie entry from the network. +## +## + Rinse and repeat ## ## Discussion ## ---------- -## Handling storage slots can be seen as an generalisation of handling account -## ranges (see `range_fetch_accounts` module.) Contrary to the situation with -## accounts, storage slots are typically downloaded in the size of a full list -## that can be expanded to a full hexary trie for the given storage root. ## -## Only in rare cases a storage slots list is incomplete, a partial hexary -## trie. In that case, the list of storage slots is processed as described -## for accounts (see `range_fetch_accounts` module.) +## If there is a hexary trie integrity problem when storing a response to a +## full or partial entry request, re-queue the entry on the queue of partial +## requests `env.fetchStoragePart` with the next partial range to fetch half +## of the current request. +## +## In general, if an error occurs, the entry that caused the error is moved +## or re-stored onto the queue of partial requests `env.fetchStoragePart`. ## - import chronicles, chronos, @@ -59,9 +68,10 @@ import stew/[interval_set, keyed_queue], stint, ../../../sync_desc, - "../.."/[constants, range_desc, worker_desc], + "../.."/[range_desc, worker_desc], ../com/[com_error, get_storage_ranges], - ../db/[hexary_error, snapdb_storage_slots] + ../db/[hexary_error, snapdb_storage_slots], + ./storage_queue_helper {.push raises: [Defect].} @@ -78,90 +88,20 @@ const template logTxt(info: static[string]): static[string] = "Storage slots range " & info -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc getNextSlotItemsFull( +proc fetchCtx( buddy: SnapBuddyRef; env: SnapPivotRef; - ): seq[AccountSlotsHeader] = - ## Get list of full work item from the batch queue. - ## - ## If there is an indication that the storage trie may have some data - ## already it is ignored here and marked `inherit` so that it will be - ## picked up by the healing process. 
+ ): string = let ctx = buddy.ctx - peer = buddy.peer - var - nInherit = 0 - for kvp in env.fetchStorageFull.nextPairs: - let it = AccountSlotsHeader( - accKey: kvp.data.accKey, - storageRoot: kvp.key) - - # Verify whether a storage sub-trie exists, already - if kvp.data.inherit or - ctx.data.snapDb.haveStorageSlotsData(peer, it.accKey, it.storageRoot): - kvp.data.inherit = true - nInherit.inc # update for logging - continue - - result.add it - env.fetchStorageFull.del(kvp.key) # ok to delete this item from batch queue - - # Maximal number of items to fetch - if snapStorageSlotsFetchMax <= result.len: - break - - when extraTraceMessages: - trace logTxt "fetch full", peer, nSlotLists=env.nSlotLists, - nStorageQuFull=env.fetchStorageFull.len, nToProcess=result.len, nInherit - - -proc getNextSlotItemPartial( - buddy: SnapBuddyRef; - env: SnapPivotRef; - ): seq[AccountSlotsHeader] = - ## Get work item from the batch queue. - let - ctx = buddy.ctx - peer = buddy.peer - - for kvp in env.fetchStoragePart.nextPairs: - if not kvp.data.slots.isNil: - # Extract range and return single item request queue - let rc = kvp.data.slots.unprocessed.fetch(maxLen = high(UInt256)) - if rc.isOk: - - # Delete from batch queue if the range set becomes empty - if kvp.data.slots.unprocessed.isEmpty: - env.fetchStoragePart.del(kvp.key) - - when extraTraceMessages: - trace logTxt "fetch partial", peer, - nSlotLists=env.nSlotLists, - nStorageQuPart=env.fetchStoragePart.len, - subRange=rc.value, account=kvp.data.accKey - - return @[AccountSlotsHeader( - accKey: kvp.data.accKey, - storageRoot: kvp.key, - subRange: some rc.value)] - - # Oops, empty range set? Remove range and move item to the full requests - kvp.data.slots = nil - env.fetchStorageFull.merge kvp - - -proc backToSlotItemsQueue(env: SnapPivotRef; req: seq[AccountSlotsHeader]) = - if 0 < req.len: - if req[^1].subRange.isSome: - env.fetchStoragePart.merge req[^1] - env.fetchStorageFull.merge req[0 ..< req.len-1] - else: - env.fetchStorageFull.merge req + nStoQu = (env.fetchStorageFull.len + + env.fetchStoragePart.len + + env.parkedStorage.len) + "{" & + "pivot=" & "#" & $env.stateHeader.blockNumber & "," & + "runState=" & $buddy.ctrl.state & "," & + "nStoQu=" & $nStoQu & "," & + "nSlotLists=" & $env.nSlotLists & "}" # ------------------------------------------------------------------------------ # Private functions @@ -171,7 +111,8 @@ proc storeStoragesSingleBatch( buddy: SnapBuddyRef; req: seq[AccountSlotsHeader]; env: SnapPivotRef; - ) {.async.} = + ): Future[bool] + {.async.} = ## Fetch account storage slots and store them in the database. let ctx = buddy.ctx @@ -183,28 +124,19 @@ proc storeStoragesSingleBatch( var stoRange = block: let rc = await buddy.getStorageRanges(stateRoot, req, pivot) if rc.isErr: - env.backToSlotItemsQueue req - let error = rc.error if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - trace logTxt "fetch error => stop", peer, pivot, - nSlotLists=env.nSlotLists, nReq=req.len, nStorageQueue, error - return + trace logTxt "fetch error => stop", peer, ctx=buddy.fetchCtx(env), + nReq=req.len, error + return false # all of `req` failed rc.value # Reset error counts for detecting repeated timeouts, network errors, etc. 
buddy.data.errors.resetComError() var gotSlotLists = stoRange.data.storages.len - - #when extraTraceMessages: - # let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - # trace logTxt "fetched", peer, pivot, nSlotLists=env.nSlotLists, - # nSlotLists=gotSlotLists, nReq=req.len, - # nStorageQueue, nLeftOvers=stoRange.leftOver.len - if 0 < gotSlotLists: + # Verify/process storages data and save it to disk let report = ctx.data.snapDb.importStorageSlots( peer, stoRange.data, noBaseBoundCheck = true) @@ -214,14 +146,11 @@ proc storeStoragesSingleBatch( if report[^1].slot.isNone: # Failed to store on database, not much that can be done here - env.backToSlotItemsQueue req gotSlotLists.dec(report.len - 1) # for logging only - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - error logTxt "import failed", peer, pivot, - nSlotLists=env.nSlotLists, nSlotLists=gotSlotLists, nReq=req.len, - nStorageQueue, error=report[^1].error - return + error logTxt "import failed", peer, ctx=buddy.fetchCtx(env), + nSlotLists=gotSlotLists, nReq=req.len, error=report[^1].error + return false # all of `req` failed # Push back error entries to be processed later for w in report: @@ -234,44 +163,19 @@ proc storeStoragesSingleBatch( if w.error == RootNodeMismatch: # Some pathological case, needs further investigation. For the # moment, provide partial fetches. - const - halfTag = (high(UInt256) div 2).NodeTag - halfTag1 = halfTag + 1.u256 - env.fetchStoragePart.merge [ - AccountSlotsHeader( - accKey: acc.accKey, - storageRoot: acc.storageRoot, - subRange: some NodeTagRange.new(low(NodeTag), halfTag)), - AccountSlotsHeader( - accKey: acc.accKey, - storageRoot: acc.storageRoot, - subRange: some NodeTagRange.new(halfTag1, high(NodeTag)))] + env.storageQueueAppendPartialBisect acc elif w.error == RightBoundaryProofFailed and acc.subRange.isSome and 1 < acc.subRange.unsafeGet.len: # Some pathological case, needs further investigation. For the # moment, provide a partial fetches. - let - iv = acc.subRange.unsafeGet - halfTag = iv.minPt + (iv.len div 2) - halfTag1 = halfTag + 1.u256 - env.fetchStoragePart.merge [ - AccountSlotsHeader( - accKey: acc.accKey, - storageRoot: acc.storageRoot, - subRange: some NodeTagRange.new(iv.minPt, halfTag)), - AccountSlotsHeader( - accKey: acc.accKey, - storageRoot: acc.storageRoot, - subRange: some NodeTagRange.new(halfTag1, iv.maxPt))] + env.storageQueueAppendPartialBisect acc else: # Reset any partial result (which would be the last entry) to # requesting the full interval. So all the storage slots are # re-fetched completely for this account. 
- env.fetchStorageFull.merge AccountSlotsHeader( - accKey: acc.accKey, - storageRoot: acc.storageRoot) + env.storageQueueAppendFull acc # Last entry might be partial (if any) # @@ -284,10 +188,9 @@ proc storeStoragesSingleBatch( # Update local statistics counter for `nSlotLists` counter update gotSlotLists.dec - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - error logTxt "processing error", peer, pivot, nSlotLists=env.nSlotLists, + error logTxt "processing error", peer, ctx=buddy.fetchCtx(env), nSlotLists=gotSlotLists, nReqInx=inx, nReq=req.len, - nStorageQueue, nDangling=w.dangling.len, error=w.error + nDangling=w.dangling.len, error=w.error # Update statistics if gotSlotLists == 1 and @@ -299,7 +202,8 @@ proc storeStoragesSingleBatch( env.nSlotLists.inc(gotSlotLists) # Return unprocessed left overs to batch queue - env.backToSlotItemsQueue stoRange.leftOver + env.storageQueueAppend(stoRange.leftOver, req[^1].subRange) + return true # ------------------------------------------------------------------------------ # Public functions @@ -314,50 +218,62 @@ proc rangeFetchStorageSlots*( ## each work item on the queue at least once.For partial partial slot range ## items this means in case of success that the outstanding range has become ## at least smaller. - let - peer = buddy.peer - fullRangeLen = env.fetchStorageFull.len - partRangeLen = env.fetchStoragePart.len # Fetch storage data and save it on disk. Storage requests are managed by # request queues for handling full/partial replies and re-fetch issues. For # all practical puroses, this request queue should mostly be empty. - if 0 < fullRangeLen or 0 < partRangeLen: + if 0 < env.fetchStorageFull.len or 0 < env.fetchStoragePart.len: + let + ctx = buddy.ctx + peer = buddy.peer when extraTraceMessages: - trace logTxt "start", peer, nSlotLists=env.nSlotLists, - nStorageQueue=(fullRangeLen+partRangeLen) + trace logTxt "start", peer, ctx=buddy.fetchCtx(env) - # Processing the full range will implicitely handle inheritable storage - # slots first with each batch item (see `getNextSlotItemsFull()`.) - # - # Run this batch even if `archived` flag is set in order to shrink the - # batch queue. - var fullRangeItemsleft = 1 + (fullRangeLen-1) div snapStorageSlotsFetchMax - while 0 < fullRangeItemsleft and - buddy.ctrl.running: + + # Run batch even if `archived` flag is set in order to shrink the queues. 
+ var delayed: seq[AccountSlotsHeader] + while buddy.ctrl.running: # Pull out the next request list from the queue - let req = buddy.getNextSlotItemsFull(env) + let (req, nComplete, nPartial) = ctx.storageQueueFetchFull(env) if req.len == 0: break - fullRangeItemsleft.dec - await buddy.storeStoragesSingleBatch(req, env) + when extraTraceMessages: + trace logTxt "fetch full", peer, ctx=buddy.fetchCtx(env), + nStorageQuFull=env.fetchStorageFull.len, nReq=req.len, + nPartial, nComplete - var partialRangeItemsLeft = env.fetchStoragePart.len - while 0 < partialRangeItemsLeft and - buddy.ctrl.running: - # Pull out the next request list from the queue - let req = buddy.getNextSlotItemPartial(env) - if req.len == 0: + if await buddy.storeStoragesSingleBatch(req, env): + for w in req: + env.parkedStorage.excl w.accKey # Done with these items + else: + delayed &= req + env.storageQueueAppend delayed + + # Ditto for partial queue + delayed.setLen(0) + while buddy.ctrl.running: + # Pull out the next request item from the queue + let rc = env.storageQueueFetchPartial() + if rc.isErr: break - partialRangeItemsLeft.dec - await buddy.storeStoragesSingleBatch(req, env) + + when extraTraceMessages: + let + subRange = rc.value.subRange.get + account = rc.value.accKey + trace logTxt "fetch partial", peer, ctx=buddy.fetchCtx(env), + nStorageQuPart=env.fetchStoragePart.len, subRange, account + + if await buddy.storeStoragesSingleBatch(@[rc.value], env): + env.parkedStorage.excl rc.value.accKey # Done with this item + else: + delayed.add rc.value + env.storageQueueAppend delayed when extraTraceMessages: - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - trace logTxt "done", peer, nSlotLists=env.nSlotLists, nStorageQueue, - fullRangeItemsleft, partialRangeItemsLeft, runState=buddy.ctrl.state + trace logTxt "done", peer, ctx=buddy.fetchCtx(env) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim b/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim new file mode 100644 index 000000000..439c9534c --- /dev/null +++ b/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim @@ -0,0 +1,292 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
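+
+## Storage slots queue management
+## ==============================
+##
+## Book keeping helpers for the storage slots download queues of a pivot
+## environment: the queue of full sub-trie requests `env.fetchStorageFull`,
+## the queue of partial requests `env.fetchStoragePart`, and the set
+## `env.parkedStorage` of account keys currently handed out to a worker.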
+ +import + eth/[common, p2p], + stew/[interval_set, keyed_queue], + ../../../sync_desc, + "../.."/[constants, range_desc, worker_desc], + ../db/[hexary_inspect, snapdb_storage_slots] + +{.push raises: [Defect].} + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +template noExceptionOops(info: static[string]; code: untyped) = + try: + code + except Defect as e: + raise e + except Exception as e: + raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc getOrMakePartial( + env: SnapPivotRef; + stoRoot: Hash256; + accKey: NodeKey; + ): (SnapSlotsQueueItemRef, bool) = + ## Create record on `fetchStoragePart` or return existing one + let rc = env.fetchStoragePart.lruFetch stoRoot + if rc.isOk: + result = (rc.value, true) # Value exists + else: + result = (SnapSlotsQueueItemRef(accKey: accKey), false) # New value + env.parkedStorage.excl accKey # Un-park + discard env.fetchStoragePart.append(stoRoot, result[0]) + + if result[0].slots.isNil: + result[0].slots = SnapRangeBatchRef(processed: NodeTagRangeSet.init()) + result[0].slots.unprocessed.init() + +# ------------------------------------------------------------------------------ +# Public helpers +# ------------------------------------------------------------------------------ + +proc storageQueueTotal*(env: SnapPivotRef): int = + ## Total number of entries on the storage queues + env.fetchStorageFull.len + env.fetchStoragePart.len + env.parkedStorage.len + +# ------------------------------------------------------------------------------ +# Public functions, append queue items +# ------------------------------------------------------------------------------ + +proc storageQueueAppendFull*( + env: SnapPivotRef; + stoRoot: Hash256; + accKey: NodeKey; + ) = + ## Append item to `fetchStorageFull` queue + env.fetchStoragePart.del stoRoot # Not a partial item anymore (if any) + env.parkedStorage.excl accKey # Un-park + discard env.fetchStorageFull.append( + stoRoot, SnapSlotsQueueItemRef(accKey: accKey)) + +proc storageQueueAppendFull*( + env: SnapPivotRef; + acc: AccountSlotsHeader; + ) = + ## variant of `storageQueueAppendFull()` + env.storageQueueAppendFull(acc.storageRoot, acc.accKey) + +proc storageQueueAppendFull*( + env: SnapPivotRef; + kvp: SnapSlotsQueuePair; + ) = + ## variant of `storageQueueAppendFull()` + env.storageQueueAppendFull(kvp.key, kvp.data.accKey) + + +proc storageQueueAppendPartialBisect*( + env: SnapPivotRef; + acc: AccountSlotsHeader; + ) = + ## Append to partial queue so that the next fetch range is half the size of + ## the current next range. 
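+  ##
+  ## Example, assuming nothing of this trie has been processed yet: the low
+  ## priority `unprocessed` set is reset to the full range
+  ## `[low(NodeTag),high(NodeTag)]` (minus any `processed` parts), and the
+  ## lower half of its first interval is then promoted to the priority set.
+  ## So the next fetch attempt covers half of the previously failed request.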
+ + # Fetch/rotate queue item + let data = env.getOrMakePartial(acc.storageRoot, acc.accKey)[0] + + # Derive unprocessed ranges => into lower priority queue + data.slots.unprocessed.clear() + discard data.slots.unprocessed[1].merge(low(NodeTag),high(NodeTag)) + for iv in data.slots.processed.increasing: + discard data.slots.unprocessed[1].reduce iv # complements processed ranges + + # Prioritise half of first unprocessed range + let rc = data.slots.unprocessed[1].ge() + if rc.isErr: + env.fetchStoragePart.del acc.storageRoot # Oops, nothing to do + return # Done + let halfTag = rc.value.minPt + ((rc.value.maxPt - rc.value.minPt) div 2) + data.slots.unprocessed.merge(rc.value.minPt, halfTag) + + +proc storageQueueAppend*( + env: SnapPivotRef; + reqList: openArray[AccountSlotsHeader]; + subRange = none(NodeTagRange); # For a partially fetched slot + ) = + for n,w in reqList: + env.parkedStorage.excl w.accKey # Un-park + + # Only last item (when `n+1 == reqList.len`) may be registered partial + if w.subRange.isNone or n + 1 < reqList.len: + env.storageQueueAppendFull w + + else: + env.fetchStorageFull.del w.storageRoot + + let + (data, hasItem) = env.getOrMakePartial(w.storageRoot, w.accKey) + iv = w.subRange.unsafeGet + + # Register partial range + if subRange.isSome: + # The `subRange` is the original request, `iv` the uncompleted part + let reqRange = subRange.unsafeGet + if not hasItem: + # Re-initialise book keeping + discard data.slots.processed.merge(low(NodeTag),high(NodeTag)) + discard data.slots.processed.reduce reqRange + data.slots.unprocessed.clear() + + # Calculate `reqRange - iv` which are the completed ranges + let temp = NodeTagRangeSet.init() + discard temp.merge reqRange + discard temp.reduce iv + + # Update `processed` ranges by adding `reqRange - iv` + for w in temp.increasing: + discard data.slots.processed.merge w + + # Update `unprocessed` ranges + data.slots.unprocessed.merge reqRange + data.slots.unprocessed.reduce iv + + elif hasItem: + # Restore unfetched request + data.slots.unprocessed.merge iv + + else: + # Makes no sense with a `leftOver` item + env.storageQueueAppendFull w + +# ------------------------------------------------------------------------------ +# Public functions, make/create queue items +# ------------------------------------------------------------------------------ + +proc storageQueueGetOrMakePartial*( + env: SnapPivotRef; + stoRoot: Hash256; + accKey: NodeKey; + ): SnapSlotsQueueItemRef = + ## Create record on `fetchStoragePart` or return existing one + env.getOrMakePartial(stoRoot, accKey)[0] + +proc storageQueueGetOrMakePartial*( + env: SnapPivotRef; + acc: AccountSlotsHeader; + ): SnapSlotsQueueItemRef = + ## Variant of `storageQueueGetOrMakePartial()` + env.getOrMakePartial(acc.storageRoot, acc.accKey)[0] + +# ------------------------------------------------------------------------------ +# Public functions, fetch and remove queue items +# ------------------------------------------------------------------------------ + +proc storageQueueFetchFull*( + ctx: SnapCtxRef; # Global context + env: SnapPivotRef; # Current pivot environment + ): (seq[AccountSlotsHeader],int,int) = + ## Fetch a list of at most `fetchRequestStorageSlotsMax` full work items + ## from the batch queue. + ## + ## This function walks through the items queue and collects work items where + ## the hexary trie has not been fully or partially allocated on the database + ## already. These collected items are returned as first item of the return + ## code tuple. 
+  ##
+  ## There will be a quick check (sufficient but not necessary) whether a
+  ## partially allocated work item is already complete, in which case it is
+  ## removed from the queue. The number of removed items is returned as
+  ## the second item of the return code tuple.
+  ##
+  ## Otherwise, a partially allocated item is moved to the partial queue. The
+  ## number of items moved to the partial queue is returned as the third item
+  ## of the return code tuple.
+  ##
+  var
+    rcList: seq[AccountSlotsHeader]
+    nComplete = 0
+    nPartial = 0
+
+  noExceptionOops("storageQueueFetchFull"):
+    for kvp in env.fetchStorageFull.nextPairs:
+      let
+        getFn = ctx.data.snapDb.getStorageSlotsFn kvp.data.accKey
+        rootKey = kvp.key.to(NodeKey)
+        accItem = AccountSlotsHeader(
+          accKey: kvp.data.accKey,
+          storageRoot: kvp.key)
+
+      # This item will either be returned, discarded, or moved to the partial
+      # queue, subject to healing. So it will be removed from this queue.
+      env.fetchStorageFull.del kvp.key # OK to delete current link
+
+      # Check whether the trie is fully empty
+      if rootKey.ByteArray32.getFn.len == 0:
+        # Collect for return
+        rcList.add accItem
+        env.parkedStorage.incl accItem.accKey # Registered as absent
+
+        # Maximal number of items to fetch
+        if fetchRequestStorageSlotsMax <= rcList.len:
+          break
+      else:
+        # Check how much there is below the top level storage slots node. For
+        # a small storage trie, this check will be exhaustive.
+        let stats = getFn.hexaryInspectTrie(rootKey,
+          suspendAfter = storageSlotsTrieInheritPerusalMax,
+          maxDangling = 1)
+
+        if stats.dangling.len == 0 and stats.resumeCtx.isNil:
+          # This storage trie could be fully searched and there was no dangling
+          # node. So it is complete and can be fully removed from the batch.
+          nComplete.inc # Update for logging
+        else:
+          # This item becomes a partially available slot
+          let data = env.storageQueueGetOrMakePartial accItem
+          nPartial.inc # Update for logging
+
+  (rcList, nComplete, nPartial)
+
+
+proc storageQueueFetchPartial*(
+    env: SnapPivotRef;
+      ): Result[AccountSlotsHeader,void] =
+  ## Get work item from the batch queue. This will typically return the full
+  ## work item and remove it from the queue unless the partially completed
+  ## range is fragmented.
+  block findItem:
+    for kvp in env.fetchStoragePart.nextPairs:
+      # Extract range and return single item request queue
+      let rc = kvp.data.slots.unprocessed.fetch(maxLen = high(UInt256))
+      if rc.isOk:
+        result = ok(AccountSlotsHeader(
+          accKey: kvp.data.accKey,
+          storageRoot: kvp.key,
+          subRange: some rc.value))
+
+        # Delete from batch queue if the `unprocessed` range set becomes empty
+        # and the `processed` set is the complement of `rc.value`.
+ if kvp.data.slots.unprocessed.isEmpty and + high(UInt256) - rc.value.len <= kvp.data.slots.processed.total: + env.fetchStoragePart.del kvp.key + env.parkedStorage.incl kvp.data.accKey # Temporarily parked + return + else: + # Otherwise rotate queue + break findItem + # End for() + + return err() + + # Rotate queue item + discard env.fetchStoragePart.lruFetch result.value.storageRoot + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/pivot/sub_tries_helper.nim b/nimbus/sync/snap/worker/pivot/sub_tries_helper.nim deleted file mode 100644 index f35ee399c..000000000 --- a/nimbus/sync/snap/worker/pivot/sub_tries_helper.nim +++ /dev/null @@ -1,131 +0,0 @@ -# Nimbus -# Copyright (c) 2021 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed -# except according to those terms. - -import - std/sequtils, - chronicles, - chronos, - eth/[common, p2p], - stew/interval_set, - "../.."/[constants, range_desc, worker_desc], - ../db/[hexary_desc, hexary_error, hexary_inspect] - -{.push raises: [Defect].} - -logScope: - topics = "snap-subtrie" - -const - extraTraceMessages = false or true - ## Enabled additional logging noise - -# ------------------------------------------------------------------------------ -# Private logging helpers -# ------------------------------------------------------------------------------ - -template logTxt(info: static[string]): static[string] = - "Sub-trie helper " & info - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -proc doInspect( - getFn: HexaryGetFn; ## Abstract database access - rootKey: NodeKey; ## Start of hexary trie - nodes: seq[NodeSpecs]; ## Nodes with prob. dangling child links - resumeCtx: TrieNodeStatCtxRef; ## Resume previous inspection - ): Result[TrieNodeStat,HexaryError] - {.gcsafe, raises: [Defect,RlpError].} = - ## .. - let stats = getFn.hexaryInspectTrie( - rootKey, nodes.mapIt(it.partialPath), resumeCtx, healInspectionBatch) - - if stats.stopped: - return err(TrieLoopAlert) - - ok(stats) - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc subTriesFromPartialPaths*( - getFn: HexaryGetFn; ## Abstract database access - stateRoot: Hash256; ## Start of hexary trie - batch: SnapRangeBatchRef; ## Healing data support - nodesMissingMaxLen = high(int); ## Max length of `nodes.missing` - ): Future[Result[void,HexaryError]] - {.async.} = - ## Starting with a given set of potentially dangling account nodes - ## `checkNodes`, this set is filtered and processed. 
The outcome is - ## fed back to the vey same list `checkNodes` - - # Process might be expensive, so only a single instance is allowed to run it - if batch.lockTriePerusal: - return err(TrieIsLockedForPerusal) - batch.lockTriePerusal = true - - let - rootKey = stateRoot.to(NodeKey) - var - error: HexaryError - count = 0 # for logging - start = Moment.now() # for logging - - block errorWhenOutside: - try: - while batch.nodes.missing.len < nodesMissingMaxLen: - # Inspect hexary trie for dangling nodes - let rc = getFn.doInspect(rootKey, batch.nodes.check, batch.resumeCtx) - if rc.isErr: - error = rc.error - break errorWhenOutside - - count.inc - - # Update context for async threading environment - batch.resumeCtx = rc.value.resumeCtx - batch.nodes.check.setLen(0) - - # Collect result - batch.nodes.missing = batch.nodes.missing & rc.value.dangling - - # Done unless there is some resumption context - if rc.value.resumeCtx.isNil: - break - - when extraTraceMessages: - trace logTxt "inspection wait", count, - elapsed=(Moment.now()-start), - sleep=healInspectionBatchWaitNanoSecs, - nodesMissingLen=batch.nodes.missing.len, nodesMissingMaxLen, - resumeCtxLen = batch.resumeCtx.hddCtx.len - - # Allow async task switch and continue. Note that some other task might - # steal some of the `nodes.missing` var argument. - await sleepAsync healInspectionBatchWaitNanoSecs.nanoseconds - - batch.lockTriePerusal = false - return ok() - - except RlpError: - error = RlpEncoding - - batch.nodes.missing = batch.nodes.missing & batch.resumeCtx.to(seq[NodeSpecs]) - batch.resumeCtx = nil - - batch.lockTriePerusal = false - return err(error) - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ - diff --git a/nimbus/sync/snap/worker/pivot/swap_in.nim b/nimbus/sync/snap/worker/pivot/swap_in.nim index 8a214382e..aea7e50be 100644 --- a/nimbus/sync/snap/worker/pivot/swap_in.nim +++ b/nimbus/sync/snap/worker/pivot/swap_in.nim @@ -140,15 +140,16 @@ proc uncoveredEnvelopes( var decomposed = "n/a" let rc = processed.hexaryEnvelopeDecompose(rootKey, getFn) if rc.isOk: - # Remove non-allocated nodes + # Return allocated nodes only result = rc.value.filterIt(0 < it.nodeKey.ByteArray32.getFn().len) when extraTraceMessages: decomposed = rc.value.toPC when extraTraceMessages: - trace logTxt "uncovered envelopes", processed, nProcessed=processed.chunks, - decomposed, nResult=result.len, result=result.toPC + trace logTxt "unprocessed envelopes", processed, + nProcessed=processed.chunks, decomposed, + nResult=result.len, result=result.toPC proc otherProcessedRanges( @@ -233,17 +234,17 @@ proc swapIn( merged += processed.merge iv # Import range as processed unprocessed.reduce iv # No need to re-fetch - when extraTraceMessages: - trace logTxt "inherited ranges", lapCount, nCheckNodes=checkNodes.len, - merged=((merged.to(float) / (2.0^256)).toPC(3)), - allMerged=((allMerged.to(float) / (2.0^256)).toPC(3)) - if merged == 0: # Loop control break lapCount.inc allMerged += merged # Statistics, logging + when extraTraceMessages: + trace logTxt "inherited ranges", lapCount, nCheckNodes=checkNodes.len, + merged=((merged.to(float) / (2.0^256)).toPC(3)), + allMerged=((allMerged.to(float) / (2.0^256)).toPC(3)) + # End while() (swappedIn,lapCount) @@ -326,15 +327,6 @@ proc swapInAccounts*( nLaps - -proc swapInAccounts*( - buddy: SnapBuddyRef; # Worker peer - env: SnapPivotRef; # Current pivot environment - loopMax = 100; # Prevent 
from looping too often - ): int = - ## Variant of `swapInAccounts()` - buddy.ctx.swapInAccounts(env, loopMax) - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index a58b61d3f..f09aea00b 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -15,7 +15,7 @@ import ../../db/select_backend, ../sync_desc, ./worker/com/com_error, - ./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot], + ./worker/db/[snapdb_desc, snapdb_pivot], ./worker/ticker, ./range_desc @@ -41,7 +41,6 @@ type ## range + healing support. accKey*: NodeKey ## Owner account slots*: SnapRangeBatchRef ## slots to fetch, nil => all slots - inherit*: bool ## mark this trie seen already SnapTodoRanges* = array[2,NodeTagRangeSet] ## Pair of sets of ``unprocessed`` node ranges that need to be fetched and @@ -50,30 +49,22 @@ type ## This data structure is used for coordinating peers that run quasi ## parallel. - SnapTodoNodes* = object - ## Pair of node lists subject to swap-in and healing - check*: seq[NodeSpecs] ## Existing nodes, sub-trie unknown - missing*: seq[NodeSpecs] ## Top ref for sub-tries to be healed - SnapRangeBatchRef* = ref object ## `NodeTag` ranges to fetch, healing support unprocessed*: SnapTodoRanges ## Range of slots to be fetched processed*: NodeTagRangeSet ## Node ranges definitely processed - nodes*: SnapTodoNodes ## Single nodes to double check - resumeCtx*: TrieNodeStatCtxRef ## State for resuming trie inpection - lockTriePerusal*: bool ## Only one process at a time SnapPivotRef* = ref object ## Per-state root cache for particular snap data environment stateHeader*: BlockHeader ## Pivot state, containg state root - # Accounts download + # Accounts download coverage fetchAccounts*: SnapRangeBatchRef ## Set of accounts ranges to fetch - healThresh*: float ## Start healing when fill factor reached # Storage slots download fetchStorageFull*: SnapSlotsQueue ## Fetch storage trie for these accounts fetchStoragePart*: SnapSlotsQueue ## Partial storage trie to com[plete + parkedStorage*: HashSet[NodeKey] ## Storage batch items in use storageDone*: bool ## Done with storage, block sync next # Info @@ -147,7 +138,7 @@ proc pivotAccountsCoverage*(ctx: SnapCtxRef): float = proc init*(q: var SnapTodoRanges) = ## Populate node range sets with maximal range in the first range set. This - ## kind of pair or interval sets is manages as follows: + ## kind of pair or interval sets is managed as follows: ## * As long as possible, fetch and merge back intervals on the first set. ## * If the first set is empty and some intervals are to be fetched, swap ## first and second interval lists. @@ -157,6 +148,11 @@ proc init*(q: var SnapTodoRanges) = q[1] = NodeTagRangeSet.init() discard q[0].merge(low(NodeTag),high(NodeTag)) +proc clear*(q: var SnapTodoRanges) = + ## Reset argument range sets empty. + q[0].clear() + q[1].clear() + proc merge*(q: var SnapTodoRanges; iv: NodeTagRange) = ## Unconditionally merge the node range into the account ranges list. 
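
A toy rendition of the pair-of-sets discipline described in the `init()` comment above, with naive interval lists in place of `NodeTagRangeSet`; the real `fetch()` additionally splits intervals to honour a maximum request length, which is omitted here.

type
  Iv = tuple[lo, hi: uint64]       # stand-in for NodeTagRange
  TodoRanges = array[2, seq[Iv]]   # [0] = priority set, [1] = second set

proc fetch(q: var TodoRanges; iv: var Iv): bool =
  ## Take the next interval, preferring the first set; when that one is
  ## empty, swap the two sets and try again.
  if q[0].len == 0:
    swap(q[0], q[1])
  if q[0].len == 0:
    return false
  iv = q[0][0]
  q[0].delete(0)
  true

proc mergeBack(q: var TodoRanges; iv: Iv) =
  ## Unconsumed or failed intervals go back onto the first set.
  q[0].add(iv)

when isMainModule:
  var q: TodoRanges
  q[0].add((lo: 0'u64, hi: 99'u64))        # cf. init(): full range on set 0
  var iv: Iv
  doAssert q.fetch(iv) and iv.hi == 99
  q.mergeBack((lo: 50'u64, hi: 99'u64))    # give back the unconsumed part
  doAssert q[0].len == 1 and q[1].len == 0
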
@@ -221,81 +217,6 @@ proc verify*(q: var SnapTodoRanges): bool = return false true -# ------------------------------------------------------------------------------ -# Public helpers: SlotsQueue -# ------------------------------------------------------------------------------ - -proc merge*(q: var SnapSlotsQueue; kvp: SnapSlotsQueuePair) = - ## Append/prepend a queue item record into the batch queue. - let - reqKey = kvp.key - rc = q.eq(reqKey) - if rc.isErr: - # Append to list - discard q.append(reqKey, kvp.data) - else: - # Entry exists already - let qData = rc.value - if not qData.slots.isNil: - # So this entry is not maximal and can be extended - if kvp.data.slots.isNil: - # Remove restriction for this entry and move it to the right end - qData.slots = nil - discard q.lruFetch reqKey - else: - # Merge argument intervals into target set - for ivSet in kvp.data.slots.unprocessed: - for iv in ivSet.increasing: - qData.slots.unprocessed.reduce iv - -proc merge*(q: var SnapSlotsQueue; fetchReq: AccountSlotsHeader) = - ## Append/prepend a slot header record into the batch queue. If there is - ## a range merger, the argument range will be sortred in a way so that it - ## is processed separately with highest priority. - let - reqKey = fetchReq.storageRoot - rc = q.eq(reqKey) - if rc.isOk: - # Entry exists already - let qData = rc.value - if not qData.slots.isNil: - # So this entry is not maximal and can be extended - if fetchReq.subRange.isNone: - # Remove restriction for this entry and move it to the right end - qData.slots = nil - discard q.lruFetch reqKey - else: - # Merge argument interval into target separated from the already - # existing sets (note that this works only for the last set) - for iv in qData.slots.unprocessed[0].increasing: - # Move all to second set - discard qData.slots.unprocessed[1].merge iv - # Clear first set and add argument range - qData.slots.unprocessed[0].clear() - qData.slots.unprocessed.merge fetchReq.subRange.unsafeGet - - elif fetchReq.subRange.isNone: - # Append full range to the list - discard q.append(reqKey, SnapSlotsQueueItemRef( - accKey: fetchReq.accKey)) - - else: - # Partial range, add healing support and interval - var unprocessed = [NodeTagRangeSet.init(), NodeTagRangeSet.init()] - discard unprocessed[0].merge(fetchReq.subRange.unsafeGet) - discard q.append(reqKey, SnapSlotsQueueItemRef( - accKey: fetchReq.accKey, - slots: SnapRangeBatchRef( - unprocessed: unprocessed, - processed: NodeTagRangeSet.init()))) - -proc merge*( - q: var SnapSlotsQueue; - reqList: openArray[SnapSlotsQueuePair|AccountSlotsHeader]) = - ## Variant fof `merge()` for a list argument - for w in reqList: - q.merge w - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------