From 44a57496d938a22e71c5328c35ebaa7936ae932b Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Mon, 28 Nov 2022 09:03:23 +0000 Subject: [PATCH] Snap sync interval complement method to speed up trie perusal (#1328) * Add quick hexary trie inspector, called `dismantle()` why: + Full hexary trie perusal is slow if running down leaf nodes + For known range of leaf nodes, work out the UInt126-complement of partial sub-trie paths (for existing nodes). The result should cover no (or only a few) sub-tries with leaf nodes. * Extract common healing methods => `sub_tries_helper.nim` details: Also apply quick hexary trie inspection tool `dismantle()` Replace `inspectAccountsTrie()` wrapper by `hexaryInspectTrie()` * Re-arrange task dispatching in main peer worker * Refactor accounts and storage slots downloaders * Rename `HexaryDbError` => `HexaryError` --- nimbus/sync/snap/constants.nim | 21 +- nimbus/sync/snap/range_desc.nim | 29 +- nimbus/sync/snap/worker.nim | 68 +---- nimbus/sync/snap/worker/db/hexary_desc.nim | 2 +- nimbus/sync/snap/worker/db/hexary_error.nim | 3 +- nimbus/sync/snap/worker/db/hexary_inspect.nim | 29 +- .../snap/worker/db/hexary_interpolate.nim | 4 +- nimbus/sync/snap/worker/db/hexary_paths.nim | 250 +++++++++++++++++- .../worker/db/{ => notused}/snapdb_check.nim | 38 ++- .../sync/snap/worker/db/snapdb_accounts.nim | 207 +++++---------- nimbus/sync/snap/worker/db/snapdb_desc.nim | 6 +- .../sync/snap/worker/db/snapdb_persistent.nim | 10 +- nimbus/sync/snap/worker/db/snapdb_pivot.nim | 6 +- .../snap/worker/db/snapdb_storage_slots.nim | 101 ++++--- nimbus/sync/snap/worker/heal_accounts.nim | 151 ++++------- .../sync/snap/worker/heal_storage_slots.nim | 48 ++-- nimbus/sync/snap/worker/pivot_helper.nim | 93 ++++--- .../sync/snap/worker/range_fetch_accounts.nim | 102 +++---- .../snap/worker/range_fetch_storage_slots.nim | 92 ++++--- nimbus/sync/snap/worker/sub_tries_helper.nim | 223 ++++++++++++++++ nimbus/sync/snap/worker_desc.nim | 11 +- tests/test_sync_snap.nim | 149 +++++++---- 22 files changed, 1014 insertions(+), 629 deletions(-) rename nimbus/sync/snap/worker/db/{ => notused}/snapdb_check.nim (88%) create mode 100644 nimbus/sync/snap/worker/sub_tries_helper.nim diff --git a/nimbus/sync/snap/constants.nim b/nimbus/sync/snap/constants.nim index 20bfe6ff3..e6b128777 100644 --- a/nimbus/sync/snap/constants.nim +++ b/nimbus/sync/snap/constants.nim @@ -83,7 +83,7 @@ const ## nodes to allow for a pseudo -task switch. - healAccountsTrigger* = 0.99 + healAccountsCoverageTrigger* = 0.999 ## Apply accounts healing if the global snap download coverage factor ## exceeds this setting. The global coverage factor is derived by merging ## all account ranges retrieved for all pivot state roots (see @@ -95,6 +95,23 @@ const ## over the network. More requests might be a disadvantage if peers only ## serve a maximum number requests (rather than data.) + healAccountsPivotTriggerMinFactor* = 0.17 + ## Additional condition to meed before starting healing. The current + ## pivot must have at least this much processed as recorded in the + ## `processed` ranges set. This is the minimim value (see below.) + + healAccountsPivotTriggerWeight* = 0.01 + healAccountsPivotTriggerNMax* = 10 + ## Enable healing not before the `processed` ranges set fill factor has + ## at least the following value. + ## :: + ## MinFactor + max(0, NMax - pivotTable.len) * Weight + ## + ## (the `healAccountsPivotTrigger` prefix of the constant names is ommited.) 
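+    ##
+    ## Worked example (a sketch only, using the constant values above which
+    ## may change): with three pivots in `pivotTable`, healing starts once
+    ## the `processed` fill factor reaches
+    ## ::
+    ##   0.17 + max(0, 10 - 3) * 0.01  =  0.24
+    ##
+    ## and with ten or more pivots the threshold drops back to the 0.17
+    ## minimum.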
+ ## + ## This effects in favouring late healing when more pivots have been + ## downloaded. + healAccountsBatchFetchMax* = 10 * 1024 ## Keep on gloing in healing task up until this many nodes have been ## fetched from the network or some error contition terminates the task. @@ -142,7 +159,7 @@ const ## Set 0 to disable. static: - doAssert healAccountsTrigger < 1.0 # larger values make no sense + doAssert healAccountsCoverageTrigger < 1.0 # larger values make no sense doAssert snapStorageSlotsQuPrioThresh < snapAccountsSaveStorageSlotsMax doAssert snapStorageSlotsFetchMax < healAccountsBatchFetchMax diff --git a/nimbus/sync/snap/range_desc.nim b/nimbus/sync/snap/range_desc.nim index 73948f3a9..1f08492db 100644 --- a/nimbus/sync/snap/range_desc.nim +++ b/nimbus/sync/snap/range_desc.nim @@ -10,7 +10,7 @@ import std/[math, sequtils, strutils, hashes], - eth/[common, trie/nibbles], + eth/common, stew/[byteutils, interval_set], stint, ../../constants, @@ -82,27 +82,6 @@ type account*: AccountSlotsHeader data*: seq[SnapStorage] -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc padPartialPath(partialPath: NibblesSeq; dblNibble: byte): NodeKey = - ## Extend (or cut) `partialPath` nibbles sequence and generate `NodeKey` - # Pad with zeroes - var padded: NibblesSeq - - let padLen = 64 - partialPath.len - if 0 <= padLen: - padded = partialPath & dblNibble.repeat(padlen div 2).initNibbleRange - if (padLen and 1) == 1: - padded = padded & @[dblNibble].initNibbleRange.slice(1) - else: - let nope = seq[byte].default.initNibbleRange - padded = partialPath.slice(0,63) & nope # nope forces re-alignment - - let bytes = padded.getBytes - (addr result.ByteArray32[0]).copyMem(unsafeAddr bytes[0], bytes.len) - # ------------------------------------------------------------------------------ # Public helpers # ------------------------------------------------------------------------------ @@ -139,12 +118,6 @@ proc to*(n: SomeUnsignedInt|UInt256; T: type NodeTag): T = ## Syntactic sugar n.u256.T -proc min*(partialPath: Blob; T: type NodeKey): T = - (hexPrefixDecode partialPath)[1].padPartialPath(0) - -proc max*(partialPath: Blob; T: type NodeKey): T = - (hexPrefixDecode partialPath)[1].padPartialPath(0xff) - proc digestTo*(data: Blob; T: type NodeKey): T = keccakHash(data).data.T diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 4a4c0a6d1..2a38c8eb7 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -20,7 +20,7 @@ import ".."/[protocol, sync_desc], ./worker/[pivot_helper, ticker], ./worker/com/com_error, - ./worker/db/[hexary_desc, snapdb_check, snapdb_desc, snapdb_pivot], + ./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot], "."/[constants, range_desc, worker_desc] {.push raises: [Defect].} @@ -221,10 +221,6 @@ proc runDaemon*(ctx: SnapCtxRef) {.async.} = ctx.data.ticker.stopRecovery() return - # Update logging - if not ctx.data.ticker.isNil: - ctx.data.ticker.stopRecovery() - proc runSingle*(buddy: SnapBuddyRef) {.async.} = ## Enabled while @@ -249,38 +245,6 @@ proc runPool*(buddy: SnapBuddyRef, last: bool): bool = ctx.poolMode = false result = true - block: - let rc = ctx.data.pivotTable.lastValue - if rc.isOk: - - # Check whether last pivot accounts and storage are complete. 
- let - env = rc.value - peer = buddy.peer - pivot = "#" & $env.stateHeader.blockNumber # for logging - - if not env.storageDone: - - # Check whether accounts download is complete - if env.fetchAccounts.unprocessed.isEmpty(): - - # FIXME: This check might not be needed. It will visit *every* node - # in the hexary trie for checking the account leaves. - # - # Note: This is insane on main net - if buddy.checkAccountsTrieIsComplete(env): - env.accountsState = HealerDone - - # Check whether storage slots are complete - if env.fetchStorageFull.len == 0 and - env.fetchStoragePart.len == 0: - env.storageDone = true - - when extraTraceMessages: - trace "Checked for pivot DB completeness", peer, pivot, - nAccounts=env.nAccounts, accountsState=env.accountsState, - nSlotLists=env.nSlotLists, storageDone=env.storageDone - proc runMulti*(buddy: SnapBuddyRef) {.async.} = ## Enabled while @@ -317,7 +281,10 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = ctx.data.pivotTable.beforeTopMostlyClean() # This one is the syncing work horse which downloads the database - let syncActionContinue = await env.execSnapSyncAction(buddy) + await env.execSnapSyncAction(buddy) + + if env.obsolete: + return # pivot has changed # Save state so sync can be partially resumed at next start up let @@ -337,29 +304,8 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = nAccounts=env.nAccounts, nSlotLists=env.nSlotLists, processed, nStoQu, blobSize=rc.value - if not syncActionContinue: - return - - # Check whether there are more accounts to fetch. - # - # Note that some other process might have temporarily borrowed from the - # `fetchAccounts.unprocessed` list. Whether we are done can only be decided - # if only a single buddy is active. S be it. - if env.fetchAccounts.unprocessed.isEmpty(): - - # Debugging log: analyse pivot against database - warn "Analysing accounts database -- might be slow", peer, pivot - discard buddy.checkAccountsListOk(env) - - # Check whether pivot download is complete. - if env.fetchStorageFull.len == 0 and - env.fetchStoragePart.len == 0: - trace "Running pool mode for verifying completeness", peer, pivot - buddy.ctx.poolMode = true - - # Debugging log: analyse pivot against database - warn "Analysing storage slots database -- might be slow", peer, pivot - discard buddy.checkStorageSlotsTrieIsComplete(env) + if buddy.ctrl.stopped: + return # peer worker has gone # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/db/hexary_desc.nim b/nimbus/sync/snap/worker/db/hexary_desc.nim index 4a93a64fe..6f567d269 100644 --- a/nimbus/sync/snap/worker/db/hexary_desc.nim +++ b/nimbus/sync/snap/worker/db/hexary_desc.nim @@ -165,7 +165,7 @@ type slot*: Option[int] ## May refer to indexed argument slots kind*: Option[NodeKind] ## Node type (if any) dangling*: seq[NodeSpecs] ## Missing inner sub-tries - error*: HexaryDbError ## Error code, or `NothingSerious` + error*: HexaryError ## Error code, or `NothingSerious` const EmptyNodeBlob* = seq[byte].default diff --git a/nimbus/sync/snap/worker/db/hexary_error.nim b/nimbus/sync/snap/worker/db/hexary_error.nim index ce0dfb8c5..e759da147 100644 --- a/nimbus/sync/snap/worker/db/hexary_error.nim +++ b/nimbus/sync/snap/worker/db/hexary_error.nim @@ -9,7 +9,7 @@ # except according to those terms. 
type - HexaryDbError* = enum + HexaryError* = enum NothingSerious = 0 AccountNotFound @@ -22,6 +22,7 @@ type SlotsNotSrictlyIncreasing TrieLoopAlert TrieIsEmpty + TrieIsLockedForPerusal TooManyProcessedChunks TooManySlotAccounts diff --git a/nimbus/sync/snap/worker/db/hexary_inspect.nim b/nimbus/sync/snap/worker/db/hexary_inspect.nim index ea44a7ce7..28db01ab7 100644 --- a/nimbus/sync/snap/worker/db/hexary_inspect.nim +++ b/nimbus/sync/snap/worker/db/hexary_inspect.nim @@ -139,7 +139,7 @@ proc processLink( inspect: var seq[(NodeKey,NibblesSeq)]; trail: NibblesSeq; child: Rlp; - ) {.gcsafe, raises: [Defect,RlpError,KeyError]} = + ) {.gcsafe, raises: [Defect,RlpError]} = ## Ditto if not child.isEmpty: let childBlob = child.toBytes @@ -161,6 +161,27 @@ proc processLink( # Public functions # ------------------------------------------------------------------------------ +proc to*(resumeCtx: TrieNodeStatCtxRef; T: type seq[NodeSpecs]): T = + ## Convert resumption context to nodes that can be used otherwise. This + ## function might be useful for error recovery. + ## + ## Note: In a non-persistant case, temporary `RepairKey` type node specs + ## that cannot be converted to `NodeKey` type nodes are silently dropped. + ## This should be no problem as a hexary trie with `RepairKey` type node + ## refs must be repaired or discarded anyway. + if resumeCtx.persistent: + for (key,trail) in resumeCtx.hddCtx: + result.add NodeSpecs( + partialPath: trail.hexPrefixEncode(isLeaf = false), + nodeKey: key) + else: + for (key,trail) in resumeCtx.memCtx: + if key.isNodeKey: + result.add NodeSpecs( + partialPath: trail.hexPrefixEncode(isLeaf = false), + nodeKey: key.convertTo(NodeKey)) + + proc hexaryInspectPath*( db: HexaryTreeDbRef; ## Database root: NodeKey; ## State root @@ -206,7 +227,7 @@ proc hexaryInspectToKeys*( proc hexaryInspectTrie*( db: HexaryTreeDbRef; ## Database root: NodeKey; ## State root - paths: seq[Blob]; ## Starting paths for search + paths: seq[Blob] = @[]; ## Starting paths for search resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection suspendAfter = high(uint64); ## To be resumed stopAtLevel = 64; ## Instead of loop detector @@ -309,12 +330,12 @@ proc hexaryInspectTrie*( proc hexaryInspectTrie*( getFn: HexaryGetFn; ## Database abstraction rootKey: NodeKey; ## State root - paths: seq[Blob]; ## Starting paths for search + paths: seq[Blob] = @[]; ## Starting paths for search resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection suspendAfter = high(uint64); ## To be resumed stopAtLevel = 64; ## Instead of loop detector ): TrieNodeStat - {.gcsafe, raises: [Defect,RlpError,KeyError]} = + {.gcsafe, raises: [Defect,RlpError]} = ## Variant of `hexaryInspectTrie()` for persistent database. 
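+  ##
+  ## Resumption sketch (illustrative only; `getFn` and `rootKey` are assumed
+  ## to be supplied by the caller). The trie is visited in slices of at most
+  ## 1024 nodes per call:
+  ## ::
+  ##   var ctx = TrieNodeStatCtxRef()
+  ##   while not ctx.isNil:
+  ##     let stats = getFn.hexaryInspectTrie(
+  ##       rootKey, resumeCtx=ctx, suspendAfter=1024)
+  ##     ctx = stats.resumeCtx
+  ##     # collect `stats.dangling` nodes here ...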
when extraTraceMessages: let nPaths = paths.len diff --git a/nimbus/sync/snap/worker/db/hexary_interpolate.nim b/nimbus/sync/snap/worker/db/hexary_interpolate.nim index 4f7687c16..577b09c0f 100644 --- a/nimbus/sync/snap/worker/db/hexary_interpolate.nim +++ b/nimbus/sync/snap/worker/db/hexary_interpolate.nim @@ -42,7 +42,7 @@ proc pp(w: seq[RPathXStep]; db: HexaryTreeDbRef; indent = 4): string = let pfx = "\n" & " ".repeat(indent) w.mapIt(it.pp(db)).join(pfx) -proc pp(rc: Result[TrieNodeStat, HexaryDbError]; db: HexaryTreeDbRef): string = +proc pp(rc: Result[TrieNodeStat, HexaryError]; db: HexaryTreeDbRef): string = if rc.isErr: $rc.error else: rc.value.pp(db) # ------------------------------------------------------------------------------ @@ -524,7 +524,7 @@ proc hexaryInterpolate*( rootKey: NodeKey; ## Root node hash dbItems: var seq[RLeafSpecs]; ## List of path and leaf items bootstrap = false; ## Can create root node on-the-fly - ): Result[void,HexaryDbError] + ): Result[void,HexaryError] {.gcsafe, raises: [Defect,KeyError]} = ## From the argument list `dbItems`, leaf nodes will be added to the hexary ## trie while interpolating the path for the leaf nodes by adding missing diff --git a/nimbus/sync/snap/worker/db/hexary_paths.nim b/nimbus/sync/snap/worker/db/hexary_paths.nim index c11abeba7..0bd832359 100644 --- a/nimbus/sync/snap/worker/db/hexary_paths.nim +++ b/nimbus/sync/snap/worker/db/hexary_paths.nim @@ -11,8 +11,9 @@ ## Find node paths in hexary tries. import - std/[tables], + std/[algorithm, sequtils, tables], eth/[common, trie/nibbles], + stew/[byteutils, interval_set], ../../range_desc, ./hexary_desc @@ -29,6 +30,16 @@ proc pp(w: Blob; db: HexaryTreeDbRef): string = # Private helpers # ------------------------------------------------------------------------------ +proc `==`(a, b: XNodeObj): bool = + if a.kind == b.kind: + case a.kind: + of Leaf: + return a.lPfx == b.lPfx and a.lData == b.lData + of Extension: + return a.ePfx == b.ePfx and a.eLink == b.eLink + of Branch: + return a.bLink == b.bLink + proc getNibblesImpl(path: XPath|RPath; start = 0): NibblesSeq = ## Re-build the key path for n in start ..< path.path.len: @@ -42,6 +53,19 @@ proc getNibblesImpl(path: XPath|RPath; start = 0): NibblesSeq = result = result & it.node.lPfx result = result & path.tail +proc getNibblesImpl(path: XPath|RPath; start, maxLen: int): NibblesSeq = + ## Variant of `getNibblesImpl()` for partial rebuild + for n in start ..< min(path.path.len, maxLen): + let it = path.path[n] + case it.node.kind: + of Branch: + result = result & @[it.nibble.byte].initNibbleRange.slice(1) + of Extension: + result = result & it.node.ePfx + of Leaf: + result = result & it.node.lPfx + + proc toBranchNode( rlp: Rlp ): XNodeObj @@ -88,6 +112,24 @@ proc `<`(a, b: NibblesSeq): bool = # Private functions # ------------------------------------------------------------------------------ +proc padPartialPath(pfx: NibblesSeq; dblNibble: byte): NodeKey = + ## Extend (or cut) `partialPath` nibbles sequence and generate `NodeKey` + # Pad with zeroes + var padded: NibblesSeq + + let padLen = 64 - pfx.len + if 0 <= padLen: + padded = pfx & dblNibble.repeat(padlen div 2).initNibbleRange + if (padLen and 1) == 1: + padded = padded & @[dblNibble].initNibbleRange.slice(1) + else: + let nope = seq[byte].default.initNibbleRange + padded = pfx.slice(0,63) & nope # nope forces re-alignment + + let bytes = padded.getBytes + (addr result.ByteArray32[0]).copyMem(unsafeAddr bytes[0], bytes.len) + + proc pathExtend( path: RPath; key: 
RepairKey; @@ -405,6 +447,82 @@ proc pathMost( # End while # Notreached + +proc dismantleLeft(envPt, ivPt: RPath|XPath): Result[seq[Blob],void] = + ## Helper for `dismantle()` for handling left side of envelope + # + # partialPath + # / \ + # / \ + # / \ + # / \ + # envPt.. -- envelope of partial path + # | + # ivPt.. -- `iv`, not fully covering left of `env` + # + var collect: seq[Blob] + block leftCurbEnvelope: + for n in 0 ..< min(envPt.path.len, ivPt.path.len): + if envPt.path[n] != ivPt.path[n]: + # + # At this point, the `node` entries of either `path[n]` step are + # the same. This is so because the predecessor steps were the same + # or were the `rootKey` in case n == 0. + # + # But then (`node` entries being equal) the only way for the + # `path[n]` steps to differ is in the entry selector `nibble` for + # a branch node. + # + for m in n ..< ivPt.path.len: + let + pfx = ivPt.getNibblesImpl(0,m) # common path segment + top = ivPt.path[m].nibble # need nibbles smaller than top + # + # Incidentally for a non-`Branch` node, the value `top` becomes + # `-1` and the `for`- loop will be ignored (which is correct) + for nibble in 0 ..< top: + collect.add hexPrefixEncode( + pfx & @[nibble.byte].initNibbleRange.slice(1), isLeaf=false) + break leftCurbEnvelope + # + # Fringe case, e.g. when `partialPath` is an empty prefix (aka `@[0]`) + # and the database has a single leaf node `(a,some-value)` where the + # `rootKey` is the hash of this node. In that case, `pMin == 0` and + # `pMax == high(NodeTag)` and `iv == [a,a]`. + # + return err() + + ok(collect) + +proc dismantleRight(envPt, ivPt: RPath|XPath): Result[seq[Blob],void] = + ## Helper for `dismantle()` for handling right side of envelope + # + # partialPath + # / \ + # / \ + # / \ + # / \ + # .. envPt -- envelope of partial path + # | + # .. ivPt -- `iv`, not fully covering right of `env` + # + var collect: seq[Blob] + block rightCurbEnvelope: + for n in 0 ..< min(envPt.path.len, ivPt.path.len): + if envPt.path[n] != ivPt.path[n]: + for m in n ..< ivPt.path.len: + let + pfx = ivPt.getNibblesImpl(0,m) # common path segment + base = ivPt.path[m].nibble # need nibbles greater/equal + if 0 <= base: + for nibble in base+1 .. 15: + collect.add hexPrefixEncode( + pfx & @[nibble.byte].initNibbleRange.slice(1), isLeaf=false) + break rightCurbEnvelope + return err() + + ok(collect) + # ------------------------------------------------------------------------------ # Public helpers # ------------------------------------------------------------------------------ @@ -437,6 +555,45 @@ proc leafData*(path: RPath): Blob = of Extension: discard +proc pathEnvelope*(partialPath: Blob): NodeTagRange = + ## Convert partial path to range of all keys starting with this + ## partial path + let pfx = (hexPrefixDecode partialPath)[1] + NodeTagRange.new( + pfx.padPartialPath(0).to(NodeTag), + pfx.padPartialPath(255).to(NodeTag)) + +proc pathSortUniq*( + partialPaths: openArray[Blob]; + ): seq[Blob] + {.gcsafe, raises: [Defect,KeyError]} = + ## Sort and simplify a list of partial paths by removoing nested entries. + + var tab: Table[NodeTag,(Blob,bool)] + for w in partialPaths: + let iv = w.pathEnvelope + tab[iv.minPt] = (w,true) # begin entry + tab[iv.maxPt] = (@[],false) # end entry + + # When sorted, nested entries look like + # + # 123000000.. (w0, true) + # 123400000.. (w1, true) + # 1234fffff.. (, false) + # 123ffffff.. (, false) + # ... + # 777000000.. 
(w2, true) + # + var level = 0 + for key in toSeq(tab.keys).sorted(cmp): + let (w,begin) = tab[key] + if begin: + if level == 0: + result.add w + level.inc + else: + level.dec + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -658,6 +815,97 @@ proc prev*( if minDepth <= newPath.depth and 0 < newPath.leafData.len: return newPath + +proc dismantle*( + partialPath: Blob; ## Patrial path for existing node + rootKey: NodeKey; ## State root + iv: NodeTagRange; ## Proofed range of leaf paths + db: HexaryTreeDbRef; ## Database + ): seq[Blob] + {.gcsafe, raises: [Defect,RlpError,KeyError]} = + ## Returns the list of partial paths which envelopes span the range of + ## node paths one obtains by subtracting the argument range `iv` from the + ## envelope of the argumenr `partialPath`. + ## + ## The following boundary conditions apply in order to get a useful result + ## in a partially completed hexary trie database. + ## + ## * The argument `partialPath` refers to an existing node. + ## + ## * The argument `iv` contains a range of paths (e.g. account hash keys) + ## with the property that if there is no (leaf-) node for that path, then + ## no such node exists when the database is completed. + ## + ## This condition is sort of rephrasing the boundary proof condition that + ## applies when downloading a range of accounts or storage slots from the + ## network via `snap/1` protocol. In fact the condition here is stricter + ## as it excludes sub-trie *holes* (see comment on `importAccounts()`.) + ## + # Chechk for the trivial case when the `partialPath` envelope and `iv` do + # not overlap. + let env = partialPath.pathEnvelope + if iv.maxPt < env.minPt or env.maxPt < iv.minPt: + return @[partialPath] + + # So ranges do overlap. The case that the `partialPath` envelope is fully + # contained in `iv` results in `@[]` which is implicitely handled by + # non-matching any of the cases, below. + if env.minPt < iv.minPt: + let + envPt = env.minPt.to(NodeKey).hexaryPath(rootKey.to(RepairKey), db) + ivPt = iv.minPt.to(NodeKey).hexaryPath(rootKey.to(RepairKey), db) + when false: # or true: + echo ">>> ", + "\n ", envPt.pp(db), + "\n -----", + "\n ", ivPt.pp(db) + let rc = envPt.dismantleLeft ivPt + if rc.isErr: + return @[partialPath] + result &= rc.value + + if iv.maxPt < env.maxPt: + let + envPt = env.maxPt.to(NodeKey).hexaryPath(rootKey.to(RepairKey), db) + ivPt = iv.maxPt.to(NodeKey).hexaryPath(rootKey.to(RepairKey), db) + when false: # or true: + echo ">>> ", + "\n ", envPt.pp(db), + "\n -----", + "\n ", ivPt.pp(db) + let rc = envPt.dismantleRight ivPt + if rc.isErr: + return @[partialPath] + result &= rc.value + +proc dismantle*( + partialPath: Blob; ## Patrial path for existing node + rootKey: NodeKey; ## State root + iv: NodeTagRange; ## Proofed range of leaf paths + getFn: HexaryGetFn; ## Database abstraction + ): seq[Blob] + {.gcsafe, raises: [Defect,RlpError]} = + ## Variant of `dismantle()` for persistent database. 
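+  ##
+  ## Typical use (a sketch; `rootKey`, `getFn` and a proofed leaf range `iv`
+  ## are assumed to come from the caller): subtracting `iv` from the whole
+  ## account range yields the partial paths still to be fetched, e.g.
+  ## ::
+  ##   # `@[0.byte]` is the hex prefix encoded empty partial path, i.e.
+  ##   # its envelope covers the full trie
+  ##   let stillToDo = dismantle(@[0.byte], rootKey, iv, getFn)
+  ##
+  ## where each result entry is a partial path whose `pathEnvelope` does
+  ## not overlap `iv`.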
+ let env = partialPath.pathEnvelope + if iv.maxPt < env.minPt or env.maxPt < iv.minPt: + return @[partialPath] + + if env.minPt < iv.minPt: + let rc = dismantleLeft( + env.minPt.to(NodeKey).hexaryPath(rootKey, getFn), + iv.minPt.to(NodeKey).hexaryPath(rootKey, getFn)) + if rc.isErr: + return @[partialPath] + result &= rc.value + + if iv.maxPt < env.maxPt: + let rc = dismantleRight( + env.maxPt.to(NodeKey).hexaryPath(rootKey, getFn), + iv.maxPt.to(NodeKey).hexaryPath(rootKey, getFn)) + if rc.isErr: + return @[partialPath] + result &= rc.value + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/snapdb_check.nim b/nimbus/sync/snap/worker/db/notused/snapdb_check.nim similarity index 88% rename from nimbus/sync/snap/worker/db/snapdb_check.nim rename to nimbus/sync/snap/worker/db/notused/snapdb_check.nim index 8ed5d6a4a..0041b248f 100644 --- a/nimbus/sync/snap/worker/db/snapdb_check.nim +++ b/nimbus/sync/snap/worker/db/notused/snapdb_check.nim @@ -18,7 +18,8 @@ import ../../../../utils/prettify, ../../../sync_desc, "../.."/[range_desc, worker_desc], - "."/[hexary_desc, hexary_error, snapdb_accounts, snapdb_storage_slots] + "."/[hexary_desc, hexary_error, hexary_inspect, + snapdb_accounts, snapdb_storage_slots] {.push raises: [Defect].} @@ -45,7 +46,7 @@ proc accountsCtx( "{" & "pivot=" & "#" & $env.stateHeader.blockNumber & "," & "nAccounts=" & $env.nAccounts & "," & - ("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" & + ("covered=" & env.fetchAccounts.processed.fullFactor.toPC(0) & "/" & ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," & "nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," & "nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}" @@ -74,7 +75,7 @@ proc storageSlotsCtx( "inherit=" & (if data.inherit: "t" else: "f") & "," if not slots.isNil: result &= "" & - "covered=" & slots.unprocessed.emptyFactor.toPC(0) & + "covered=" & slots.processed.fullFactor.toPC(0) & "nCheckNodes=" & $slots.checkNodes.len & "," & "nSickSubTries=" & $slots.sickSubTries.len result &= "}" @@ -88,7 +89,7 @@ proc checkStorageSlotsTrie( accKey: NodeKey; storageRoot: Hash256; env: SnapPivotRef; - ): Result[bool,HexaryDbError] = + ): Result[bool,HexaryError] = ## Check whether a storage slots hexary trie is complete. 
let ctx = buddy.ctx @@ -106,7 +107,7 @@ proc checkStorageSlotsTrie( iterator accountsWalk( buddy: SnapBuddyRef; env: SnapPivotRef; - ): (NodeKey,Account,HexaryDbError) = + ): (NodeKey,Account,HexaryError) = let ctx = buddy.ctx db = ctx.data.snapDb @@ -157,18 +158,29 @@ proc checkAccountsTrieIsComplete*( ## Check whether accounts hexary trie is complete let ctx = buddy.ctx - db = ctx.data.snapDb peer = buddy.peer - stateRoot = env.stateHeader.stateRoot + db = ctx.data.snapDb + rootKey = env.stateHeader.stateRoot.to(NodeKey) + var + error: HexaryError - rc = db.inspectAccountsTrie(peer, stateRoot) + try: + let stats = db.getAccountFn.hexaryInspectTrie(rootKey, @[]) + if not stats.stopped: + return stats.dangling.len == 0 - if rc.isErr: - error logTxt "accounts health check failed", peer, - ctx=buddy.accountsCtx(env), error=rc.error - return false + error = TrieLoopAlert + except RlpError: + error = RlpEncoding + except KeyError as e: + raiseAssert "Not possible @ importRawAccountNodes: " & e.msg + except Exception as e: + raiseAssert "Ooops checkAccountsTrieIsComplete(): name=" & + $e.name & " msg=" & e.msg - rc.value.dangling.len == 0 + error logTxt "accounts health check failed", peer, + ctx=buddy.accountsCtx(env), error + return false proc checkAccountsListOk*( diff --git a/nimbus/sync/snap/worker/db/snapdb_accounts.nim b/nimbus/sync/snap/worker/db/snapdb_accounts.nim index 074edf5d8..51b7cf705 100644 --- a/nimbus/sync/snap/worker/db/snapdb_accounts.nim +++ b/nimbus/sync/snap/worker/db/snapdb_accounts.nim @@ -12,7 +12,7 @@ import std/[algorithm, sequtils, tables], chronicles, eth/[common, p2p, rlp, trie/nibbles], - stew/byteutils, + stew/[byteutils, interval_set], ../../range_desc, "."/[hexary_desc, hexary_error, hexary_import, hexary_interpolate, hexary_inspect, hexary_paths, snapdb_desc, snapdb_persistent] @@ -25,7 +25,6 @@ logScope: type SnapDbAccountsRef* = ref object of SnapDbBaseRef peer: Peer ## For log messages - getClsFn: AccountsGetFn ## Persistent database `get()` closure SnapAccountsGaps* = object innerGaps*: seq[NodeSpecs] @@ -44,12 +43,6 @@ proc to(h: Hash256; T: type NodeKey): T = proc convertTo(data: openArray[byte]; T: type Hash256): T = discard result.data.NodeKey.init(data) # size error => zero -proc getFn(ps: SnapDbAccountsRef): HexaryGetFn = - ## Derive from `GetClsFn` closure => `HexaryGetFn`. There reason for that - ## seemingly redundant mapping is that here is space for additional localised - ## and locked parameters as done with the `StorageSlotsGetFn`. - return proc(key: openArray[byte]): Blob = ps.getClsFn(key) - template noKeyError(info: static[string]; code: untyped) = try: code @@ -75,7 +68,7 @@ template noRlpExceptionOops(info: static[string]; code: untyped) = proc persistentAccounts( db: HexaryTreeDbRef; ## Current table ps: SnapDbAccountsRef; ## For persistent database - ): Result[void,HexaryDbError] + ): Result[void,HexaryError] {.gcsafe, raises: [Defect,OSError,KeyError].} = ## Store accounts trie table on databse if ps.rockDb.isNil: @@ -91,7 +84,7 @@ proc collectAccounts( peer: Peer, ## for log messages base: NodeTag; acc: seq[PackedAccount]; - ): Result[seq[RLeafSpecs],HexaryDbError] + ): Result[seq[RLeafSpecs],HexaryError] {.gcsafe, raises: [Defect, RlpError].} = ## Repack account records into a `seq[RLeafSpecs]` queue. The argument data ## `acc` are as received with the snap message `AccountRange`). @@ -137,11 +130,9 @@ proc init*( peer: Peer = nil ): T = ## Constructor, starts a new accounts session. 
- let db = pv.kvDb new result result.init(pv, root.to(NodeKey)) result.peer = peer - result.getClsFn = db.persistentAccountsGetFn() proc dup*( ps: SnapDbAccountsRef; @@ -167,27 +158,15 @@ proc dup*( # Public functions # ------------------------------------------------------------------------------ -proc nodeExists*( - ps: SnapDbAccountsRef; ## Re-usable session descriptor - node: NodeSpecs; ## Node specs, e.g. returned by `importAccounts()` - persistent = false; ## Whether to check data on disk - ): bool = - ## .. - if not persistent: - return ps.hexaDb.tab.hasKey(node.nodeKey.to(RepairKey)) - try: - return 0 < ps.getFn()(node.nodeKey.ByteArray32).len - except Exception as e: - raiseAssert "Not possible @ importAccounts(" & $e.name & "):" & e.msg +proc getAccountFn*(ps: SnapDbAccountsRef): HexaryGetFn = + ## Return `HexaryGetFn` closure. + let getFn = ps.kvDb.persistentAccountsGetFn() + return proc(key: openArray[byte]): Blob = getFn(key) -proc nodeExists*( - pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer; ## For log messages - root: Hash256; ## State root - node: NodeSpecs; ## Node specs, e.g. returned by `importAccounts()` - ): bool = - ## Variant of `nodeExists()` for presistent storage, only. - SnapDbAccountsRef.init(pv, root, peer).nodeExists(node, persistent=true) +proc getAccountFn*(pv: SnapDbRef): HexaryGetFn = + ## Variant of `getAccountFn()` + let getFn = pv.kvDb.persistentAccountsGetFn() + return proc(key: openArray[byte]): Blob = getFn(key) proc importAccounts*( @@ -196,7 +175,7 @@ proc importAccounts*( data: PackedAccountRange; ## Re-packed `snap/1 ` reply data persistent = false; ## Store data on disk noBaseBoundCheck = false; ## Ignore left boundary proof check if `true` - ): Result[SnapAccountsGaps,HexaryDbError] = + ): Result[SnapAccountsGaps,HexaryError] = ## Validate and import accounts (using proofs as received with the snap ## message `AccountRange`). This function accumulates data in a memory table ## which can be written to disk with the argument `persistent` set `true`. @@ -243,9 +222,10 @@ proc importAccounts*( ## ## Note that the `peer` argument is for log messages, only. var - accounts: seq[RLeafSpecs] - outside: seq[NodeSpecs] - gaps: SnapAccountsGaps + accounts: seq[RLeafSpecs] # validated accounts to add to database + gaps: SnapAccountsGaps # return value + proofStats: TrieNodeStat # `proof` data dangling links + innerSubTrie: seq[NodeSpecs] # internal, collect dangling links try: if 0 < data.proof.len: let rc = ps.mergeProofs(ps.peer, data.proof) @@ -257,24 +237,25 @@ proc importAccounts*( return err(rc.error) accounts = rc.value + # Inspect trie for dangling nodes from prrof data (if any.) + if 0 < data.proof.len: + proofStats = ps.hexaDb.hexaryInspectTrie(ps.root, @[]) + if 0 < accounts.len: - var innerSubTrie: seq[NodeSpecs] if 0 < data.proof.len: # Inspect trie for dangling nodes. This is not a big deal here as the # proof data is typically small. 
- let - proofStats = ps.hexaDb.hexaryInspectTrie(ps.root, @[]) - topTag = accounts[^1].pathTag + let topTag = accounts[^1].pathTag for w in proofStats.dangling: - if base <= w.partialPath.max(NodeKey).to(NodeTag) and - w.partialPath.min(NodeKey).to(NodeTag) <= topTag: - # Extract dangling links which are inside the accounts range - innerSubTrie.add w + let iv = w.partialPath.pathEnvelope + if iv.maxPt < base or topTag < iv.minPt: + # Dangling link with partial path envelope outside accounts range + gaps.dangling.add w else: - # Otherwise register outside links - outside.add w + # Overlapping partial path envelope. + innerSubTrie.add w - # Build partial hexary trie + # Build partial or full hexary trie let rc = ps.hexaDb.hexaryInterpolate( ps.root, accounts, bootstrap = (data.proof.len == 0)) if rc.isErr: @@ -284,35 +265,35 @@ proc importAccounts*( # trie (if any). let bottomTag = accounts[0].pathTag for w in innerSubTrie: - if ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)): - continue - # Verify that `base` is to the left of the first account and there is - # nothing in between. Without proof, there can only be a complete - # set/list of accounts. There must be a proof for an empty list. - if not noBaseBoundCheck and - w.partialPath.max(NodeKey).to(NodeTag) < bottomTag: - return err(LowerBoundProofError) - # Otherwise register left over entry - gaps.innerGaps.add w + if not ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)): + if not noBaseBoundCheck: + # Verify that `base` is to the left of the first account and there + # is nothing in between. + # + # Without `proof` data available there can only be a complete + # set/list of accounts so there are no dangling nodes in the first + # place. But there must be `proof` data for an empty list. + if w.partialPath.pathEnvelope.maxPt < bottomTag: + return err(LowerBoundProofError) + # Otherwise register left over entry + gaps.innerGaps.add w if persistent: let rc = ps.hexaDb.persistentAccounts(ps) if rc.isErr: return err(rc.error) - # Verify outer links against database - let getFn = ps.getFn - for w in outside: - if w.nodeKey.ByteArray32.getFn().len == 0: - gaps.dangling.add w - else: - for w in outside: - if not ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)): - gaps.dangling.add w elif data.proof.len == 0: # There must be a proof for an empty argument list. return err(LowerBoundProofError) + else: + if not noBaseBoundCheck: + for w in proofStats.dangling: + if base <= w.partialPath.pathEnvelope.maxPt: + return err(LowerBoundProofError) + gaps.dangling = proofStats.dangling + except RlpError: return err(RlpEncoding) except KeyError as e: @@ -338,7 +319,7 @@ proc importAccounts*( base: NodeTag; ## Before or at first account entry in `data` data: PackedAccountRange; ## Re-packed `snap/1 ` reply data noBaseBoundCheck = false; ## Ignore left bound proof check if `true` - ): Result[SnapAccountsGaps,HexaryDbError] = + ): Result[SnapAccountsGaps,HexaryError] = ## Variant of `importAccounts()` for presistent storage, only. 
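+  ##
+  ## Call sketch (names assumed from the calling context): with a session
+  ## descriptor `ps` as constructed below, an `AccountRange` reply that was
+  ## requested starting at `base` and re-packed into `data` is verified and
+  ## stored in one go:
+  ## ::
+  ##   let rc = ps.importAccounts(base, data, persistent=true)
+  ##   if rc.isOk:
+  ##     # rc.value.innerGaps -- dangling sub-tries inside the imported range
+  ##     # rc.value.dangling  -- dangling proof nodes outside that range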
SnapDbAccountsRef.init( pv, root, peer).importAccounts( @@ -423,82 +404,16 @@ proc importRawAccountsNodes*( nodes, reportNodes, persistent=true) -proc inspectAccountsTrie*( - ps: SnapDbAccountsRef; ## Re-usable session descriptor - pathList = seq[Blob].default; ## Starting nodes for search - resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection - suspendAfter = high(uint64); ## To be resumed - persistent = false; ## Read data from disk - ignoreError = false; ## Always return partial results if any - ): Result[TrieNodeStat, HexaryDbError] = - ## Starting with the argument list `pathSet`, find all the non-leaf nodes in - ## the hexary trie which have at least one node key reference missing in - ## the trie database. Argument `pathSet` list entries that do not refer to a - ## valid node are silently ignored. - ## - ## Trie inspection can be automatically suspended after having visited - ## `suspendAfter` nodes to be resumed at the last state. An application of - ## this feature would look like - ## :: - ## var ctx = TrieNodeStatCtxRef() - ## while not ctx.isNil: - ## let rc = inspectAccountsTrie(.., resumeCtx=ctx, suspendAfter=1024) - ## ... - ## ctx = rc.value.resumeCtx - ## - let peer = ps.peer - var stats: TrieNodeStat - noRlpExceptionOops("inspectAccountsTrie()"): - if persistent: - stats = ps.getFn.hexaryInspectTrie( - ps.root, pathList, resumeCtx, suspendAfter=suspendAfter) - else: - stats = ps.hexaDb.hexaryInspectTrie( - ps.root, pathList, resumeCtx, suspendAfter=suspendAfter) - - block checkForError: - var error = TrieIsEmpty - if stats.stopped: - error = TrieLoopAlert - trace "Inspect account trie failed", peer, nPathList=pathList.len, - nDangling=stats.dangling.len, stoppedAt=stats.level, error - elif 0 < stats.level: - break checkForError - if ignoreError: - return ok(stats) - return err(error) - - #when extraTraceMessages: - # trace "Inspect account trie ok", peer, nPathList=pathList.len, - # nDangling=stats.dangling.len, level=stats.level - - return ok(stats) - -proc inspectAccountsTrie*( - pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer; ## For log messages, only - root: Hash256; ## state root - pathList = seq[Blob].default; ## Starting paths for search - resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection - suspendAfter = high(uint64); ## To be resumed - ignoreError = false; ## Always return partial results if any - ): Result[TrieNodeStat, HexaryDbError] = - ## Variant of `inspectAccountsTrie()` for persistent storage. - SnapDbAccountsRef.init( - pv, root, peer).inspectAccountsTrie( - pathList, resumeCtx, suspendAfter, persistent=true, ignoreError) - - proc getAccountsNodeKey*( ps: SnapDbAccountsRef; ## Re-usable session descriptor path: Blob; ## Partial node path persistent = false; ## Read data from disk - ): Result[NodeKey,HexaryDbError] = + ): Result[NodeKey,HexaryError] = ## For a partial node path argument `path`, return the raw node key. var rc: Result[NodeKey,void] noRlpExceptionOops("getAccountsNodeKey()"): if persistent: - rc = ps.getFn.hexaryInspectPath(ps.root, path) + rc = ps.getAccountFn.hexaryInspectPath(ps.root, path) else: rc = ps.hexaDb.hexaryInspectPath(ps.root, path) if rc.isOk: @@ -509,7 +424,7 @@ proc getAccountsNodeKey*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` root: Hash256; ## state root path: Blob; ## Partial node path - ): Result[NodeKey,HexaryDbError] = + ): Result[NodeKey,HexaryError] = ## Variant of `getAccountsNodeKey()` for persistent storage. 
SnapDbAccountsRef.init( pv, root, Peer()).getAccountsNodeKey(path, persistent=true) @@ -519,7 +434,7 @@ proc getAccountsData*( ps: SnapDbAccountsRef; ## Re-usable session descriptor path: NodeKey; ## Account to visit persistent = false; ## Read data from disk - ): Result[Account,HexaryDbError] = + ): Result[Account,HexaryError] = ## Fetch account data. ## ## Caveat: There is no unit test yet for the non-persistent version @@ -528,7 +443,7 @@ proc getAccountsData*( noRlpExceptionOops("getAccountData()"): var leaf: Blob if persistent: - leaf = path.hexaryPath(ps.root, ps.getFn).leafData + leaf = path.hexaryPath(ps.root, ps.getAccountFn).leafData else: leaf = path.hexaryPath(ps.root.to(RepairKey),ps.hexaDb).leafData @@ -542,7 +457,7 @@ proc getAccountsData*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` root: Hash256; ## State root path: NodeKey; ## Account to visit - ): Result[Account,HexaryDbError] = + ): Result[Account,HexaryError] = ## Variant of `getAccountsData()` for persistent storage. SnapDbAccountsRef.init( pv, root, Peer()).getAccountsData(path, persistent=true) @@ -576,20 +491,20 @@ proc sortMerge*(acc: openArray[seq[PackedAccount]]): seq[PackedAccount] = proc getAccountsChainDb*( ps: SnapDbAccountsRef; accKey: NodeKey; - ): Result[Account,HexaryDbError] = + ): Result[Account,HexaryError] = ## Fetch account via `BaseChainDB` ps.getAccountsData(accKey, persistent = true) proc nextAccountsChainDbKey*( ps: SnapDbAccountsRef; accKey: NodeKey; - ): Result[NodeKey,HexaryDbError] = + ): Result[NodeKey,HexaryError] = ## Fetch the account path on the `BaseChainDB`, the one next to the ## argument account key. noRlpExceptionOops("getChainDbAccount()"): let path = accKey - .hexaryPath(ps.root, ps.getFn) - .next(ps.getFn) + .hexaryPath(ps.root, ps.getAccountFn) + .next(ps.getAccountFn) .getNibbles if 64 == path.len: return ok(path.getBytes.convertTo(Hash256).to(NodeKey)) @@ -599,13 +514,13 @@ proc nextAccountsChainDbKey*( proc prevAccountsChainDbKey*( ps: SnapDbAccountsRef; accKey: NodeKey; - ): Result[NodeKey,HexaryDbError] = + ): Result[NodeKey,HexaryError] = ## Fetch the account path on the `BaseChainDB`, the one before to the ## argument account. noRlpExceptionOops("getChainDbAccount()"): let path = accKey - .hexaryPath(ps.root, ps.getFn) - .prev(ps.getFn) + .hexaryPath(ps.root, ps.getAccountFn) + .prev(ps.getAccountFn) .getNibbles if 64 == path.len: return ok(path.getBytes.convertTo(Hash256).to(NodeKey)) diff --git a/nimbus/sync/snap/worker/db/snapdb_desc.nim b/nimbus/sync/snap/worker/db/snapdb_desc.nim index cd1d607f4..bd4148095 100644 --- a/nimbus/sync/snap/worker/db/snapdb_desc.nim +++ b/nimbus/sync/snap/worker/db/snapdb_desc.nim @@ -214,7 +214,7 @@ proc mergeProofs*( peer: Peer; ## For log messages proof: seq[Blob]; ## Node records freeStandingOk = false; ## Remove freestanding nodes - ): Result[void,HexaryDbError] + ): Result[void,HexaryError] {.gcsafe, raises: [Defect,RlpError,KeyError].} = ## Import proof records (as received with snap message) into a hexary trie ## of the repair table. These hexary trie records can be extended to a full @@ -253,7 +253,7 @@ proc verifyLowerBound*( peer: Peer; ## For log messages base: NodeTag; ## Before or at first account entry in `data` first: NodeTag; ## First account key - ): Result[void,HexaryDbError] + ): Result[void,HexaryError] {.gcsafe, raises: [Defect, KeyError].} = ## Verify that `base` is to the left of the first leaf entry and there is ## nothing in between. 
@@ -278,7 +278,7 @@ proc verifyNoMoreRight*( ps: SnapDbBaseRef; ## Database session descriptor peer: Peer; ## For log messages base: NodeTag; ## Before or at first account entry in `data` - ): Result[void,HexaryDbError] + ): Result[void,HexaryError] {.gcsafe, raises: [Defect, KeyError].} = ## Verify that there is are no more leaf entries to the right of and ## including `base`. diff --git a/nimbus/sync/snap/worker/db/snapdb_persistent.nim b/nimbus/sync/snap/worker/db/snapdb_persistent.nim index 7463b522d..4873360e8 100644 --- a/nimbus/sync/snap/worker/db/snapdb_persistent.nim +++ b/nimbus/sync/snap/worker/db/snapdb_persistent.nim @@ -90,7 +90,7 @@ proc persistentStorageSlotsGetFn*(db: TrieDatabaseRef): StorageSlotsGetFn = proc persistentStateRootGet*( db: TrieDatabaseRef; root: NodeKey; - ): Result[StateRootRegistry,HexaryDbError] = + ): Result[StateRootRegistry,HexaryError] = ## Implements a `get()` function for returning state root registry data. let rlpBlob = db.stateRootGet(root) if 0 < rlpBlob.len: @@ -107,7 +107,7 @@ proc persistentStateRootGet*( proc persistentAccountsPut*( db: HexaryTreeDbRef; base: TrieDatabaseRef - ): Result[void,HexaryDbError] = + ): Result[void,HexaryError] = ## Bulk store using transactional `put()` let dbTx = base.beginTransaction defer: dbTx.commit @@ -123,7 +123,7 @@ proc persistentAccountsPut*( proc persistentStorageSlotsPut*( db: HexaryTreeDbRef; base: TrieDatabaseRef - ): Result[void,HexaryDbError] = + ): Result[void,HexaryError] = ## Bulk store using transactional `put()` let dbTx = base.beginTransaction defer: dbTx.commit @@ -179,7 +179,7 @@ proc persistentStateRootPut*( proc persistentAccountsPut*( db: HexaryTreeDbRef; rocky: RocksStoreRef - ): Result[void,HexaryDbError] + ): Result[void,HexaryError] {.gcsafe, raises: [Defect,OSError,KeyError].} = ## SST based bulk load on `rocksdb`. if rocky.isNil: @@ -228,7 +228,7 @@ proc persistentAccountsPut*( proc persistentStorageSlotsPut*( db: HexaryTreeDbRef; rocky: RocksStoreRef - ): Result[void,HexaryDbError] + ): Result[void,HexaryError] {.gcsafe, raises: [Defect,OSError,KeyError].} = ## SST based bulk load on `rocksdb`. if rocky.isNil: diff --git a/nimbus/sync/snap/worker/db/snapdb_pivot.nim b/nimbus/sync/snap/worker/db/snapdb_pivot.nim index 4ff473031..3cc72ebcc 100644 --- a/nimbus/sync/snap/worker/db/snapdb_pivot.nim +++ b/nimbus/sync/snap/worker/db/snapdb_pivot.nim @@ -47,7 +47,7 @@ template handleRlpException(info: static[string]; code: untyped) = proc savePivot*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` data: SnapDbPivotRegistry; ## Registered data record - ): Result[int,HexaryDbError] = + ): Result[int,HexaryError] = ## Register pivot environment handleRlpException("savePivot()"): let rlpData = rlp.encode(data) @@ -58,7 +58,7 @@ proc savePivot*( proc recoverPivot*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` stateRoot: NodeKey; ## Check for a particular state root - ): Result[SnapDbPivotRegistry,HexaryDbError] = + ): Result[SnapDbPivotRegistry,HexaryError] = ## Restore pivot environment for a particular state root. let rc = pv.kvDb.persistentStateRootGet(stateRoot) if rc.isOk: @@ -70,7 +70,7 @@ proc recoverPivot*( proc recoverPivot*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - ): Result[SnapDbPivotRegistry,HexaryDbError] = + ): Result[SnapDbPivotRegistry,HexaryError] = ## Restore pivot environment that was saved latest. 
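+  ##
+  ## A minimal recovery sketch (the fields of the returned record are not
+  ## spelled out here):
+  ## ::
+  ##   let rc = pv.recoverPivot()
+  ##   if rc.isOk:
+  ##     # rc.value holds the `SnapDbPivotRegistry` record saved last,
+  ##     # sync can be resumed from there
+  ##     discard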
let rc = pv.kvDb.persistentStateRootGet(NodeKey.default) if rc.isOk: diff --git a/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim b/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim index 608c954a1..9e4ed10d4 100644 --- a/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim +++ b/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim @@ -12,6 +12,7 @@ import std/tables, chronicles, eth/[common, p2p, rlp], + stew/interval_set, ../../../protocol, ../../range_desc, "."/[hexary_desc, hexary_error, hexary_import, hexary_inspect, @@ -29,7 +30,6 @@ type SnapDbStorageSlotsRef* = ref object of SnapDbBaseRef peer: Peer ## For log messages accKey: NodeKey ## Accounts address hash (curr.unused) - getClsFn: StorageSlotsGetFn ## Persistent database `get()` closure # ------------------------------------------------------------------------------ # Private helpers @@ -41,10 +41,6 @@ proc to(h: Hash256; T: type NodeKey): T = proc convertTo(data: openArray[byte]; T: type Hash256): T = discard result.data.NodeKey.init(data) # size error => zero -proc getFn(ps: SnapDbStorageSlotsRef; accKey: NodeKey): HexaryGetFn = - ## Capture `accKey` argument for `GetClsFn` closure => `HexaryGetFn` - return proc(key: openArray[byte]): Blob = ps.getClsFn(accKey,key) - template noKeyError(info: static[string]; code: untyped) = try: @@ -81,7 +77,7 @@ template noGenericExOrKeyError(info: static[string]; code: untyped) = proc persistentStorageSlots( db: HexaryTreeDbRef; ## Current table ps: SnapDbStorageSlotsRef; ## For persistent database - ): Result[void,HexaryDbError] + ): Result[void,HexaryError] {.gcsafe, raises: [Defect,OSError,KeyError].} = ## Store accounts trie table on databse if ps.rockDb.isNil: @@ -97,7 +93,7 @@ proc collectStorageSlots( peer: Peer; ## for log messages base: NodeTag; ## before or at first account entry in `data` slotLists: seq[SnapStorage]; - ): Result[seq[RLeafSpecs],HexaryDbError] + ): Result[seq[RLeafSpecs],HexaryError] {.gcsafe, raises: [Defect, RlpError].} = ## Similar to `collectAccounts()` var rcSlots: seq[RLeafSpecs] @@ -140,15 +136,17 @@ proc importStorageSlots( data: AccountSlots; ## Account storage descriptor proof: SnapStorageProof; ## Storage slots proof data noBaseBoundCheck = false; ## Ignore left boundary proof check if `true` - ): Result[seq[NodeSpecs],HexaryDbError] + ): Result[seq[NodeSpecs],HexaryError] {.gcsafe, raises: [Defect,RlpError,KeyError].} = ## Process storage slots for a particular storage root. See `importAccounts()` ## for comments on the return value. let tmpDb = SnapDbBaseRef.init(ps, data.account.storageRoot.to(NodeKey)) var - slots: seq[RLeafSpecs] - dangling: seq[NodeSpecs] + slots: seq[RLeafSpecs] # validated slots to add to database + dangling: seq[NodeSpecs] # return value + proofStats: TrieNodeStat # `proof` data dangling links + innerSubTrie: seq[NodeSpecs] # internal, collect dangling links if 0 < proof.len: let rc = tmpDb.mergeProofs(ps.peer, proof) if rc.isErr: @@ -160,17 +158,17 @@ proc importStorageSlots( slots = rc.value if 0 < slots.len: - var innerSubTrie: seq[NodeSpecs] if 0 < proof.len: # Inspect trie for dangling nodes. This is not a big deal here as the # proof data is typically small. 
- let - proofStats = ps.hexaDb.hexaryInspectTrie(ps.root, @[]) - topTag = slots[^1].pathTag + let topTag = slots[^1].pathTag for w in proofStats.dangling: - if base <= w.partialPath.max(NodeKey).to(NodeTag) and - w.partialPath.min(NodeKey).to(NodeTag) <= topTag: - # Extract dangling links which are inside the accounts range + let iv = w.partialPath.pathEnvelope + if iv.maxPt < base or topTag < iv.minPt: + # Dangling link with partial path envelope outside accounts range + discard + else: + # Overlapping partial path envelope. innerSubTrie.add w # Build partial hexary trie @@ -183,16 +181,18 @@ proc importStorageSlots( # trie (if any). let bottomTag = slots[0].pathTag for w in innerSubTrie: - if ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)): - continue - # Verify that `base` is to the left of the first slot and there is - # nothing in between. Without proof, there can only be a complete - # set/list of slots. There must be a proof for an empty list. - if not noBaseBoundCheck and - w.partialPath.max(NodeKey).to(NodeTag) < bottomTag: - return err(LowerBoundProofError) - # Otherwise register left over entry - dangling.add w + if not ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)): + if not noBaseBoundCheck: + # Verify that `base` is to the left of the first slot and there is + # nothing in between. + # + # Without `proof` data available there can only be a complete + # set/list of accounts so there are no dangling nodes in the first + # place. But there must be `proof` data for an empty list. + if w.partialPath.pathEnvelope.maxPt < bottomTag: + return err(LowerBoundProofError) + # Otherwise register left over entry + dangling.add w # Commit to main descriptor for k,v in tmpDb.hexaDb.tab.pairs: @@ -204,6 +204,13 @@ proc importStorageSlots( # There must be a proof for an empty argument list. return err(LowerBoundProofError) + else: + if not noBaseBoundCheck: + for w in proofStats.dangling: + if base <= w.partialPath.pathEnvelope.maxPt: + return err(LowerBoundProofError) + dangling = proofStats.dangling + ok(dangling) # ------------------------------------------------------------------------------ @@ -224,12 +231,27 @@ proc init*( result.init(pv, root.to(NodeKey)) result.peer = peer result.accKey = accKey - result.getClsFn = db.persistentStorageSlotsGetFn() # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ +proc getStorageSlotsFn*( + ps: SnapDbStorageSlotsRef; + ): HexaryGetFn = + ## Return `HexaryGetFn` closure. + let getFn = ps.kvDb.persistentStorageSlotsGetFn() + return proc(key: openArray[byte]): Blob = getFn(ps.accKey, key) + +proc getStorageSlotsFn*( + pv: SnapDbRef; + accKey: NodeKey; + ): HexaryGetFn = + ## Variant of `getStorageSlotsFn()` for captured `accKey` argument. 
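+  ##
+  ## The closure hides the account key, so the result can be passed wherever
+  ## a plain `HexaryGetFn` is expected. Sketch (assuming `accKey` and the
+  ## slots trie root `rootKey` are known):
+  ## ::
+  ##   let
+  ##     getFn = pv.getStorageSlotsFn(accKey)
+  ##     stats = getFn.hexaryInspectTrie(rootKey)
+  ##   # `stats.dangling` lists the missing sub-tries of this slots trie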
+ let getFn = pv.kvDb.persistentStorageSlotsGetFn() + return proc(key: openArray[byte]): Blob = getFn(accKey, key) + + proc importStorageSlots*( ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor data: AccountStorageRange; ## Account storage reply from `snap/1` protocol @@ -407,7 +429,7 @@ proc inspectStorageSlotsTrie*( suspendAfter = high(uint64); ## To be resumed persistent = false; ## Read data from disk ignoreError = false; ## Always return partial results if any - ): Result[TrieNodeStat, HexaryDbError] = + ): Result[TrieNodeStat, HexaryError] = ## Starting with the argument list `pathSet`, find all the non-leaf nodes in ## the hexary trie which have at least one node key reference missing in ## the trie database. Argument `pathSet` list entries that do not refer to a @@ -427,7 +449,7 @@ proc inspectStorageSlotsTrie*( var stats: TrieNodeStat noRlpExceptionOops("inspectStorageSlotsTrie()"): if persistent: - stats = ps.getFn(ps.accKey).hexaryInspectTrie( + stats = ps.getStorageSlotsFn.hexaryInspectTrie( ps.root, pathList, resumeCtx, suspendAfter=suspendAfter) else: stats = ps.hexaDb.hexaryInspectTrie( @@ -460,7 +482,7 @@ proc inspectStorageSlotsTrie*( resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection suspendAfter = high(uint64); ## To be resumed ignoreError = false; ## Always return partial results if any - ): Result[TrieNodeStat, HexaryDbError] = + ): Result[TrieNodeStat, HexaryError] = ## Variant of `inspectStorageSlotsTrieTrie()` for persistent storage. SnapDbStorageSlotsRef.init( pv, accKey, root, peer).inspectStorageSlotsTrie( @@ -471,12 +493,12 @@ proc getStorageSlotsNodeKey*( ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor path: Blob; ## Partial node path persistent = false; ## Read data from disk - ): Result[NodeKey,HexaryDbError] = + ): Result[NodeKey,HexaryError] = ## For a partial node path argument `path`, return the raw node key. var rc: Result[NodeKey,void] noRlpExceptionOops("getStorageSlotsNodeKey()"): if persistent: - rc = ps.getFn(ps.accKey).hexaryInspectPath(ps.root, path) + rc = ps.getStorageSlotsFn.hexaryInspectPath(ps.root, path) else: rc = ps.hexaDb.hexaryInspectPath(ps.root, path) if rc.isOk: @@ -489,7 +511,7 @@ proc getStorageSlotsNodeKey*( accKey: NodeKey; ## Account key root: Hash256; ## state root path: Blob; ## Partial node path - ): Result[NodeKey,HexaryDbError] = + ): Result[NodeKey,HexaryError] = ## Variant of `getStorageSlotsNodeKey()` for persistent storage. SnapDbStorageSlotsRef.init( pv, accKey, root, peer).getStorageSlotsNodeKey(path, persistent=true) @@ -499,7 +521,7 @@ proc getStorageSlotsData*( ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor path: NodeKey; ## Account to visit persistent = false; ## Read data from disk - ): Result[Account,HexaryDbError] = + ): Result[Account,HexaryError] = ## Fetch storage slots data. 
## ## Caveat: There is no unit test yet @@ -509,9 +531,9 @@ proc getStorageSlotsData*( noRlpExceptionOops("getStorageSlotsData()"): var leaf: Blob if persistent: - leaf = path.hexaryPath(ps.root, ps.getFn(ps.accKey)).leafData + leaf = path.hexaryPath(ps.root, ps.getStorageSlotsFn).leafData else: - leaf = path.hexaryPath(ps.root.to(RepairKey),ps.hexaDb).leafData + leaf = path.hexaryPath(ps.root.to(RepairKey), ps.hexaDb).leafData if leaf.len == 0: return err(AccountNotFound) @@ -525,7 +547,7 @@ proc getStorageSlotsData*( accKey: NodeKey; ## Account key root: Hash256; ## state root path: NodeKey; ## Account to visit - ): Result[Account,HexaryDbError] = + ): Result[Account,HexaryError] = ## Variant of `getStorageSlotsData()` for persistent storage. SnapDbStorageSlotsRef.init( pv, accKey, root, peer).getStorageSlotsData(path, persistent=true) @@ -541,8 +563,7 @@ proc haveStorageSlotsData*( ## Caveat: There is no unit test yet noGenericExOrKeyError("haveStorageSlotsData()"): if persistent: - let getFn = ps.getFn(ps.accKey) - return 0 < ps.root.ByteArray32.getFn().len + return 0 < ps.getStorageSlotsFn()(ps.root.ByteArray32).len else: return ps.hexaDb.tab.hasKey(ps.root.to(RepairKey)) diff --git a/nimbus/sync/snap/worker/heal_accounts.nim b/nimbus/sync/snap/worker/heal_accounts.nim index 1bc9dfa5c..fb9557cc9 100644 --- a/nimbus/sync/snap/worker/heal_accounts.nim +++ b/nimbus/sync/snap/worker/heal_accounts.nim @@ -14,17 +14,17 @@ ## Flow chart for healing algorithm ## -------------------------------- ## :: -## START with {state-root} +## START ## | ## | +--------------------------------+ ## | | | -## v v | -## | -## | | -## | +--------------------------+ | -## | | +--------------------+ | | -## | | | | | | -## v v v | | | +## | v | +## | | +## | | | +## | | +-----------------------+ | +## | | | +------------------+ | | +## | | | | | | | +## v v v v | | | ## {missing-nodes} | | | ## | | | | ## v | | | @@ -48,8 +48,6 @@ ## Legend: ## * `<..>`: some action, process, etc. ## * `{missing-nodes}`: list implemented as `env.fetchAccounts.sickSubTries` -## * `(state-root}`: implicit argument for `getAccountNodeKey()` when -## the argument list is empty ## * `{leaf-nodes}`: list is optimised out ## * `{check-nodes}`: list implemented as `env.fetchAccounts.checkNodes` ## * `{storage-roots}`: list implemented as pair of queues @@ -57,8 +55,8 @@ ## ## Discussion of flow chart ## ------------------------ -## * Input nodes for `` are checked for dangling child node -## links which in turn are collected as output. +## * If there are no missing nodes, START proceeds with the `` +## process. ## ## * Nodes of the `{missing-nodes}` list are fetched from the network and ## merged into the persistent accounts trie database. @@ -67,6 +65,9 @@ ## + Successfully merged leaf nodes are processed as single entry accounts ## node ranges. ## +## * Input nodes for `` are checked for dangling child node +## links which in turn are collected as output. 
+## ## * If there is a problem with a node travelling from the source list ## `{missing-nodes}` towards either target list `{leaf-nodes}` or ## `{check-nodes}`, this problem node will fed back to the `{missing-nodes}` @@ -115,7 +116,8 @@ import ../../sync_desc, ".."/[constants, range_desc, worker_desc], ./com/[com_error, get_trie_nodes], - ./db/[hexary_desc, hexary_error, snapdb_accounts] + ./db/[hexary_desc, hexary_error, snapdb_accounts], + ./sub_tries_helper {.push raises: [Defect].} @@ -141,7 +143,7 @@ proc healingCtx( "{" & "pivot=" & "#" & $env.stateHeader.blockNumber & "," & "nAccounts=" & $env.nAccounts & "," & - ("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" & + ("covered=" & env.fetchAccounts.processed.fullFactor.toPC(0) & "/" & ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," & "nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," & "nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}" @@ -150,32 +152,6 @@ proc healingCtx( # Private functions # ------------------------------------------------------------------------------ -proc verifyStillMissingNodes( - buddy: SnapBuddyRef; - env: SnapPivotRef; - ) = - ## Check whether previously missing nodes from the `sickSubTries` list - ## have been magically added to the database since it was checked last - ## time. These nodes will me moved to `checkNodes` for further processing. - let - ctx = buddy.ctx - db = ctx.data.snapDb - peer = buddy.peer - stateRoot = env.stateHeader.stateRoot - - var delayed: seq[NodeSpecs] - for w in env.fetchAccounts.sickSubTries: - if ctx.data.snapDb.nodeExists(peer, stateRoot, w): - # Check nodes for dangling links below - env.fetchAccounts.checkNodes.add w.partialPath - else: - # Node is still missing - delayed.add w - - # Must not modify sequence while looping over it - env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & delayed - - proc updateMissingNodesList( buddy: SnapBuddyRef; env: SnapPivotRef; @@ -186,40 +162,23 @@ proc updateMissingNodesList( ## fed back to the vey same list `checkNodes` let ctx = buddy.ctx - db = ctx.data.snapDb peer = buddy.peer - stateRoot = env.stateHeader.stateRoot + db = ctx.data.snapDb - while env.fetchAccounts.sickSubTries.len < snapRequestTrieNodesFetchMax: - # Inspect hexary trie for dangling nodes - let rc = db.inspectAccountsTrie( - peer, stateRoot, - env.fetchAccounts.checkNodes, # start with these nodes - env.fetchAccounts.resumeCtx, # resume previous attempt - healInspectionBatch) # visit no more than this many nodes - if rc.isErr: - when extraTraceMessages: - error logTxt "failed => stop", peer, - ctx=buddy.healingCtx(env), error=rc.error - # Attempt to switch peers, there is not much else we can do here + let rc = await db.getAccountFn.subTriesFromPartialPaths( + env.stateHeader.stateRoot, # State root related to pivot + env.fetchAccounts, # Account download specs + snapRequestTrieNodesFetchMax) # Maxinmal datagram request size + if rc.isErr: + if rc.error == TrieIsLockedForPerusal: + trace logTxt "failed", peer, + ctx=buddy.healingCtx(env), error=rc.error + else: + error logTxt "failed => stop", peer, + ctx=buddy.healingCtx(env), error=rc.error + # Attempt to switch pivot, there is not much else one can do here buddy.ctrl.zombie = true - return false - - # Update context for async threading environment - env.fetchAccounts.resumeCtx = rc.value.resumeCtx - env.fetchAccounts.checkNodes.setLen(0) - - # Collect result - env.fetchAccounts.sickSubTries = - env.fetchAccounts.sickSubTries & rc.value.dangling - - # Done unless 
there is some resumption context - if rc.value.resumeCtx.isNil: - break - - # Allow async task switch and continue. Note that some other task might - # steal some of the `env.fetchAccounts.sickSubTries`. - await sleepAsync 1.nanoseconds + return false return true @@ -325,25 +284,17 @@ proc registerAccountLeaf( peer = buddy.peer pt = accKey.to(NodeTag) - # Find range set (from list) containing `pt` - var ivSet: NodeTagRangeSet - block foundCoveringRange: - for w in env.fetchAccounts.unprocessed: - if 0 < w.covered(pt,pt): - ivSet = w - break foundCoveringRange - return # already processed, forget this account leaf + # Register isolated leaf node + if 0 < env.fetchAccounts.processed.merge(pt,pt) : + env.nAccounts.inc + env.fetchAccounts.unprocessed.reduce(pt,pt) + discard buddy.ctx.data.coveredAccounts.merge(pt,pt) - # Register this isolated leaf node that was added - env.nAccounts.inc - discard ivSet.reduce(pt,pt) - discard buddy.ctx.data.coveredAccounts.merge(pt,pt) - - # Update storage slots batch - if acc.storageRoot != emptyRlpHash: - env.fetchStorageFull.merge AccountSlotsHeader( - acckey: accKey, - storageRoot: acc.storageRoot) + # Update storage slots batch + if acc.storageRoot != emptyRlpHash: + env.fetchStorageFull.merge AccountSlotsHeader( + acckey: accKey, + storageRoot: acc.storageRoot) # ------------------------------------------------------------------------------ # Private functions: do the healing for one round @@ -362,19 +313,19 @@ proc accountsHealingImpl( peer = buddy.peer # Update for changes since last visit - buddy.verifyStillMissingNodes(env) + try: + db.getAccountFn.subTriesNodesReclassify( + env.stateHeader.stateRoot.to(NodeKey), env.fetchAccounts) + except Exception as e: + raiseAssert "Not possible @ accountsHealingImpl(" & $e.name & "):" & e.msg - # If `checkNodes` is empty, healing is at the very start or was - # postponed in which case `sickSubTries` is non-empty. - if env.fetchAccounts.checkNodes.len != 0 or - env.fetchAccounts.sickSubTries.len == 0: - if not await buddy.updateMissingNodesList(env): - return 0 - - # Check whether the trie is complete. if env.fetchAccounts.sickSubTries.len == 0: - trace logTxt "complete", peer, ctx=buddy.healingCtx(env) - return 0 # nothing to do + # Traverse the hexary trie for more missing nodes. This call is expensive. + if await buddy.updateMissingNodesList(env): + # Check whether the trie is complete. + if env.fetchAccounts.sickSubTries.len == 0: + trace logTxt "complete", peer, ctx=buddy.healingCtx(env) + return 0 # nothing to do # Get next batch of nodes that need to be merged it into the database let nodeSpecs = await buddy.getMissingNodesFromNetwork(env) diff --git a/nimbus/sync/snap/worker/heal_storage_slots.nim b/nimbus/sync/snap/worker/heal_storage_slots.nim index 7a6740959..76bea35b6 100644 --- a/nimbus/sync/snap/worker/heal_storage_slots.nim +++ b/nimbus/sync/snap/worker/heal_storage_slots.nim @@ -34,7 +34,8 @@ import ../../sync_desc, ".."/[constants, range_desc, worker_desc], ./com/[com_error, get_trie_nodes], - ./db/[hexary_desc, hexary_error, snapdb_storage_slots] + ./db/[hexary_desc, hexary_error, snapdb_storage_slots], + ./sub_tries_helper {.push raises: [Defect].} @@ -71,7 +72,7 @@ proc healingCtx( proc acceptWorkItemAsIs( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair; - ): Result[bool, HexaryDbError] = + ): Result[bool,HexaryError] = ## Check whether this work item is done and the corresponding storage trie ## can be completely inherited. 
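The reworked `registerAccountLeaf()` above books a healed account key as the one-element interval `[pt,pt]` in three places at once: the per-pivot `processed` set, the per-pivot `unprocessed` batch (removed from it), and the global `coveredAccounts` accumulator. A minimal stand-alone sketch of that bookkeeping, using plain `NodeTagRangeSet` arguments instead of the real `SnapTodoRanges` pair and with module paths assumed as used elsewhere in this patch::

    import
      stew/interval_set,
      ../range_desc                       # NodeTag, NodeTagRangeSet (path assumed)

    proc registerLeaf(processed, unprocessed, covered: NodeTagRangeSet;
                      pt: NodeTag): bool =
      ## Hypothetical helper; returns true if `pt` was newly registered.
      if 0 < processed.merge(pt, pt):     # merge() reports the newly covered size
        discard covered.merge(pt, pt)     # global coverage over all pivots
        discard unprocessed.reduce(pt, pt)
        return true

The zero result of `merge()` is what makes the "already processed, forget this account leaf" early exit of the previous version unnecessary.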
if kvp.data.inherit: @@ -140,36 +141,21 @@ proc updateMissingNodesList( storageRoot = kvp.key slots = kvp.data.slots - while slots.sickSubTries.len < snapRequestTrieNodesFetchMax: - # Inspect hexary trie for dangling nodes - let rc = db.inspectStorageSlotsTrie( - peer, accKey, storageRoot, - slots.checkNodes, # start with these nodes - slots.resumeCtx, # resume previous attempt - healStorageSlotsInspectionBatch) # visit no more than this many nodes - if rc.isErr: - when extraTraceMessages: - let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len - error logTxt "failed => stop", peer, itCtx=buddy.healingCtx(kvp,env), - nSlotLists=env.nSlotLists, nStorageQueue, error=rc.error - # Attempt to switch peers, there is not much else we can do here + let rc = await db.getStorageSlotsFn(accKey).subTriesFromPartialPaths( + storageRoot, # State root related to storage slots + slots, # Storage slots download specs + snapRequestTrieNodesFetchMax) # Maxinmal datagram request size + if rc.isErr: + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + if rc.error == TrieIsLockedForPerusal: + trace logTxt "failed", peer, itCtx=buddy.healingCtx(kvp,env), + nSlotLists=env.nSlotLists, nStorageQueue, error=rc.error + else: + error logTxt "failed => stop", peer, itCtx=buddy.healingCtx(kvp,env), + nSlotLists=env.nSlotLists, nStorageQueue, error=rc.error + # Attempt to switch pivot, there is not much else one can do here buddy.ctrl.zombie = true - return false - - # Update context for async threading environment - slots.resumeCtx = rc.value.resumeCtx - slots.checkNodes.setLen(0) - - # Collect result - slots.sickSubTries = slots.sickSubTries & rc.value.dangling - - # Done unless there is some resumption context - if rc.value.resumeCtx.isNil: - break - - # Allow async task switch and continue. Note that some other task might - # steal some of the `env.fetchAccounts.sickSubTries`. - await sleepAsync 1.nanoseconds + return false return true diff --git a/nimbus/sync/snap/worker/pivot_helper.nim b/nimbus/sync/snap/worker/pivot_helper.nim index 92c0677fd..70b04d61b 100644 --- a/nimbus/sync/snap/worker/pivot_helper.nim +++ b/nimbus/sync/snap/worker/pivot_helper.nim @@ -36,6 +36,11 @@ proc init(batch: SnapRangeBatchRef; ctx: SnapCtxRef) = batch.unprocessed.init() batch.processed = NodeTagRangeSet.init() + # Initialise partial path the envelope of which covers the full range of + # account keys `0..high(NodeTag)`. This will trigger healing on the full + # range all possible keys. + batch.checkNodes.add @[0.byte] + # Initialise accounts range fetch batch, the pair of `fetchAccounts[]` # range sets. 
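Seeding `checkNodes` with the single blob `@[0.byte]` works because that blob hex-prefix encodes the empty partial path, so its envelope is the complete account key space and the first healing round inspects the trie from the root. A quick check of that claim, assuming `pathEnvelope()` from the extended `hexary_paths.nim` and the usual `range_desc.nim` helpers (paths assumed)::

    import
      ./db/hexary_paths,                  # pathEnvelope()
      ../range_desc                       # NodeTag, NodeTagRange

    let iv = pathEnvelope @[0.byte]       # envelope of the empty partial path
    doAssert iv.minPt == low(NodeTag)
    doAssert iv.maxPt == high(NodeTag)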
if ctx.data.coveredAccounts.isFull: @@ -123,9 +128,19 @@ proc update*( let rc = pivotTable.secondKey if rc.isOk: pivotTable.del rc.value + + # Update healing threshold for top pivot entry + topEnv = pivotTable.lastValue.value + else: discard pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax) + # Update healing threshold + let + slots = max(0, healAccountsPivotTriggerNMax - pivotTable.len) + delta = slots.float * healAccountsPivotTriggerWeight + topEnv.healThresh = healAccountsPivotTriggerMinFactor + delta + proc tickerStats*( pivotTable: var SnapPivotTable; ## Pivot table @@ -189,13 +204,39 @@ proc tickerStats*( # Public functions: particular pivot # ------------------------------------------------------------------------------ +proc pivotAccountsComplete*( + env: SnapPivotRef; ## Current pivot environment + ): bool = + ## Returns `true` if accounts are fully available for this this pivot. + env.fetchAccounts.processed.isFull + + +proc pivotAccountsHealingOk*( + env: SnapPivotRef; ## Current pivot environment + ctx: SnapCtxRef; ## Some global context + ): bool = + ## Returns `true` if accounts healing is enabled for this pivot. + ## + if not env.pivotAccountsComplete(): + # Only start accounts healing if there is some completion level, already. + # + # We check against the global coverage factor, i.e. a measure for how much + # of the total of all accounts have been processed. Even if the hexary trie + # database for the current pivot state root is sparsely filled, there is a + # good chance that it can inherit some unchanged sub-trie from an earlier + # pivot state root download. The healing process then works like sort of + # glue. + if healAccountsCoverageTrigger <= ctx.data.coveredAccounts.fullFactor: + # Ditto for pivot. + if env.healThresh <= env.fetchAccounts.processed.fullFactor: + return true + + proc execSnapSyncAction*( env: SnapPivotRef; ## Current pivot environment buddy: SnapBuddyRef; ## Worker peer - ): Future[bool] - {.async.} = - ## Execute a synchronisation run. The return code is `true` if a full - ## synchronisation cycle could be executed. + ) {.async.} = + ## Execute a synchronisation run. let ctx = buddy.ctx @@ -205,58 +246,34 @@ proc execSnapSyncAction*( if snapStorageSlotsQuPrioThresh < nStoQu: await buddy.rangeFetchStorageSlots(env) if buddy.ctrl.stopped or env.obsolete: - return false + return - if env.accountsState != HealerDone: + if not env.pivotAccountsComplete(): await buddy.rangeFetchAccounts(env) if buddy.ctrl.stopped or env.obsolete: - return false + return await buddy.rangeFetchStorageSlots(env) if buddy.ctrl.stopped or env.obsolete: - return false + return - if not ctx.data.accountsHealing: - # Only start healing if there is some completion level, already. - # - # We check against the global coverage factor, i.e. a measure for how - # much of the total of all accounts have been processed. Even if the - # hexary trie database for the current pivot state root is sparsely - # filled, there is a good chance that it can inherit some unchanged - # sub-trie from an earlier pivor state root download. The healing - # process then works like sort of glue. - if 0 < env.nAccounts: - if healAccountsTrigger <= ctx.data.coveredAccounts.fullFactor: - ctx.data.accountsHealing = true - - if ctx.data.accountsHealing: - # Can only run a single accounts healer instance at a time. This - # instance will clear the batch queue so there is nothing to do for - # another process. 
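The threshold update above makes healing progressively easier to trigger as the pivot table fills up: every pivot slot still unfilled (up to `healAccountsPivotTriggerNMax`) adds one `Weight` on top of the minimum factor. Written out as a small function over the constants introduced in `constants.nim` (hypothetical helper, not part of the patch; import path assumed)::

    import ../constants                   # healAccountsPivotTrigger* constants

    proc healThreshFor(nPivots: int): float =
      ## `processed` fill factor required before healing the top pivot when
      ## the pivot table currently holds `nPivots` entries.
      let slots = max(0, healAccountsPivotTriggerNMax - nPivots)
      healAccountsPivotTriggerMinFactor +
        slots.float * healAccountsPivotTriggerWeight

So a freshly started sync with few pivots demands a comparatively full `processed` set before `pivotAccountsHealingOk()` fires, while a long-running sync heals as soon as the minimum factor is reached.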
- if env.accountsState == HealerIdle: - env.accountsState = HealerRunning - await buddy.healAccounts(env) - env.accountsState = HealerIdle - - if buddy.ctrl.stopped or env.obsolete: - return false + if env.pivotAccountsHealingOk(ctx): + await buddy.healAccounts(env) + if buddy.ctrl.stopped or env.obsolete: + return # Some additional storage slots might have been popped up await buddy.rangeFetchStorageSlots(env) if buddy.ctrl.stopped or env.obsolete: - return false + return await buddy.healStorageSlots(env) - if buddy.ctrl.stopped or env.obsolete: - return false - - return true proc saveCheckpoint*( env: SnapPivotRef; ## Current pivot environment ctx: SnapCtxRef; ## Some global context - ): Result[int,HexaryDbError] = + ): Result[int,HexaryError] = ## Save current sync admin data. On success, the size of the data record ## saved is returned (e.g. for logging.) ## diff --git a/nimbus/sync/snap/worker/range_fetch_accounts.nim b/nimbus/sync/snap/worker/range_fetch_accounts.nim index 6f96a2e06..3f502c1b5 100644 --- a/nimbus/sync/snap/worker/range_fetch_accounts.nim +++ b/nimbus/sync/snap/worker/range_fetch_accounts.nim @@ -29,17 +29,17 @@ ## * Data points in `iv` that were invalid or not recevied from the network ## are merged back it the set `env.fetchAccounts.unprocessed`. ## - import chronicles, chronos, eth/[common, p2p], stew/[interval_set, keyed_queue], stint, + ../../../utils/prettify, ../../sync_desc, ".."/[constants, range_desc, worker_desc], ./com/[com_error, get_account_range], - ./db/snapdb_accounts + ./db/[hexary_paths, snapdb_accounts] {.push raises: [Defect].} @@ -57,21 +57,21 @@ const template logTxt(info: static[string]): static[string] = "Accounts range " & info -proc dumpUnprocessed( - buddy: SnapBuddyRef; - env: SnapPivotRef; - ): string = - ## Debugging ... - let - peer = buddy.peer - pivot = "#" & $env.stateHeader.blockNumber # for logging - moan = proc(overlap: UInt256; iv: NodeTagRange) = - trace logTxt "unprocessed => overlap", peer, pivot, overlap, iv - - env.fetchAccounts.unprocessed.dump(moan, 5) +# proc dumpUnprocessed( +# buddy: SnapBuddyRef; +# env: SnapPivotRef; +# ): string = +# ## Debugging ... +# let +# peer = buddy.peer +# pivot = "#" & $env.stateHeader.blockNumber # for logging +# moan = proc(overlap: UInt256; iv: NodeTagRange) = +# trace logTxt "unprocessed => overlap", peer, pivot, overlap, iv +# +# env.fetchAccounts.unprocessed.dump(moan, 5) # ------------------------------------------------------------------------------ -# Private functions +# Private helpers # ------------------------------------------------------------------------------ proc getUnprocessed( @@ -97,6 +97,8 @@ proc accountsRangefetchImpl( let ctx = buddy.ctx peer = buddy.peer + db = ctx.data.snapDb + fa = env.fetchAccounts stateRoot = env.stateHeader.stateRoot pivot = "#" & $env.stateHeader.blockNumber # for logging @@ -113,7 +115,7 @@ proc accountsRangefetchImpl( let dd = block: let rc = await buddy.getAccountRange(stateRoot, iv, pivot) if rc.isErr: - env.fetchAccounts.unprocessed.merge iv # fail => interval back to pool + fa.unprocessed.merge iv # fail => interval back to pool let error = rc.error if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): when extraTraceMessages: @@ -132,29 +134,29 @@ proc accountsRangefetchImpl( # trace logTxt "fetched", peer, gotAccounts, gotStorage, # pivot, reqLen=iv.len, gotLen=dd.consumed.len - # Now, we fully own the scheduler. The original interval will savely be - # placed back for a moment -- to be corrected below. 
- env.fetchAccounts.unprocessed.merge iv + # Now, we fully own the scheduler. The original interval will savely be placed + # back for a moment (the `unprocessed` range set to be corrected below.) + fa.unprocessed.merge iv # Processed accounts hashes are set up as a set of intervals which is needed # if the data range returned from the network contains holes. - let processed = NodeTagRangeSet.init() + let covered = NodeTagRangeSet.init() if 0 < dd.data.accounts.len: - discard processed.merge(iv.minPt, dd.data.accounts[^1].accKey.to(NodeTag)) + discard covered.merge(iv.minPt, dd.data.accounts[^1].accKey.to(NodeTag)) else: - discard processed.merge iv + discard covered.merge iv let gaps = block: # No left boundary check needed. If there is a gap, the partial path for # that gap is returned by the import function to be registered, below. - let rc = ctx.data.snapDb.importAccounts( + let rc = db.importAccounts( peer, stateRoot, iv.minPt, dd.data, noBaseBoundCheck = true) if rc.isErr: # Bad data, just try another peer buddy.ctrl.zombie = true when extraTraceMessages: trace logTxt "import failed => stop", peer, gotAccounts, gotStorage, - pivot, reqLen=iv.len, gotLen=processed.total, error=rc.error + pivot, reqLen=iv.len, gotLen=covered.total, error=rc.error return rc.value @@ -164,47 +166,23 @@ proc accountsRangefetchImpl( # Punch holes into the reported range of received accounts from the network # if it there are gaps (described by dangling nodes.) for w in gaps.innerGaps: - discard processed.reduce( - w.partialPath.min(NodeKey).to(NodeTag), - w.partialPath.max(NodeKey).to(Nodetag)) - - # Update dangling nodes list unless healing is activated. The problem - # with healing activated is, that a previously missing node that suddenly - # appears will not automatically translate into a full sub-trie. It might - # just be the node itself (which justified the name `sickSubTrie`). - # - # Instead of managing partial sub-tries here, this is delegated to the - # healing module. - if not ctx.data.accountsHealing: - var delayed: seq[NodeSpecs] - for w in env.fetchAccounts.sickSubTries: - if not ctx.data.snapDb.nodeExists(peer, stateRoot, w): - delayed.add w - when extraTraceMessages: - trace logTxt "dangling nodes", peer, pivot, - nCheckNodes=env.fetchAccounts.checkNodes.len, - nSickSubTries=env.fetchAccounts.sickSubTries.len, - nUpdatedMissing=delayed.len, nOutsideGaps=gaps.dangling.len - env.fetchAccounts.sickSubTries = delayed & gaps.dangling + discard covered.reduce w.partialPath.pathEnvelope # Update book keeping - for w in processed.increasing: + for w in covered.increasing: # Remove the processed range from the batch of unprocessed ones. - env.fetchAccounts.unprocessed.reduce w - # Register consumed intervals on the accumulator over all state roots. + fa.unprocessed.reduce w + # Register consumed intervals on the accumulators over all state roots. discard buddy.ctx.data.coveredAccounts.merge w + discard fa.processed.merge w # Register accounts with storage slots on the storage TODO list. 
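The bookkeeping above is pure interval arithmetic: the requested interval is first merged into a temporary `covered` set, the envelope of every dangling node reported by `importAccounts()` is punched out again, and only what survives is promoted to the `processed` set and removed from `unprocessed`. Condensed into one hypothetical helper (the global `coveredAccounts` accumulator is left out, module paths are assumed)::

    import
      eth/common,                         # Blob
      stew/interval_set,
      ../range_desc,                      # NodeTagRange, NodeTagRangeSet
      ../worker_desc,                     # SnapRangeBatchRef
      ./db/hexary_paths                   # pathEnvelope()

    proc bookKeep(fa: SnapRangeBatchRef; iv: NodeTagRange;
                  gapPaths: seq[Blob]) =
      ## Sketch of the bookkeeping in accountsRangefetchImpl()
      let covered = NodeTagRangeSet.init()
      discard covered.merge iv                  # what the reply nominally covers
      for w in gapPaths:
        discard covered.reduce w.pathEnvelope   # punch holes for dangling nodes
      for w in covered.increasing:
        fa.unprocessed.reduce w                 # do not fetch these keys again
        discard fa.processed.merge w            # range done for this pivot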
env.fetchStorageFull.merge dd.withStorage - #when extraTraceMessages: - # let - # imported = processed.dump() - # unprocessed = buddy.dumpUnprocessed(env) - # trace logTxt "request done", peer, pivot, - # nCheckNodes=env.fetchAccounts.checkNodes.len, - # nSickSubTries=env.fetchAccounts.sickSubTries.len, - # imported, unprocessed + when extraTraceMessages: + trace logTxt "request done", peer, pivot, + covered=covered.fullFactor.toPC(2), + processed=fa.processed.fullFactor.toPC(2) return true @@ -217,7 +195,10 @@ proc rangeFetchAccounts*( env: SnapPivotRef; ) {.async.} = ## Fetch accounts and store them in the database. - if not env.fetchAccounts.unprocessed.isEmpty(): + let + fa = env.fetchAccounts + + if not fa.processed.isFull(): let ctx = buddy.ctx peer = buddy.peer @@ -226,8 +207,8 @@ proc rangeFetchAccounts*( when extraTraceMessages: trace logTxt "start", peer, pivot - var nFetchAccounts = 0 - while not env.fetchAccounts.unprocessed.isEmpty() and + var nFetchAccounts = 0 # for logging + while not fa.processed.isFull() and buddy.ctrl.running and not env.obsolete: nFetchAccounts.inc @@ -241,6 +222,7 @@ proc rangeFetchAccounts*( when extraTraceMessages: trace logTxt "done", peer, pivot, nFetchAccounts, + nCheckNodes=fa.checkNodes.len, nSickSubTries=fa.sickSubTries.len, runState=buddy.ctrl.state # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/range_fetch_storage_slots.nim b/nimbus/sync/snap/worker/range_fetch_storage_slots.nim index d13ed18b4..840c94133 100644 --- a/nimbus/sync/snap/worker/range_fetch_storage_slots.nim +++ b/nimbus/sync/snap/worker/range_fetch_storage_slots.nim @@ -61,7 +61,7 @@ import ../../sync_desc, ".."/[constants, range_desc, worker_desc], ./com/[com_error, get_storage_ranges], - ./db/snapdb_storage_slots + ./db/[hexary_error, snapdb_storage_slots] {.push raises: [Defect].} @@ -131,28 +131,24 @@ proc getNextSlotItemPartial( for kvp in env.fetchStoragePart.nextPairs: if not kvp.data.slots.isNil: - # Extract first interval and return single item request queue - for ivSet in kvp.data.slots.unprocessed: - let rc = ivSet.ge() - if rc.isOk: + # Extract range and return single item request queue + let rc = kvp.data.slots.unprocessed.fetch(maxLen = high(UInt256)) + if rc.isOk: - # Extraxt this interval from the range set - discard ivSet.reduce rc.value + # Delete from batch queue if the range set becomes empty + if kvp.data.slots.unprocessed.isEmpty: + env.fetchStoragePart.del(kvp.key) - # Delete from batch queue if the range set becomes empty - if kvp.data.slots.unprocessed.isEmpty: - env.fetchStoragePart.del(kvp.key) + when extraTraceMessages: + trace logTxt "fetch partial", peer, + nSlotLists=env.nSlotLists, + nStorageQuPart=env.fetchStoragePart.len, + subRange=rc.value, account=kvp.data.accKey - when extraTraceMessages: - trace logTxt "fetch partial", peer, - nSlotLists=env.nSlotLists, - nStorageQuPart=env.fetchStoragePart.len, - subRange=rc.value, account=kvp.data.accKey - - return @[AccountSlotsHeader( - accKey: kvp.data.accKey, - storageRoot: kvp.key, - subRange: some rc.value)] + return @[AccountSlotsHeader( + accKey: kvp.data.accKey, + storageRoot: kvp.key, + subRange: some rc.value)] # Oops, empty range set? Remove range and move item to the full requests kvp.data.slots = nil @@ -231,20 +227,54 @@ proc storeStoragesSingleBatch( for w in report: # All except the last item always index to a node argument. The last # item has been checked for, already. 
- let inx = w.slot.get + let + inx = w.slot.get + acc = stoRange.data.storages[inx].account - # if w.error in {RootNodeMismatch, RightBoundaryProofFailed}: - # ??? + if w.error == RootNodeMismatch: + # Some pathological case, needs further investigation. For the + # moment, provide partial fetches. + const + halfTag = (high(UInt256) div 2).NodeTag + halfTag1 = halfTag + 1.u256 + env.fetchStoragePart.merge [ + AccountSlotsHeader( + accKey: acc.accKey, + storageRoot: acc.storageRoot, + subRange: some NodeTagRange.new(low(NodeTag), halfTag)), + AccountSlotsHeader( + accKey: acc.accKey, + storageRoot: acc.storageRoot, + subRange: some NodeTagRange.new(halfTag1, high(NodeTag)))] - # Reset any partial result (which would be the last entry) to - # requesting the full interval. So all the storage slots are - # re-fetched completely for this account. - env.fetchStorageFull.merge AccountSlotsHeader( - accKey: stoRange.data.storages[inx].account.accKey, - storageRoot: stoRange.data.storages[inx].account.storageRoot) + elif w.error == RightBoundaryProofFailed and + acc.subRange.isSome and 1 < acc.subRange.unsafeGet.len: + # Some pathological case, needs further investigation. For the + # moment, provide a partial fetches. + let + iv = acc.subRange.unsafeGet + halfTag = iv.minPt + (iv.len div 2) + halfTag1 = halfTag + 1.u256 + env.fetchStoragePart.merge [ + AccountSlotsHeader( + accKey: acc.accKey, + storageRoot: acc.storageRoot, + subRange: some NodeTagRange.new(iv.minPt, halfTag)), + AccountSlotsHeader( + accKey: acc.accKey, + storageRoot: acc.storageRoot, + subRange: some NodeTagRange.new(halfTag1, iv.maxPt))] - # Last entry might be partial - if inx == topStoRange: + else: + # Reset any partial result (which would be the last entry) to + # requesting the full interval. So all the storage slots are + # re-fetched completely for this account. + env.fetchStorageFull.merge AccountSlotsHeader( + accKey: acc.accKey, + storageRoot: acc.storageRoot) + + # Last entry might be partial (if any) + # # Forget about partial result processing if the last partial entry # was reported because # * either there was an error processing it diff --git a/nimbus/sync/snap/worker/sub_tries_helper.nim b/nimbus/sync/snap/worker/sub_tries_helper.nim new file mode 100644 index 000000000..198be5c35 --- /dev/null +++ b/nimbus/sync/snap/worker/sub_tries_helper.nim @@ -0,0 +1,223 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
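In the `range_fetch_storage_slots.nim` hunk further above, both pathological error cases are handled by splitting the offending slots range in half and re-queuing the two halves as partial fetches. The arithmetic is plain `NodeTag` interval halving; as a hypothetical stand-alone helper for the `RightBoundaryProofFailed` branch (callers must guarantee `1 < iv.len`, import paths assumed)::

    import
      stint,
      stew/interval_set,
      ../range_desc                       # NodeTag, NodeTagRange

    proc halved(iv: NodeTagRange): (NodeTagRange, NodeTagRange) =
      ## Split a proper sub-range into a lower and an upper half.
      let
        halfTag = iv.minPt + (iv.len div 2)
        halfTag1 = halfTag + 1.u256
      (NodeTagRange.new(iv.minPt, halfTag),
       NodeTagRange.new(halfTag1, iv.maxPt))

The full-range `RootNodeMismatch` case cannot reuse such a helper, presumably because the length of the complete interval does not fit into a `UInt256`, which is why the code above splits it at `high(UInt256) div 2` explicitly.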
+ +import + chronicles, + chronos, + eth/[common, p2p], + stew/interval_set, + ".."/[constants, range_desc, worker_desc], + ./db/[hexary_desc, hexary_error, hexary_inspect, hexary_paths] + +{.push raises: [Defect].} + +logScope: + topics = "snap-subtrie" + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +# ------------------------------------------------------------------------------ +# Private logging helpers +# ------------------------------------------------------------------------------ + +template logTxt(info: static[string]): static[string] = + "Sub-trie helper " & info + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc doInspect( + getFn: HexaryGetFn; ## Abstract database access + rootKey: NodeKey; ## Start of hexary trie + partialPaths: seq[Blob]; ## Nodes with prob. dangling child links + resumeCtx: TrieNodeStatCtxRef; ## Resume previous inspection + ): Result[TrieNodeStat,HexaryError] + {.gcsafe, raises: [Defect,RlpError].} = + ## .. + let stats = getFn.hexaryInspectTrie( + rootKey, partialPaths, resumeCtx, healInspectionBatch) + + if stats.stopped: + return err(TrieLoopAlert) + + ok(stats) + + +proc getOverlapping( + batch: SnapRangeBatchRef; ## Healing data support + iv: NodeTagRange; ## Reference interval + ): Result[NodeTagRange,void] = + ## Find overlapping interval in `batch` + block: + let rc = batch.processed.ge iv.minPt + if rc.isOk and rc.value.minPt <= iv.maxPt: + return ok(rc.value) + block: + let rc = batch.processed.le iv.maxPt + if rc.isOk and iv.minPt <= rc.value.maxPt: + return ok(rc.value) + err() + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc subTriesFromPartialPaths*( + getFn: HexaryGetFn; ## Abstract database access + stateRoot: Hash256; ## Start of hexary trie + batch: SnapRangeBatchRef; ## Healing data support + sickSubTriesMaxLen = high(int); ## Max length of `sickSubTries` + ): Future[Result[void,HexaryError]] + {.async.} = + ## Starting with a given set of potentially dangling account nodes + ## `checkNodes`, this set is filtered and processed. 
The outcome is + ## fed back to the vey same list `checkNodes` + + # Process might be expensive, so only a single instance is allowed to run it + if batch.lockTriePerusal: + return err(TrieIsLockedForPerusal) + batch.lockTriePerusal = true + + let + rootKey = stateRoot.to(NodeKey) + var + error: HexaryError + count = 0 # for logging + start = Moment.now() # for logging + + block errorWhenOutside: + try: + while batch.sickSubTries.len < sickSubTriesMaxLen: + # Inspect hexary trie for dangling nodes + let rc = getFn.doInspect(rootKey, batch.checkNodes, batch.resumeCtx) + if rc.isErr: + error = rc.error + break errorWhenOutside + + count.inc + + # Update context for async threading environment + batch.resumeCtx = rc.value.resumeCtx + batch.checkNodes.setLen(0) + + # Collect result + batch.sickSubTries = batch.sickSubTries & rc.value.dangling + + # Done unless there is some resumption context + if rc.value.resumeCtx.isNil: + break + + when extraTraceMessages: + trace logTxt "inspection wait", count, + elapsed=(Moment.now()-start), + sleep=healInspectionBatchWaitNanoSecs, + sickSubTriesLen=batch.sickSubTries.len, sickSubTriesMaxLen, + resumeCtxLen = batch.resumeCtx.hddCtx.len + + # Allow async task switch and continue. Note that some other task might + # steal some of the `sickSubTries` var argument. + await sleepAsync healInspectionBatchWaitNanoSecs.nanoseconds + + batch.lockTriePerusal = false + return ok() + + except RlpError: + error = RlpEncoding + + batch.sickSubTries = batch.sickSubTries & batch.resumeCtx.to(seq[NodeSpecs]) + batch.resumeCtx = nil + + batch.lockTriePerusal = false + return err(error) + + +proc subTriesNodesReclassify*( + getFn: HexaryGetFn; ## Abstract database access + rootKey: NodeKey; ## Start node into hexary trie + batch: SnapRangeBatchRef; ## Healing data support + ) {.gcsafe, raises: [Defect,KeyError].} = + ## Check whether previously missing nodes from the `sickSubTries` list have + ## been magically added to the database since it was checked last time. These + ## nodes will me moved to `checkNodes` for further processing. Also, some + ## full sub-tries might have been added which can be checked against + ## the `processed` range set. + + # Move `sickSubTries` entries that have now an exisiting node to the + # list of partial paths to be re-checked. + block: + var delayed: seq[NodeSpecs] + for w in batch.sickSubTries: + if 0 < getFn(w.nodeKey.ByteArray32).len: + batch.checkNodes.add w.partialPath + else: + delayed.add w + batch.sickSubTries = delayed + + # Remove `checkNodes` entries with complete known sub-tries. + var + doneWith: seq[Blob] # loop will not recurse on that list + count = 0 # for logging only + + # `While` loop will terminate with processed paths in `doneWith`. + block: + var delayed: seq[Blob] + while 0 < batch.checkNodes.len: + + when extraTraceMessages: + trace logTxt "reclassify", count, + nCheckNodes=batch.checkNodes.len + + for w in batch.checkNodes: + let + iv = w.pathEnvelope + nCov = batch.processed.covered iv + + if iv.len <= nCov: + # Fully processed envelope, no need to keep `w` any longer + when extraTraceMessages: + trace logTxt "reclassify discard", count, partialPath=w, + nDelayed=delayed.len + continue + + if 0 < nCov: + # Partially processed range, fetch an overlapping interval and + # remove that from the envelope of `w`. 
+ try: + let paths = w.dismantle( + rootKey, batch.getOverlapping(iv).value, getFn) + delayed &= paths + when extraTraceMessages: + trace logTxt "reclassify dismantled", count, partialPath=w, + nPaths=paths.len, nDelayed=delayed.len + continue + except RlpError: + discard + + # Not processed at all. So keep `w` but there is no need to look + # at it again in the next lap. + doneWith.add w + + # Prepare for next lap + batch.checkNodes.swap delayed + delayed.setLen(0) + + batch.checkNodes = doneWith.pathSortUniq + + when extraTraceMessages: + trace logTxt "reclassify finalise", count, + nDoneWith=doneWith.len, nCheckNodes=batch.checkNodes.len + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ + diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index 3aaa57246..e2491f79c 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -54,14 +54,7 @@ type checkNodes*: seq[Blob] ## Nodes with prob. dangling child links sickSubTries*: seq[NodeSpecs] ## Top ref for sub-tries to be healed resumeCtx*: TrieNodeStatCtxRef ## State for resuming trie inpection - - SnapHealingState* = enum - ## State of healing process. The `HealerRunning` state indicates that - ## dangling and/or missing nodes have been temprarily removed from the - ## batch queue while processing. - HealerIdle - HealerRunning - HealerDone + lockTriePerusal*: bool ## Only one process at a time SnapPivotRef* = ref object ## Per-state root cache for particular snap data environment @@ -69,7 +62,6 @@ type # Accounts download fetchAccounts*: SnapRangeBatchRef ## Set of accounts ranges to fetch - accountsState*: SnapHealingState ## All accounts have been processed healThresh*: float ## Start healing when fill factor reached # Storage slots download @@ -107,7 +99,6 @@ type pivotTable*: SnapPivotTable ## Per state root environment pivotFinderCtx*: RootRef ## Opaque object reference for sub-module coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts - accountsHealing*: bool ## Activates accounts DB healing recovery*: SnapRecoveryRef ## Current recovery checkpoint/context noRecovery*: bool ## Ignore recovery checkpoints diff --git a/tests/test_sync_snap.nim b/tests/test_sync_snap.nim index 63c2eef95..c54fda265 100644 --- a/tests/test_sync_snap.nim +++ b/tests/test_sync_snap.nim @@ -65,8 +65,8 @@ else: let # Forces `check()` to print the error (as opposed when using `isOk()`) - OkHexDb = Result[void,HexaryDbError].ok() - OkStoDb = Result[void,seq[(int,HexaryDbError)]].ok() + OkHexDb = Result[void,HexaryError].ok() + OkStoDb = Result[void,seq[(int,HexaryError)]].ok() # There was a problem with the Github/CI which results in spurious crashes # when leaving the `runner()` if the persistent BaseChainDB initialisation @@ -88,7 +88,7 @@ var proc isOk(rc: ValidationResult): bool = rc == ValidationResult.OK -proc isImportOk(rc: Result[SnapAccountsGaps,HexaryDbError]): bool = +proc isImportOk(rc: Result[SnapAccountsGaps,HexaryError]): bool = if rc.isErr: check rc.error == NothingSerious # prints an error if different elif 0 < rc.value.innerGaps.len: @@ -96,7 +96,7 @@ proc isImportOk(rc: Result[SnapAccountsGaps,HexaryDbError]): bool = else: return true -proc toStoDbRc(r: seq[HexaryNodeReport]): Result[void,seq[(int,HexaryDbError)]]= +proc toStoDbRc(r: seq[HexaryNodeReport]): Result[void,seq[(int,HexaryError)]]= ## Kludge: map error report to (older version) 
return code if r.len != 0: return err(r.mapIt((it.slot.get(otherwise = -1),it.error))) @@ -125,13 +125,13 @@ proc pp(d: Duration): string = else: d.ppUs -proc pp(rc: Result[Account,HexaryDbError]): string = +proc pp(rc: Result[Account,HexaryError]): string = if rc.isErr: $rc.error else: rc.value.pp -proc pp(rc: Result[Hash256,HexaryDbError]): string = +proc pp(rc: Result[Hash256,HexaryError]): string = if rc.isErr: $rc.error else: $rc.value.to(NodeTag) -proc pp(rc: Result[TrieNodeStat,HexaryDbError]; db: SnapDbBaseRef): string = +proc pp(rc: Result[TrieNodeStat,HexaryError]; db: SnapDbBaseRef): string = if rc.isErr: $rc.error else: rc.value.pp(db.hexaDb) proc pp(a: NodeKey; collapse = true): string = @@ -409,6 +409,56 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) = # Beware: dumping a large database is not recommended #true.say "***", "database dump\n ", desc.dumpHexaDB() + test "Dismantle path prefix envelopes": + doAssert 1 < accKeys.len + let + iv = NodeTagRange.new(accBaseTag, accKeys[^2].to(NodeTag)) + ivMin = iv.minPt.to(NodeKey).ByteArray32.toSeq.initNibbleRange + ivMax = iv.maxPt.to(NodeKey).ByteArray32.toSeq.initNibbleRange + pfxLen = ivMin.sharedPrefixLen ivMax + # Use some overlapping prefixes. Note that a prefix must refer to + # an existing node + for n in 0 .. pfxLen: + let + pfx = ivMin.slice(0, pfxLen - n).hexPrefixEncode + qfx = pfx.dismantle(root.to(NodeKey), iv, desc.hexaDB) + + # Re-assemble intervals + let covered = NodeTagRangeSet.init() + for w in qfx: + let iv = pathEnvelope w + check iv.len == covered.merge iv + + if covered.chunks == 1 and iv.minPt == low(NodeTag): + # Order: `iv` <= `covered` + check iv.maxPt <= covered.ge.value.minPt + elif covered.chunks == 1 and iv.maxPt == high(NodeTag): + # Order: `covered` <= `iv` + check covered.ge.value.maxPt <= iv.minPt + else: + # Covered contains two ranges were the gap is big enough for `iv` + check covered.chunks == 2 + # Order: `covered.ge` <= `iv` <= `covered.le` + check covered.ge.value.maxPt <= iv.minPt + check iv.maxPt <= covered.le.value.minPt + + # Must hold + check covered.le.value.minPt <= accKeys[^1].to(Nodetag) + + when false: # or true: + let + cmaNlSp0 = ",\n" & repeat(" ",12) + cmaNlSpc = ",\n" & repeat(" ",13) + echo ">>> n=", n, " pfxMax=", pfxLen, + "\n pfx=", pfx, + "\n ivMin=", ivMin, + "\n iv1st=", accKeys[0], + "\n ivMax=", ivMax, + "\n ivPast=", accKeys[^1], + "\n covered=@[", toSeq(covered.increasing) + .mapIt(&"[{it.minPt}{cmaNlSpc}{it.maxPt}]") + .join(cmaNlSp0), "]", + "\n => @[", qfx.mapIt(it.toHex).join(cmaNlSpc), "]" test &"Storing/retrieving {accKeys.len} items " & "on persistent state root registry": @@ -464,7 +514,7 @@ proc storagesRunner( noisy = true; persistent = true; sample = storSample; - knownFailures: seq[(string,seq[(int,HexaryDbError)])] = @[]) = + knownFailures: seq[(string,seq[(int,HexaryError)])] = @[]) = let peer = Peer.new accountsList = sample.to(seq[UndumpAccounts]) @@ -502,7 +552,7 @@ proc storagesRunner( let testId = fileInfo & "#" & $n expRc = if ignore.hasKey(testId): - Result[void,seq[(int,HexaryDbError)]].err(ignore[testId]) + Result[void,seq[(int,HexaryError)]].err(ignore[testId]) else: OkStoDb check dbDesc.importStorageSlots(w.data, persistent).toStoDbRc == expRc @@ -521,7 +571,7 @@ proc storagesRunner( dbDesc = SnapDbStorageSlotsRef.init(dbBase, accKey, root, peer) rc = dbDesc.inspectStorageSlotsTrie(persistent=persistent) if m == errInx: - check rc == Result[TrieNodeStat,HexaryDbError].err(TrieIsEmpty) + check rc == 
Result[TrieNodeStat,HexaryError].err(TrieIsEmpty) else: check rc.isOk # ok => level > 0 and not stopped @@ -574,25 +624,24 @@ proc inspectionRunner( desc = SnapDbAccountsRef.init(memBase, root, peer) for w in accList: check desc.importAccounts(w.base, w.data, persistent=false).isImportOk - let rc = desc.inspectAccountsTrie(persistent=false) - check rc.isOk + let stats = desc.hexaDb.hexaryInspectTrie(rootKey) + check not stats.stopped let - dangling = rc.value.dangling.mapIt(it.partialPath) + dangling = stats.dangling.mapIt(it.partialPath) keys = desc.hexaDb.hexaryInspectToKeys(rootKey, dangling) check dangling.len == keys.len - singleStats.add (desc.hexaDb.tab.len,rc.value) + singleStats.add (desc.hexaDb.tab.len,stats) - # Verify piecemeal approach for `inspectAccountsTrie()` ... + # Verify piecemeal approach for `hexaryInspectTrie()` ... var ctx = TrieNodeStatCtxRef() piecemeal: HashSet[Blob] while not ctx.isNil: - let rx = desc.inspectAccountsTrie( - resumeCtx=ctx, suspendAfter=128, persistent=false) - check rx.isOk - let stats = rx.get(otherwise = TrieNodeStat()) - ctx = stats.resumeCtx - piecemeal.incl stats.dangling.mapIt(it.partialPath).toHashSet + let stat2 = desc.hexaDb.hexaryInspectTrie( + rootKey, resumeCtx=ctx, suspendAfter=128) + check not stat2.stopped + ctx = stat2.resumeCtx + piecemeal.incl stat2.dangling.mapIt(it.partialPath).toHashSet # Must match earlier all-in-one result check dangling.len == piecemeal.len check dangling.toHashSet == piecemeal @@ -614,27 +663,26 @@ proc inspectionRunner( for w in accList: check desc.importAccounts(w.base,w.data, persistent=true).isImportOk - let rc = desc.inspectAccountsTrie(persistent=true) - check rc.isOk + let stats = desc.getAccountFn.hexaryInspectTrie(rootKey) + check not stats.stopped let - dangling = rc.value.dangling.mapIt(it.partialPath) + dangling = stats.dangling.mapIt(it.partialPath) keys = desc.hexaDb.hexaryInspectToKeys(rootKey, dangling) check dangling.len == keys.len # Must be the same as the in-memory fingerprint let ssn1 = singleStats[n][1].dangling.mapIt(it.partialPath) check ssn1.toHashSet == dangling.toHashSet - # Verify piecemeal approach for `inspectAccountsTrie()` ... + # Verify piecemeal approach for `hexaryInspectTrie()` ... 
var ctx = TrieNodeStatCtxRef() piecemeal: HashSet[Blob] while not ctx.isNil: - let rx = desc.inspectAccountsTrie( - resumeCtx=ctx, suspendAfter=128, persistent=persistent) - check rx.isOk - let stats = rx.get(otherwise = TrieNodeStat()) - ctx = stats.resumeCtx - piecemeal.incl stats.dangling.mapIt(it.partialPath).toHashSet + let stat2 = desc.getAccountFn.hexaryInspectTrie( + rootKey, resumeCtx=ctx, suspendAfter=128) + check not stat2.stopped + ctx = stat2.resumeCtx + piecemeal.incl stat2.dangling.mapIt(it.partialPath).toHashSet # Must match earlier all-in-one result check dangling.len == piecemeal.len check dangling.toHashSet == piecemeal @@ -649,14 +697,14 @@ proc inspectionRunner( desc = memDesc.dup(root,Peer()) for w in accList: check desc.importAccounts(w.base, w.data, persistent=false).isImportOk - let rc = desc.inspectAccountsTrie(persistent=false) - check rc.isOk + let stats = desc.hexaDb.hexaryInspectTrie(rootKey) + check not stats.stopped let - dangling = rc.value.dangling.mapIt(it.partialPath) + dangling = stats.dangling.mapIt(it.partialPath) keys = desc.hexaDb.hexaryInspectToKeys( rootKey, dangling.toHashSet.toSeq) check dangling.len == keys.len - accuStats.add (desc.hexaDb.tab.len,rc.value) + accuStats.add (desc.hexaDb.tab.len, stats) test &"Fingerprinting {inspectList.len} accumulated accounts lists " & "for persistent db": @@ -672,14 +720,14 @@ proc inspectionRunner( desc = perDesc.dup(root,Peer()) for w in accList: check desc.importAccounts(w.base, w.data, persistent).isImportOk - let rc = desc.inspectAccountsTrie(persistent=false) - check rc.isOk + let stats = desc.getAccountFn.hexaryInspectTrie(rootKey) + check not stats.stopped let - dangling = rc.value.dangling.mapIt(it.partialPath) + dangling = stats.dangling.mapIt(it.partialPath) keys = desc.hexaDb.hexaryInspectToKeys( rootKey, dangling.toHashSet.toSeq) check dangling.len == keys.len - check accuStats[n][1] == rc.value + check accuStats[n][1] == stats test &"Cascaded fingerprinting {inspectList.len} accumulated accounts " & "lists for in-memory-db": @@ -702,12 +750,13 @@ proc inspectionRunner( if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)): cscStep[rootKey][0].inc let - r0 = desc.inspectAccountsTrie(persistent=false) - rc = desc.inspectAccountsTrie(cscStep[rootKey][1],persistent=false) - check rc.isOk + stat0 = desc.hexaDb.hexaryInspectTrie(rootKey) + stats = desc.hexaDb.hexaryInspectTrie(rootKey,cscStep[rootKey][1]) + check not stat0.stopped + check not stats.stopped let - accumulated = r0.value.dangling.mapIt(it.partialPath).toHashSet - cascaded = rc.value.dangling.mapIt(it.partialPath).toHashSet + accumulated = stat0.dangling.mapIt(it.partialPath).toHashSet + cascaded = stats.dangling.mapIt(it.partialPath).toHashSet check accumulated == cascaded # Make sure that there are no trivial cases let trivialCases = toSeq(cscStep.values).filterIt(it[0] <= 1).len @@ -734,12 +783,14 @@ proc inspectionRunner( if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)): cscStep[rootKey][0].inc let - r0 = desc.inspectAccountsTrie(persistent=true) - rc = desc.inspectAccountsTrie(cscStep[rootKey][1],persistent=true) - check rc.isOk + stat0 = desc.getAccountFn.hexaryInspectTrie(rootKey) + stats = desc.getAccountFn.hexaryInspectTrie(rootKey, + cscStep[rootKey][1]) + check not stat0.stopped + check not stats.stopped let - accumulated = r0.value.dangling.mapIt(it.partialPath).toHashSet - cascaded = rc.value.dangling.mapIt(it.partialPath).toHashSet + accumulated = stat0.dangling.mapIt(it.partialPath).toHashSet + cascaded = 
stats.dangling.mapIt(it.partialPath).toHashSet check accumulated == cascaded # Make sure that there are no trivial cases let trivialCases = toSeq(cscStep.values).filterIt(it[0] <= 1).len