diff --git a/nimbus/sync/snap/constants.nim b/nimbus/sync/snap/constants.nim index 90e2021f2..aacbac4ab 100644 --- a/nimbus/sync/snap/constants.nim +++ b/nimbus/sync/snap/constants.nim @@ -11,12 +11,15 @@ {.push raises: [].} import - eth/trie/nibbles + eth/[common, trie/nibbles] const EmptyBlob* = seq[byte].default ## Useful shortcut + EmptyBlobSeq* = seq[Blob].default + ## Useful shortcut + EmptyNibbleSeq* = EmptyBlob.initNibbleRange ## Useful shortcut @@ -60,6 +63,11 @@ const # -------------- + accountsFetchRetryMax* = 2 + ## The request intervals will be slightly re-arranged after failure. + ## So re-trying to fetch another range might be successful (set to `0` + ## to disable retries.) + accountsSaveProcessedChunksMax* = 1000 ## Recovery data are stored if the processed ranges list contains no more ## than this many range *chunks*. @@ -104,12 +112,13 @@ const ## The maximal number of nodes visited at level 3 is *4KiB* and at level 4 ## is *64Kib*. - healAccountsBatchMax* = 10 * 1024 - ## Keep on gloing in healing task up until this many nodes have been - ## fetched from the network or some error contition terminates the task. - ## - ## This constant should be larger than `fetchRequestStorageSlotsMax` + healAccountsInspectionPlanBRetryMax* = 2 + ## Retry inspection this many times unless there is at least one dangling + ## node found. + healAccountsInspectionPlanBRetryNapMSecs* = 2 + ## Sleep between inspection retries to allow a thread switch. If this + ## constant is set to `0`, a `1`ns wait is used. healSlorageSlotsTrigger* = 0.70 ## Consider per account storage slost healing if a per-account hexary @@ -118,6 +127,12 @@ const healStorageSlotsInspectionPlanBLevel* = 4 ## Similar to `healAccountsInspectionPlanBLevel` + healStorageSlotsInspectionPlanBRetryMax* = 2 + ## Similar to `healAccountsInspectionPlanBRetryMax` + + healStorageSlotsInspectionPlanBRetryNapMSecs* = 2 + ## Similar to `healAccountsInspectionPlanBRetryNapMSecs` + healStorageSlotsBatchMax* = 32 ## Maximal number of storage tries to to heal in a single batch run. Only ## this many items will be removed from the batch queue. These items will @@ -152,7 +167,6 @@ const static: doAssert storageSlotsQuPrioThresh < accountsSaveStorageSlotsMax - doAssert fetchRequestTrieNodesMax < healAccountsBatchMax # Deprecated, to be expired diff --git a/nimbus/sync/snap/worker/com/get_trie_nodes.nim b/nimbus/sync/snap/worker/com/get_trie_nodes.nim index 86b09dbd0..46e5b2147 100644 --- a/nimbus/sync/snap/worker/com/get_trie_nodes.nim +++ b/nimbus/sync/snap/worker/com/get_trie_nodes.nim @@ -114,26 +114,28 @@ proc getTrieNodes*( ## (if any.)
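The retry constants added above (`accountsFetchRetryMax` and the two plan-B retry/nap pairs) all encode the same bounded-retry discipline: only consecutive failures count, and any success resets the counter. A minimal self-contained Nim sketch of that discipline; `runWithRetries` and the simulated `outcomes` are illustrative only, not project code:

proc runWithRetries(outcomes: seq[bool]; retryMax: int): int =
  ## Count successful rounds; give up after `retryMax` consecutive failures.
  var nRetry = 0
  for ok in outcomes:
    if retryMax < nRetry:
      break              # too many failures in a row
    if ok:
      result.inc
      nRetry = 0         # success resets the consecutive-failure count
    else:
      nRetry.inc         # failed round; intervals would be re-arranged

when isMainModule:
  # with retryMax = 2, two failures in a row are tolerated
  doAssert runWithRetries(@[true, false, false, true], 2) == 2
  # with retryMax = 0, the first failure terminates the loop
  doAssert runWithRetries(@[true, false, true], 0) == 1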
let peer {.used.} = buddy.peer - nPaths = paths.len + nGroups = paths.len - if nPaths == 0: + if nGroups == 0: return err(ComEmptyRequestArguments) - let nTotal = paths.mapIt(min(1,it.slotPaths.len)).foldl(a+b, 0) + let nTotal = paths.mapIt(max(1,it.slotPaths.len)).foldl(a+b, 0) if trSnapTracePacketsOk: - trace trSnapSendSending & "GetTrieNodes", peer, pivot, nPaths, nTotal + trace trSnapSendSending & "GetTrieNodes", peer, pivot, nGroups, nTotal let trieNodes = block: let rc = await buddy.getTrieNodesReq(stateRoot, paths, pivot) if rc.isErr: return err(ComNetworkProblem) if rc.value.isNone: - trace trSnapRecvTimeoutWaiting & "for TrieNodes", peer, pivot, nPaths + trace trSnapRecvTimeoutWaiting & "for TrieNodes", peer, pivot, nGroups return err(ComResponseTimeout) let blobs = rc.value.get.nodes if nTotal < blobs.len: # Ooops, makes no sense + trace trSnapRecvError & "too many TrieNodes", peer, pivot, + nGroups, nExpected=nTotal, nReceived=blobs.len return err(ComTooManyTrieNodes) blobs @@ -153,7 +155,7 @@ proc getTrieNodes*( # nodes. # * The responding node is allowed to return less data than requested # (serving QoS limits), but the node must return at least one trie node. - trace trSnapRecvReceived & "empty TrieNodes", peer, pivot, nPaths, nNodes + trace trSnapRecvReceived & "empty TrieNodes", peer, pivot, nGroups, nNodes return err(ComNoByteCodesAvailable) # Assemble return value @@ -172,7 +174,7 @@ proc getTrieNodes*( break trace trSnapRecvReceived & "TrieNodes", peer, pivot, - nPaths, nNodes, nLeftOver=dd.leftOver.len + nGroups, nNodes, nLeftOver=dd.leftOver.len return ok(dd) diff --git a/nimbus/sync/snap/worker/db/hexary_error.nim b/nimbus/sync/snap/worker/db/hexary_error.nim index 5cf529a35..c5c40cc43 100644 --- a/nimbus/sync/snap/worker/db/hexary_error.nim +++ b/nimbus/sync/snap/worker/db/hexary_error.nim @@ -50,6 +50,7 @@ type NearbyLeafExpected NearbyDanglingLink NearbyPathTail + NearbyBeyondRange # envelope DecomposeDegenerated diff --git a/nimbus/sync/snap/worker/db/hexary_inspect.nim b/nimbus/sync/snap/worker/db/hexary_inspect.nim index f32c58899..be63df93b 100644 --- a/nimbus/sync/snap/worker/db/hexary_inspect.nim +++ b/nimbus/sync/snap/worker/db/hexary_inspect.nim @@ -16,7 +16,7 @@ import eth/[common, trie/nibbles], stew/results, "../.."/[constants, range_desc], - "."/[hexary_desc, hexary_paths] + "."/[hexary_desc, hexary_nodes_helper, hexary_paths] logScope: topics = "snap-db" @@ -64,58 +64,157 @@ proc convertTo(key: RepairKey; T: type NodeKey): T = ## Might be lossy, check before use discard result.init(key.ByteArray33[1 .. 
32]) -proc convertTo(key: Blob; T: type NodeKey): T = - ## Might be lossy, check before use - discard result.init(key) +proc convertTo(key: NodeKey; T: type NodeKey): T = + ## For simplifying generic functions + key + +proc convertTo(key: RepairKey; T: type RepairKey): T = + ## For simplifying generic functions + key + +proc isNodeKey(key: Blob): bool = + ## For simplifying generic functions + key.len == 32 or key.len == 0 + +proc to(key: NodeKey; T: type NodeKey): T = + ## For simplifying generic functions + key # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc processLink( - db: HexaryTreeDbRef; - stats: var TrieNodeStat; - inspect: var seq[(RepairKey,NibblesSeq)]; - trail: NibblesSeq; - child: RepairKey; - ) = - ## Helper for `hexaryInspect()` - if not child.isZero: +proc processLink[Q]( + db: HexaryTreeDbRef|HexaryGetFn; # Database abstraction + stats: var TrieNodeStat; # Collecting results + inspect: var Q; # Intermediate todo list + trail: NibblesSeq; # Todo list argument + child: RepairKey|Blob; # Todo list argument + ) {.gcsafe, raises: [CatchableError]} = + ## Helper for `inspectTrieImpl()` + if not child.isZeroLink: if not child.isNodeKey: # Oops -- caught in the middle of a repair process? Just register # this node stats.dangling.add NodeSpecs( partialPath: trail.hexPrefixEncode(isLeaf = false)) - elif db.tab.hasKey(child): - inspect.add (child,trail) + elif child.getNode(db).isOk: + inspect.add (child.convertTo(typeof(inspect[0][0])), trail) else: stats.dangling.add NodeSpecs( partialPath: trail.hexPrefixEncode(isLeaf = false), nodeKey: child.convertTo(NodeKey)) -proc processLink( - getFn: HexaryGetFn; - stats: var TrieNodeStat; - inspect: var seq[(NodeKey,NibblesSeq)]; - trail: NibblesSeq; - child: Rlp; - ) {.gcsafe, raises: [CatchableError]} = - ## Ditto - if not child.isEmpty: - let childBlob = child.toBytes - if childBlob.len != 32: - # Oops -- that is wrong, although the only sensible action is to - # register the node and otherwise ignore it - stats.dangling.add NodeSpecs( - partialPath: trail.hexPrefixEncode(isLeaf = false)) +proc inspectTrieImpl( + db: HexaryTreeDbRef|HexaryGetFn; # Database abstraction + rootKey: NodeKey|RepairKey; # State root + partialPaths: seq[Blob]; # Starting paths for search + resumeCtx: TrieNodeStatCtxRef; # Context for resuming inspection + suspendAfter: uint64; # To be resumed + stopAtLevel: uint8; # Width-first depth level + maxDangling: int; # Maximal number of dangling results + ): TrieNodeStat + {.gcsafe, raises: [CatchableError]} = + ## ... 
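The identity overloads of `convertTo` and `to` above look redundant, but they let a single generic body such as `processLink()` or `inspectTrieImpl()` compile for both the in-memory (`RepairKey`) and the persistent (`NodeKey`/`Blob`) key flavours. The same trick in isolation, with toy types standing in for the project's:

proc convertTo(x: int; T: type int): T = x          # identity shim
proc convertTo(s: string; T: type int): T = s.len   # a genuine conversion

proc addAs[Q](inspect: var Q; item: int | string) =
  # one generic body; `typeof(inspect[0])` is resolved at compile time,
  # just like `typeof(inspect[0][0])` in `processLink()` above
  inspect.add item.convertTo(typeof(inspect[0]))

when isMainModule:
  var q: seq[int]
  q.addAs 7          # via the identity shim
  q.addAs "seven"    # via the string -> int conversion
  doAssert q == @[7, 5]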
+ when extraTraceMessages: + let nPaths = partialPaths.len + + if rootKey.getNode(db).isErr: + when extraTraceMessages: + trace "Hexary inspect: missing root", nPaths, maxDangling, + rootKey=rootKey.convertTo(NodeKey) + return TrieNodeStat() + + var + reVisit: seq[(typeof(rootKey),NibblesSeq)] + again: seq[(typeof(rootKey),NibblesSeq)] + resumeOk = false + + # Initialise lists from previous session + if not resumeCtx.isNil: + when typeof(db) is HexaryTreeDbRef: + if not resumeCtx.persistent and 0 < resumeCtx.memCtx.len: + resumeOk = true + reVisit = resumeCtx.memCtx else: - let childKey = childBlob.convertTo(NodeKey) - if 0 < child.toBytes.getFn().len: - inspect.add (childKey,trail) - else: - stats.dangling.add NodeSpecs( - partialPath: trail.hexPrefixEncode(isLeaf = false), - nodeKey: childKey) + if resumeCtx.persistent and 0 < resumeCtx.hddCtx.len: + resumeOk = true + reVisit = resumeCtx.hddCtx + + if partialPaths.len == 0 and not resumeOk: + reVisit.add (rootKey,EmptyNibbleSeq) + else: + # Add argument paths + for w in partialPaths: + let (isLeaf,nibbles) = hexPrefixDecode w + if not isLeaf: + let rc = nibbles.hexaryPathNodeKey(rootKey, db, missingOk=false) + if rc.isOk: + reVisit.add (rc.value.to(typeof(rootKey)), nibbles) + + # Stopping on `suspendAfter` has precedence over `stopAtLevel` + while 0 < reVisit.len and result.count <= suspendAfter: + when extraTraceMessages: + trace "Hexary inspect processing", nPaths, maxDangling, + level=result.level, nReVisit=reVisit.len, nDangling=result.dangling.len + + if stopAtLevel < result.level: + result.stopped = true + break + + for n in 0 ..< reVisit.len: + if suspendAfter < result.count or + maxDangling <= result.dangling.len: + # Swallow rest + again &= reVisit[n ..< reVisit.len] + break + + let + (rKey, parentTrail) = reVisit[n] + rc = rKey.getNode(db) + if rc.isErr: + continue # ignore this node + let node = rc.value + + case node.kind: + of Extension: + let + trail = parentTrail & node.ePfx + child = node.eLink + db.processLink(stats=result, inspect=again, trail, child) + of Branch: + for n in 0 ..< 16: + let + trail = parentTrail & @[n.byte].initNibbleRange.slice(1) + child = node.bLink[n] + db.processLink(stats=result, inspect=again, trail, child) + of Leaf: + # Ooops, forget node and key + discard + + result.count.inc + # End `for` + + result.level.inc + swap(reVisit, again) + again.setLen(0) + # End while + + # Collect left overs for resuming search + if 0 < reVisit.len: + when typeof(db) is HexaryTreeDbRef: + result.resumeCtx = TrieNodeStatCtxRef( + persistent: false, + memCtx: reVisit) + else: + result.resumeCtx = TrieNodeStatCtxRef( + persistent: true, + hddCtx: reVisit) + + when extraTraceMessages: + trace "Hexary inspect finished", nPaths, maxDangling, + level=result.level, nResumeCtx=reVisit.len, nDangling=result.dangling.len, + maxLevel=stopAtLevel, stopped=result.stopped # ------------------------------------------------------------------------------ # Public functions @@ -143,10 +242,10 @@ proc to*(resumeCtx: TrieNodeStatCtxRef; T: type seq[NodeSpecs]): T = proc hexaryInspectTrie*( - db: HexaryTreeDbRef; # Database - root: NodeKey; # State root - partialPaths: seq[Blob] = @[]; # Starting paths for search - resumeCtx: TrieNodeStatCtxRef = nil; # Context for resuming inspection + db: HexaryTreeDbRef; # Database abstraction + rootKey: NodeKey; # State root + partialPaths = EmptyBlobSeq; # Starting paths for search + resumeCtx = TrieNodeStatCtxRef(nil); # Context for resuming inspection suspendAfter = high(uint64); # To be 
resumed stopAtLevel = 64u8; # Width-first depth level maxDangling = high(int); # Maximal number of dangling results @@ -182,86 +281,16 @@ proc hexaryInspectTrie*( ## let state = hexaryInspectTrie(db, root, paths, resumeCtx=ctx, 1024) ## ... ## ctx = state.resumeCtx + ## paths = EmptyBlobSeq ## - let rootKey = root.to(RepairKey) - if not db.tab.hasKey(rootKey): - return TrieNodeStat() - - var - reVisit: seq[(RepairKey,NibblesSeq)] - again: seq[(RepairKey,NibblesSeq)] - resumeOk = false - - # Initialise lists from previous session - if not resumeCtx.isNil and - not resumeCtx.persistent and - 0 < resumeCtx.memCtx.len: - resumeOk = true - reVisit = resumeCtx.memCtx - - if partialPaths.len == 0 and not resumeOk: - reVisit.add (rootKey,EmptyNibbleSeq) - else: - # Add argument paths - for w in partialPaths: - let (isLeaf,nibbles) = hexPrefixDecode w - if not isLeaf: - let rc = nibbles.hexaryPathNodeKey(rootKey, db, missingOk=false) - if rc.isOk: - reVisit.add (rc.value.to(RepairKey), nibbles) - - # Stopping on `suspendAfter` has precedence over `stopAtLevel` - while 0 < reVisit.len and result.count <= suspendAfter: - if stopAtLevel < result.level: - result.stopped = true - break - - for n in 0 ..< reVisit.len: - if suspendAfter < result.count or - maxDangling <= result.dangling.len: - # Swallow rest - again &= reVisit[n ..< reVisit.len] - break - - let - (rKey, parentTrail) = reVisit[n] - node = db.tab[rKey] - # parent = rKey.convertTo(NodeKey) -- unused - - case node.kind: - of Extension: - let - trail = parentTrail & node.ePfx - child = node.eLink - db.processLink(stats=result, inspect=again, trail, child) - of Branch: - for n in 0 ..< 16: - let - trail = parentTrail & @[n.byte].initNibbleRange.slice(1) - child = node.bLink[n] - db.processLink(stats=result, inspect=again, trail, child) - of Leaf: - # Ooops, forget node and key - discard - - result.count.inc - # End `for` - - result.level.inc - swap(reVisit, again) - again.setLen(0) - # End while - - if 0 < reVisit.len: - result.resumeCtx = TrieNodeStatCtxRef( - persistent: false, - memCtx: reVisit) + db.inspectTrieImpl(rootKey.to(RepairKey), + partialPaths, resumeCtx, suspendAfter, stopAtLevel, maxDangling) proc hexaryInspectTrie*( getFn: HexaryGetFn; # Database abstraction rootKey: NodeKey; # State root - partialPaths: seq[Blob] = @[]; # Starting paths for search + partialPaths = EmptyBlobSeq; # Starting paths for search resumeCtx: TrieNodeStatCtxRef = nil; # Context for resuming inspection suspendAfter = high(uint64); # To be resumed stopAtLevel = 64u8; # Width-first depth level @@ -269,99 +298,8 @@ proc hexaryInspectTrie*( ): TrieNodeStat {.gcsafe, raises: [CatchableError]} = ## Variant of `hexaryInspectTrie()` for persistent database. 
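The `resumeCtx`/`suspendAfter` pair shown in the docstring above makes the inspection interruptible: a run stops after roughly `suspendAfter` visited nodes and hands back its remaining work list, which the next call continues from. A toy work-list walker showing just that control flow (self-contained sketch, not the trie code; all names are made up):

type ResumeCtx = ref object
  todo: seq[int]                  # nodes not visited yet

proc inspect(children: proc(n: int): seq[int];
             root: int; suspendAfter: int;
             ctx: ResumeCtx = nil): (seq[int], ResumeCtx) =
  ## Visit at most `suspendAfter` nodes, then hand back the rest.
  var todo = if ctx.isNil: @[root] else: ctx.todo
  var visited: seq[int]
  while 0 < todo.len and visited.len < suspendAfter:
    let n = todo.pop
    visited.add n
    todo.add children(n)          # schedule sub-nodes
  (visited, if 0 < todo.len: ResumeCtx(todo: todo) else: nil)

when isMainModule:
  proc kids(n: int): seq[int] =
    if n < 4: @[2*n, 2*n + 1] else: @[]
  var (got, ctx) = inspect(kids, 1, suspendAfter = 3)
  while not ctx.isNil:            # resume until the work list is drained
    let (more, next) = inspect(kids, 1, suspendAfter = 3, ctx = ctx)
    got.add more
    ctx = next
  doAssert got.len == 7           # all nodes of the toy tree were visited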
- when extraTraceMessages: - let nPaths = paths.len - - let root = rootKey.to(Blob) - if root.getFn().len == 0: - when extraTraceMessages: - trace "Hexary inspect: missing root", nPaths, maxLeafPaths, - rootKey=root.toHex - return TrieNodeStat() - - var - reVisit: seq[(NodeKey,NibblesSeq)] - again: seq[(NodeKey,NibblesSeq)] - resumeOk = false - - # Initialise lists from previous session - if not resumeCtx.isNil and - resumeCtx.persistent and - 0 < resumeCtx.hddCtx.len: - resumeOk = true - reVisit = resumeCtx.hddCtx - - if partialPaths.len == 0 and not resumeOk: - reVisit.add (rootKey,EmptyNibbleSeq) - else: - # Add argument paths - for w in partialPaths: - let (isLeaf,nibbles) = hexPrefixDecode w - if not isLeaf: - let rc = nibbles.hexaryPathNodeKey(rootKey, getFn, missingOk=false) - if rc.isOk: - reVisit.add (rc.value, nibbles) - - # Stopping on `suspendAfter` has precedence over `stopAtLevel` - while 0 < reVisit.len and result.count <= suspendAfter: - when extraTraceMessages: - trace "Hexary inspect processing", nPaths, maxLeafPaths, - level=result.level, nReVisit=reVisit.len, nDangling=result.dangling.len - - if stopAtLevel < result.level: - result.stopped = true - break - - for n in 0 ..< reVisit.len: - if suspendAfter < result.count or - maxDangling <= result.dangling.len: - # Swallow rest - again = again & reVisit[n ..< reVisit.len] - break - - let - (parent, parentTrail) = reVisit[n] - parentBlob = parent.to(Blob).getFn() - if parentBlob.len == 0: - # Ooops, forget node and key - continue - - let nodeRlp = rlpFromBytes parentBlob - case nodeRlp.listLen: - of 2: - let (isLeaf,xPfx) = hexPrefixDecode nodeRlp.listElem(0).toBytes - if not isleaf: - let - trail = parentTrail & xPfx - child = nodeRlp.listElem(1) - getFn.processLink(stats=result, inspect=again, trail, child) - of 17: - for n in 0 ..< 16: - let - trail = parentTrail & @[n.byte].initNibbleRange.slice(1) - child = nodeRlp.listElem(n) - getFn.processLink(stats=result, inspect=again, trail, child) - else: - # Ooops, forget node and key - discard - - result.count.inc - # End `for` - - result.level.inc - swap(reVisit, again) - again.setLen(0) - # End while - - if 0 < reVisit.len: - result.resumeCtx = TrieNodeStatCtxRef( - persistent: true, - hddCtx: reVisit) - - when extraTraceMessages: - trace "Hexary inspect finished", nPaths, maxLeafPaths, - level=result.level, nResumeCtx=reVisit.len, nDangling=result.dangling.len, - maxLevel=stopAtLevel, stopped=result.stopped + getFn.inspectTrieImpl( + rootKey, partialPaths, resumeCtx, suspendAfter, stopAtLevel, maxDangling) # ------------------------------------------------------------------------------ # Public functions, debugging diff --git a/nimbus/sync/snap/worker/db/hexary_nearby.nim b/nimbus/sync/snap/worker/db/hexary_nearby.nim index 1f05a37c4..fbc0fb087 100644 --- a/nimbus/sync/snap/worker/db/hexary_nearby.nim +++ b/nimbus/sync/snap/worker/db/hexary_nearby.nim @@ -21,26 +21,6 @@ import # Private helpers # ------------------------------------------------------------------------------ -proc branchNibbleMin(node: XNodeObj|RNodeRef; minInx: int8): int8 = - ## Find the least index for an argument branch `node` link with index - ## greater or equal the argument `nibble`. - if node.kind == Branch: - for n in minInx .. 15: - if not node.bLink[n].isZeroLink: - return n - -1 - -proc branchNibbleMax(node: XNodeObj|RNodeRef; maxInx: int8): int8 = - ## Find the greatest index for an argument branch `node` link with index - ## less or equal the argument `nibble`. 
- if node.kind == Branch: - for n in maxInx.countDown 0: - if not node.bLink[n].isZeroLink: - return n - -1 - -# -------------------- - proc `<=`(a, b: NibblesSeq): bool = ## Compare nibbles, different lengths are padded to the right with zeros let abMin = min(a.len, b.len) @@ -135,58 +115,100 @@ proc zeroAdjust( else: pfx <= p.tail - proc branchNibble(w: typeof(path.path[0].node); n: int8): int8 = + proc branchBorderNibble(w: typeof(path.path[0].node); n: int8): int8 = when doLeast: w.branchNibbleMin n else: w.branchNibbleMax n - if path.path.len == 0: - let root = path.root.getNode(db) - if root.isOk: - block fail: - var pfx: NibblesSeq - case root.value.kind: - of Branch: - # Find first non-dangling link and assign it - if path.tail.len == 0: - break fail - let n = root.value.branchNibble path.tail[0].int8 - if n < 0: - break fail - pfx = @[n.byte].initNibbleRange.slice(1) + if path.path.len != 0: + return Result[typeof(path),HexaryError].ok(path) - of Extension: - let ePfx = root.value.ePfx - # Must be followed by a branch node - if path.tail.len < 2 or not path.accept(ePfx): - break fail - let node = root.value.eLink.getNode(db) - if node.isErr: - break fail - let n = node.value.branchNibble path.tail[1].int8 - if n < 0: - break fail - pfx = ePfx & @[n.byte].initNibbleRange.slice(1) + let root = path.root.getNode(db) + if root.isOk: + block fail: + var pfx: NibblesSeq + case root.value.kind: + of Branch: + # Find first non-dangling link and assign it + if path.tail.len == 0: + break fail + let n = root.value.branchBorderNibble path.tail[0].int8 + if n < 0: + # Before or after the database range + return err(NearbyBeyondRange) + pfx = @[n.byte].initNibbleRange.slice(1) - of Leaf: - pfx = root.value.lPfx - if not path.accept(pfx): - break fail + of Extension: + let ePfx = root.value.ePfx + # Must be followed by a branch node + if path.tail.len < 2 or not path.accept(ePfx): + break fail + let node = root.value.eLink.getNode(db) + if node.isErr: + break fail + let n = node.value.branchBorderNibble path.tail[1].int8 + if n < 0: + # Before or after the database range + return err(NearbyBeyondRange) + pfx = ePfx & @[n.byte].initNibbleRange.slice(1) - return pfx.padPartialPath(0).hexaryPath(path.root, db) - path + of Leaf: + pfx = root.value.lPfx + if not path.accept(pfx): + # Before or after the database range + return err(NearbyBeyondRange) + + let newPath = pfx.padPartialPath(0).hexaryPath(path.root, db) + if 0 < newPath.path.len: + return ok(newPath) + + err(NearbyEmptyPath) proc finalise( - path: XPath|RPath; # Partially expanded path - db: HexaryTreeDbRef|HexaryGetFn; # Database abstraction + path: XPath|RPath; # Partially expanded path + db: HexaryTreeDbRef|HexaryGetFn; # Database abstraction + moveRight: static[bool]; # Direction of next node ): auto {.gcsafe, raises: [CatchableError].} = ## Handle some pathological cases after main processing failed + proc beyond(p: typeof(path); pfx: NibblesSeq): bool = + when moveRight: + pfx < p.tail + else: + p.tail < pfx + + proc branchBorderNibble(w: typeof(path.path[0].node)): int8 = + when moveRight: + w.branchNibbleMax 15 + else: + w.branchNibbleMin 0 + + # Just for completeness (this case should have been handled already) if path.path.len == 0: return Result[typeof(path),HexaryError].err(NearbyEmptyPath) + # Check whether the path is beyond the database range + if 0 < path.tail.len: # nothing to compare against, otherwise + let top = path.path[^1] + # Note that only a `Branch` node has a non-zero nibble + if 0 <= top.nibble and top.nibble
== top.node.branchBorderNibble: + # Check the follow-up node + let rc = top.node.bLink[top.nibble].getNode(db) + if rc.isErr: + return err(NearbyDanglingLink) + var pfx: NibblesSeq + case rc.value.kind: + of Leaf: + pfx = rc.value.lPfx + of Extension: + pfx = rc.value.ePfx + of Branch: + pfx = @[rc.value.branchBorderNibble.byte].initNibbleRange.slice(1) + if path.beyond pfx: + return err(NearbyBeyondRange) + # Pathological cases # * finalise right: nfffff.. for n < f or # * finalise left: n00000.. for 0 < n @@ -200,34 +222,36 @@ proc nearbyNext( path: RPath|XPath; # Partially expanded path db: HexaryTreeDbRef|HexaryGetFn; # Database abstraction - doLeast: static[bool]; # Direction: *least* or *most* + moveRight: static[bool]; # Direction of next node pathLenMax = 64; # Beware of loops (if any) ): auto {.gcsafe, raises: [CatchableError].} = ## Unified implementation of `hexaryNearbyRight()` and `hexaryNearbyLeft()`. proc accept(nibble: int8): bool = - ## Accept `nibble` unless on boundaty dependent on `doLeast` - when doLeast: + ## Accept `nibble` unless on boundary, dependent on `moveRight` + when moveRight: nibble < 15 else: 0 < nibble proc accept(p: typeof(path); pfx: NibblesSeq): bool = - when doLeast: + when moveRight: p.tail <= pfx else: pfx <= p.tail proc branchNibbleNext(w: typeof(path.path[0].node); n: int8): int8 = - when doLeast: + when moveRight: w.branchNibbleMin(n + 1) else: w.branchNibbleMax(n - 1) # Some easy cases - var path = path.zeroAdjust(db, doLeast) - if path.path.len == 0: - return Result[typeof(path),HexaryError].err(NearbyEmptyPath) # error + var path = block: + let rc = path.zeroAdjust(db, doLeast=moveRight) + if rc.isErr: + return Result[typeof(path),HexaryError].err(rc.error) + rc.value var uPath = path @@ -266,10 +290,10 @@ proc nearbyNext( case nextNode.kind of Leaf: if uPath.accept(nextNode.lPfx): - return uPath.complete(topLink, db, pathLenMax, doLeast) + return uPath.complete(topLink, db, pathLenMax, doLeast=moveRight) of Extension: if uPath.accept(nextNode.ePfx): - return uPath.complete(topLink, db, pathLenMax, doLeast) + return uPath.complete(topLink, db, pathLenMax, doLeast=moveRight) of Branch: let nextNibble = uPath.tail[0].int8 if start and accept(nextNibble): @@ -284,7 +308,8 @@ proc nearbyNext( let n = step.node.branchNibbleNext step.nibble if 0 <= n: uPath.path[^1].nibble = n - return uPath.complete(step.node.bLink[n], db, pathLenMax, doLeast) + return uPath.complete( + step.node.bLink[n], db, pathLenMax, doLeast=moveRight) if start: # Retry without look ahead @@ -301,19 +326,19 @@ proc nearbyNext( # End while # Handle some pathological cases - return path.finalise(db) + return path.finalise(db, moveRight) proc nearbyNext( baseTag: NodeTag; # Some node rootKey: NodeKey; # State root db: HexaryTreeDbRef|HexaryGetFn; # Database abstraction - doLeast: static[bool]; # Direction: *least* or *most* + moveRight: static[bool]; # Direction of next node pathLenMax = 64; # Beware of loops (if any) ): Result[NodeTag,HexaryError] {.gcsafe, raises: [CatchableError].} = ## Variant of `nearbyNext()`, convenience wrapper - let rc = baseTag.hexaryPath(rootKey, db).nearbyNext(db, doLeast) + let rc = baseTag.hexaryPath(rootKey, db).nearbyNext(db, moveRight) if rc.isErr: return err(rc.error) @@ -340,9 +365,12 @@ proc hexaryNearbyRight*( ## backtrack if there are dangling links in between and rather returns ## an error.
## + ## In the case that there is no more leaf node to the right of the argument + ## path, the particular error code `NearbyBeyondRange` is returned. + ## ## This code is intended to be used for verifying a left-bound proof to ## verify that there is no leaf node *right* of a boundary path value. - path.nearbyNext(db, doLeast=true) + path.nearbyNext(db, moveRight=true) proc hexaryNearbyRight*( baseTag: NodeTag; # Some node @@ -352,7 +380,31 @@ {.gcsafe, raises: [CatchableError].} = ## Variant of `hexaryNearbyRight()` working with `NodeTag` arguments rather ## than `RPath` or `XPath` ones. - baseTag.nearbyNext(rootKey, db, doLeast=true) + baseTag.nearbyNext(rootKey, db, moveRight=true) + +proc hexaryNearbyLeft*( + path: RPath|XPath; # Partially expanded path + db: HexaryTreeDbRef|HexaryGetFn; # Database abstraction + ): auto + {.gcsafe, raises: [CatchableError].} = + ## Similar to `hexaryNearbyRight()`. + ## + ## This code is intended to be used for verifying a right-bound proof to + ## verify that there is no leaf node *left* to a boundary path value. + path.nearbyNext(db, moveRight=false) + +proc hexaryNearbyLeft*( + baseTag: NodeTag; # Some node + rootKey: NodeKey; # State root + db: HexaryTreeDbRef|HexaryGetFn; # Database abstraction + ): Result[NodeTag,HexaryError] + {.gcsafe, raises: [CatchableError].} = + ## Similar to `hexaryNearbyRight()` for `NodeKey` arguments. + baseTag.nearbyNext(rootKey, db, moveRight=false) + +# ------------------------------------------------------------------------------ +# Public debugging helpers +# ------------------------------------------------------------------------------ proc hexaryNearbyRightMissing*( path: RPath|XPath; # Partially expanded path @@ -361,9 +413,20 @@ {.gcsafe, raises: [KeyError].} = ## Returns `true` if the maximally extended argument nodes `path` is the ## rightmost on the hexary trie database. It verifies that there is no more - ## leaf entry to the right of the argument `path`. - ## - ## This code is intended be used for verifying a left-bound proof. + ## leaf entry to the right of the argument `path`. This function is an + ## alternative to + ## :: + ## let rc = path.hexaryNearbyRight(db) + ## if rc.isOk: + ## # not at the end => false + ## ... + ## elif rc.error != NearbyBeyondRange: + ## # problem with database => error + ## ... + ## else: + ## # no more nodes => true + ## ... + ## and is intended mainly for debugging. if path.path.len == 0: return err(NearbyEmptyPath) if 0 < path.tail.len: @@ -390,27 +453,6 @@ of Branch: return ok(nextNode.branchNibbleMin(path.tail[0].int8) < 0) - -proc hexaryNearbyLeft*( - path: RPath|XPath; # Partially expanded path - db: HexaryTreeDbRef|HexaryGetFn; # Database abstraction - ): auto - {.gcsafe, raises: [CatchableError].} = - ## Similar to `hexaryNearbyRight()`. - ## - ## This code is intended to be used for verifying a right-bound proof to - ## verify that there is no leaf node *left* to a boundary path value. - path.nearbyNext(db, doLeast=false) - -proc hexaryNearbyLeft*( - baseTag: NodeTag; # Some node - rootKey: NodeKey; # State root - db: HexaryTreeDbRef|HexaryGetFn; # Database abstraction - ): Result[NodeTag,HexaryError] - {.gcsafe, raises: [CatchableError].} = - ## Similar to `hexaryNearbyRight()` for `NodeKey` arguments.
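The `NearbyBeyondRange` code introduced in this file lets callers tell "no neighbour leaf in that direction" apart from genuine database faults; `hexaryRangeInflate()` and `verifyNoMoreRight()` below rely on that distinction. Over a plain sorted key list the intended contract would look like this (toy sketch with stand-in types, `none` playing the role of the error code):

import std/[algorithm, options]

proc nearbyRight(keys: seq[int]; base: int): Option[int] =
  ## Smallest key >= base; `none` stands for `NearbyBeyondRange`.
  let i = keys.lowerBound(base)
  if i < keys.len: some(keys[i]) else: none(int)

when isMainModule:
  let keys = @[3, 7, 11]
  doAssert keys.nearbyRight(7) == some(7)    # exact hit
  doAssert keys.nearbyRight(8) == some(11)   # next leaf to the right
  doAssert keys.nearbyRight(12).isNone       # beyond the range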
- baseTag.nearbyNext(rootKey, db, doLeast=false) - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/hexary_nodes_helper.nim b/nimbus/sync/snap/worker/db/hexary_nodes_helper.nim index 564b6a9b2..a93d7c003 100644 --- a/nimbus/sync/snap/worker/db/hexary_nodes_helper.nim +++ b/nimbus/sync/snap/worker/db/hexary_nodes_helper.nim @@ -108,6 +108,26 @@ proc getNode*( # ------------------ +proc branchNibbleMin*(node: XNodeObj|RNodeRef; minInx: int8): int8 = + ## Find the least index for an argument branch `node` link with index + ## greater than or equal to the argument `minInx`. + if node.kind == Branch: + for n in minInx .. 15: + if not node.bLink[n].isZeroLink: + return n + -1 + +proc branchNibbleMax*(node: XNodeObj|RNodeRef; maxInx: int8): int8 = + ## Find the greatest index for an argument branch `node` link with index + ## less than or equal to the argument `maxInx`. + if node.kind == Branch: + for n in maxInx.countDown 0: + if not node.bLink[n].isZeroLink: + return n + -1 + +# -------------------- + proc padPartialPath*(pfx: NibblesSeq; dblNibble: byte): NodeKey = ## Extend (or cut) `partialPath` nibbles sequence and generate `NodeKey`. ## This function must be handled with some care regarding a meaningful value diff --git a/nimbus/sync/snap/worker/db/hexary_range.nim b/nimbus/sync/snap/worker/db/hexary_range.nim index 1a6be91ce..42d094d28 100644 --- a/nimbus/sync/snap/worker/db/hexary_range.nim +++ b/nimbus/sync/snap/worker/db/hexary_range.nim @@ -126,7 +126,7 @@ template collectLeafs( let rx = nodeTag.hexaryNearbyLeft(rootKey, db) if rx.isOk: rls.base = rx.value - elif rx.error notin {NearbyFailed,NearbyEmptyPath}: + elif rx.error != NearbyBeyondRange: rc = typeof(rc).err(rx.error) break body @@ -139,7 +139,7 @@ xPath = block: let rx = nodeTag.hexaryPath(rootKey,db).hexaryNearbyRight(db) if rx.isErr: - if rx.error notin {NearbyFailed,NearbyEmptyPath}: + if rx.error != NearbyBeyondRange: rc = typeof(rc).err(rx.error) else: rls.leafsLast = true @@ -232,6 +232,47 @@ proc hexaryRangeLeafsProof*( ## `leafList`. db.updateProof(rootKey, rp) + +proc hexaryRangeInflate*( + db: HexaryGetFn|HexaryTreeDbRef; # Database abstraction + rootKey: NodeKey; # State root + nodeKey: NodeTag; # Centre of inflated interval + ): NodeTagRange + {.gcsafe, raises: [CatchableError]} = + ## Calculate the largest leaf range interval containing only the argument + ## `nodeKey`. + ## + ## If the database is fully allocated, then the returned interval ends right + ## before or after the next neighbour leaf node, or at the range type + ## boundaries `low(NodeTag)` or `high(NodeTag)`. + ## + ## If the database is partially allocated only and some of the neighbour + ## nodes are missing, the returned interval is not extended towards this + ## end.
+ var + leftPt = nodeKey + rightPt = nodeKey + + if low(NodeTag) < nodeKey: + let + pt = nodeKey - 1.u256 + rc = pt.hexaryPath(rootKey,db).hexaryNearbyLeft(db) + if rc.isOk: + leftPt = rc.value.getPartialPath.convertTo(NodeKey).to(NodeTag) + 1.u256 + elif rc.error == NearbyBeyondRange: + leftPt = low(NodeTag) + + if nodeKey < high(NodeTag): + let + pt = nodeKey + 1.u256 + rc = pt.hexaryPath(rootKey,db).hexaryNearbyRight(db) + if rc.isOk: + rightPt = rc.value.getPartialPath.convertTo(NodeKey).to(NodeTag) - 1.u256 + elif rc.error == NearbyBeyondRange: + rightPt = high(NodeTag) + + NodeTagRange.new(leftPt, rightPt) + # ------------------------------------------------------------------------------ # Public helpers # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/snapdb_accounts.nim b/nimbus/sync/snap/worker/db/snapdb_accounts.nim index 71064cc63..8aec3748c 100644 --- a/nimbus/sync/snap/worker/db/snapdb_accounts.nim +++ b/nimbus/sync/snap/worker/db/snapdb_accounts.nim @@ -368,7 +368,7 @@ proc importRawAccountsNodes*( when extraTraceMessages: if nErrors == 0: - trace "Raw account nodes imported", peer, slot, nItems, report=result.len + trace "Raw account nodes imported", peer, slot, nItems, nReport=result.len proc importRawAccountsNodes*( pv: SnapDbRef; ## Base descriptor on `ChainDBRef` diff --git a/nimbus/sync/snap/worker/db/snapdb_desc.nim b/nimbus/sync/snap/worker/db/snapdb_desc.nim index 5ceebb446..b61cd633b 100644 --- a/nimbus/sync/snap/worker/db/snapdb_desc.nim +++ b/nimbus/sync/snap/worker/db/snapdb_desc.nim @@ -255,18 +255,18 @@ proc verifyNoMoreRight*( {.gcsafe, raises: [CatchableError].} = ## Verify that there is are no more leaf entries to the right of and ## including `base`. - let - root = root.to(RepairKey) - base = base.to(NodeKey) - rc = base.hexaryPath(root, xDb).hexaryNearbyRightMissing(xDb) - if rc.isErr: - return err(rc.error) - if rc.value: + var error: HexaryError + + let rc = base.hexaryNearbyRight(root, xDb) + if rc.isOk: + error = LowerBoundProofError + elif rc.error != NearbyBeyondRange: + error = rc.error + else: return ok() - let error = LowerBoundProofError when extraTraceMessages: - trace "verifyLeftmostBound()", peer, base=base.pp, error + trace "verifyLeftmostBound()", peer, base, root, error err(error) # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/pivot.nim b/nimbus/sync/snap/worker/pivot.nim index 4f035de0e..745bbb28d 100644 --- a/nimbus/sync/snap/worker/pivot.nim +++ b/nimbus/sync/snap/worker/pivot.nim @@ -308,7 +308,7 @@ proc recoverPivotFromCheckpoint*( # Import processed interval for (minPt,maxPt) in recov.state.processed: if topLevel: - env.fetchAccounts.unprocessed.reduce(minPt, maxPt) + env.fetchAccounts.unprocessed.reduce NodeTagRange.new(minPt, maxPt) discard env.fetchAccounts.processed.merge(minPt, maxPt) discard ctx.pool.coveredAccounts.merge(minPt, maxPt) ctx.pivotAccountsCoverage100PcRollOver() # update coverage level roll over diff --git a/nimbus/sync/snap/worker/pivot/find_missing_nodes.nim b/nimbus/sync/snap/worker/pivot/find_missing_nodes.nim index 0d7830590..5031f571d 100644 --- a/nimbus/sync/snap/worker/pivot/find_missing_nodes.nim +++ b/nimbus/sync/snap/worker/pivot/find_missing_nodes.nim @@ -35,6 +35,9 @@ ## B. Employ the `hexaryInspect()` trie perusal function in a limited mode ## for finding dangling (i.e. missing) sub-nodes below the allocated nodes. ## +## C. 
Remove empty intervals from the accounting ranges. This is a pure +## maintenance process that applies if A and B fail. +## ## Discussion ## ---------- ## @@ -52,20 +55,47 @@ ## no general solution for *plan B* by recursively searching the whole hexary ## trie database for more dangling nodes. ## +{.push raises: [].} + import std/sequtils, + chronicles, + chronos, eth/common, stew/interval_set, "../../.."/[sync_desc, types], "../.."/[constants, range_desc, worker_desc], - ../db/[hexary_desc, hexary_envelope, hexary_inspect] + ../db/[hexary_desc, hexary_envelope, hexary_error, hexary_inspect, + hexary_nearby] -{.push raises: [].} +logScope: + topics = "snap-find" + +type + MissingNodesSpecs* = object + ## Return type for `findMissingNodes()` + missing*: seq[NodeSpecs] + level*: uint8 + visited*: uint64 + emptyGaps*: NodeTagRangeSet + +const + extraTraceMessages = false # or true + ## Enabled additional logging noise # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ +template logTxt(info: static[string]): static[string] = + "Find missing nodes " & info + +template ignExceptionOops(info: static[string]; code: untyped) = + try: + code + except CatchableError as e: + trace logTxt "Ooops", `info`=info, name=($e.name), msg=(e.msg) + template noExceptionOops(info: static[string]; code: untyped) = try: code @@ -82,7 +112,10 @@ proc findMissingNodes*( rootKey: NodeKey; getFn: HexaryGetFn; planBLevelMax: uint8; - ): (seq[NodeSpecs],uint8,uint64) = + planBRetryMax: int; + planBRetrySleepMs: int; + ): Future[MissingNodesSpecs] + {.async.} = ## Find some missing nodes in the hexary trie database. var nodes: seq[NodeSpecs] @@ -92,40 +125,79 @@ # Get unallocated nodes to be fetched let rc = ranges.processed.hexaryEnvelopeDecompose(rootKey, getFn) if rc.isOk: - nodes = rc.value - - # The gaps between resuling envelopes are either ranges that have - # no leaf nodes, or they are contained in the `processed` range. So - # these gaps are be merged back into the `processed` set of ranges. - let gaps = NodeTagRangeSet.init() - discard gaps.merge(low(NodeTag),high(NodeTag)) # All range - for w in nodes: discard gaps.reduce w.hexaryEnvelope # Remove envelopes - - # Merge gaps into `processed` range and update `unprocessed` ranges - for iv in gaps.increasing: - discard ranges.processed.merge iv - ranges.unprocessed.reduce iv - - # Check whether the hexary trie is complete - if ranges.processed.isFull: - return - - # Remove allocated nodes - let missing = nodes.filterIt(it.nodeKey.ByteArray32.getFn().len == 0) + # Extract nodes from the list that do not exist in the database + # and need to be fetched (and allocated.)
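Plan A is the `filterIt` expression that follows: envelope decomposition yields candidate nodes, and exactly those absent from the database form the fetch list. The same selection over a toy table standing in for `getFn` (illustrative sketch, not project code):

import std/[sequtils, tables]

when isMainModule:
  let
    db = {"aa": "rlp-blob"}.toTable             # allocated nodes
    candidates = @["aa", "ab", "ac"]            # from envelope decomposition
    missing = candidates.filterIt(it notin db)  # plan A fetch list
  doAssert missing == @["ab", "ac"]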
+ let missing = rc.value.filterIt(it.nodeKey.ByteArray32.getFn().len == 0) if 0 < missing.len: - return (missing, 0u8, 0u64) + when extraTraceMessages: + trace logTxt "plan A", nNodes=nodes.len, nMissing=missing.len + return MissingNodesSpecs(missing: missing) + + when extraTraceMessages: + trace logTxt "plan A not applicable", nNodes=nodes.len # Plan B, carefully employ `hexaryInspect()` + var nRetryCount = 0 if 0 < nodes.len: - try: + ignExceptionOops("compileMissingNodesList"): let paths = nodes.mapIt it.partialPath + suspend = if planBRetrySleepMs <= 0: 1.nanoseconds + else: planBRetrySleepMs.milliseconds + var + maxLevel = planBLevelMax stats = getFn.hexaryInspectTrie(rootKey, paths, - stopAtLevel = planBLevelMax, + stopAtLevel = maxLevel, maxDangling = fetchRequestTrieNodesMax) - result = (stats.dangling, stats.level, stats.count) - except CatchableError: - discard + + while stats.dangling.len == 0 and + nRetryCount < planBRetryMax and + not stats.resumeCtx.isNil: + await sleepAsync suspend + nRetryCount.inc + maxLevel = (120 * maxLevel + 99) div 100 # ~20% increase + trace logTxt "plan B retry", nRetryCount, maxLevel + stats = getFn.hexaryInspectTrie(rootKey, + resumeCtx = stats.resumeCtx, + stopAtLevel = maxLevel, + maxDangling = fetchRequestTrieNodesMax) + + result = MissingNodesSpecs( + missing: stats.dangling, + level: stats.level, + visited: stats.count) + + if 0 < result.missing.len: + when extraTraceMessages: + trace logTxt "plan B", nNodes=nodes.len, nDangling=result.missing.len, + level=result.level, nVisited=result.visited, nRetryCount + return + + when extraTraceMessages: + trace logTxt "plan B not applicable", nNodes=nodes.len, + level=result.level, nVisited=result.visited, nRetryCount + + # Plan C, clean up intervals + + # Calculate `gaps` as the complement of the `processed` set of intervals + let gaps = NodeTagRangeSet.init() + discard gaps.merge(low(NodeTag),high(NodeTag)) + for w in ranges.processed.increasing: discard gaps.reduce w + + # Clean up empty gaps in the processed range + result.emptyGaps = NodeTagRangeSet.init() + for gap in gaps.increasing: + let rc = gap.minPt.hexaryNearbyRight(rootKey,getFn) + if rc.isOk: + # So there is a right end in the database and there is no leaf in + # the right open interval [gap.minPt,rc.value).
+ discard result.emptyGaps.merge(gap.minPt, rc.value) + elif rc.error == NearbyBeyondRange: + discard result.emptyGaps.merge(gap.minPt, high(NodeTag)) + + when extraTraceMessages: + trace logTxt "plan C", nGapFixes=result.emptyGaps.chunks, + nGapOpen=(ranges.processed.chunks - result.emptyGaps.chunks) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot/heal_accounts.nim b/nimbus/sync/snap/worker/pivot/heal_accounts.nim index 350279359..84f7e30ca 100644 --- a/nimbus/sync/snap/worker/pivot/heal_accounts.nim +++ b/nimbus/sync/snap/worker/pivot/heal_accounts.nim @@ -39,7 +39,7 @@ {.push raises: [].} import - std/[math, sequtils, tables], + std/[math, sequtils, sets, tables], chronicles, chronos, eth/[common, p2p, trie/nibbles, trie/trie_defs, rlp], @@ -48,7 +48,8 @@ import "../../.."/[sync_desc, protocol, types], "../.."/[constants, range_desc, worker_desc], ../com/[com_error, get_trie_nodes], - ../db/[hexary_desc, hexary_envelope, hexary_error, snapdb_accounts], + ../db/[hexary_desc, hexary_envelope, hexary_error, hexary_nearby, + hexary_paths, hexary_range, snapdb_accounts], "."/[find_missing_nodes, storage_queue_helper, swap_in] logScope: @@ -58,6 +59,8 @@ const extraTraceMessages = false or true ## Enabled additional logging noise + EmptyBlobSet = HashSet[Blob].default + # ------------------------------------------------------------------------------ # Private logging helpers # ------------------------------------------------------------------------------ @@ -69,7 +72,8 @@ proc `$`(node: NodeSpecs): string = node.partialPath.toHex proc `$`(rs: NodeTagRangeSet): string = - rs.fullFactor.toPC(0) + let ff = rs.fullFactor + if 0.99 <= ff and ff < 1.0: "99%" else: ff.toPC(0) proc `$`(iv: NodeTagRange): string = iv.fullFactor.toPC(3) @@ -99,13 +103,6 @@ template discardRlpError(info: static[string]; code: untyped) = except RlpError: discard -template noExceptionOops(info: static[string]; code: untyped) = - try: - code - except CatchableError as e: - raiseAssert "Inconveivable (" & - info & "): name=" & $e.name & " msg=" & e.msg - # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ @@ -113,7 +110,8 @@ template noExceptionOops(info: static[string]; code: untyped) = proc compileMissingNodesList( buddy: SnapBuddyRef; env: SnapPivotRef; - ): seq[NodeSpecs] = + ): Future[seq[NodeSpecs]] + {.async.} = ## Find some missing glue nodes in accounts database. 
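Plan C above (closing out `find_missing_nodes.nim`) complements the `processed` interval set and keeps only those gaps that a neighbour lookup proves leaf-free. Complementing a sorted, disjoint interval list, in miniature (toy closed `int` intervals, not `NodeTagRangeSet`):

proc complement(ivs: seq[(int, int)]; lo, hi: int): seq[(int, int)] =
  ## Gaps between sorted, disjoint closed intervals within [lo, hi].
  var pt = lo
  for iv in ivs:
    if pt < iv[0]:
      result.add (pt, iv[0] - 1)
    pt = iv[1] + 1
  if pt <= hi:
    result.add (pt, hi)

when isMainModule:
  doAssert complement(@[(2, 3), (6, 8)], 0, 9) == @[(0, 1), (4, 5), (9, 9)]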
let ctx = buddy.ctx @@ -127,21 +125,31 @@ proc compileMissingNodesList( discard ctx.swapInAccounts(env) if not fa.processed.isFull: - noExceptionOops("compileMissingNodesList"): - let (missing, nLevel {.used.}, nVisited {.used.}) = fa.findMissingNodes( - rootKey, getFn, healAccountsInspectionPlanBLevel) + let mlv = await fa.findMissingNodes( + rootKey, getFn, + healAccountsInspectionPlanBLevel, + healAccountsInspectionPlanBRetryMax, + healAccountsInspectionPlanBRetryNapMSecs) - when extraTraceMessages: - trace logTxt "missing nodes", peer, - ctx=buddy.healingCtx(env), nLevel, nVisited, - nResult=missing.len, result=missing.toPC + # Clean up empty account ranges found while looking for nodes + if not mlv.emptyGaps.isNil: + for w in mlv.emptyGaps.increasing: + discard env.fetchAccounts.processed.merge w + env.fetchAccounts.unprocessed.reduce w + discard buddy.ctx.pool.coveredAccounts.merge w - result = missing + when extraTraceMessages: + trace logTxt "missing nodes", peer, + ctx=buddy.healingCtx(env), nLevel=mlv.level, nVisited=mlv.visited, + nResult=mlv.missing.len, result=mlv.missing.toPC + + return mlv.missing proc fetchMissingNodes( buddy: SnapBuddyRef; - missingNodes: seq[NodeSpecs]; + missingNodes: seq[NodeSpecs]; # Nodes to fetch from the network + ignore: HashSet[Blob]; # Except for these partial paths listed env: SnapPivotRef; ): Future[seq[NodeSpecs]] {.async.} = @@ -150,47 +158,43 @@ proc fetchMissingNodes( let ctx {.used.} = buddy.ctx peer {.used.} = buddy.peer - stateRoot = env.stateHeader.stateRoot + rootHash = env.stateHeader.stateRoot pivot = "#" & $env.stateHeader.blockNumber # for logging - nMissingNodes= missingNodes.len - nFetchNodes = max(0, nMissingNodes - fetchRequestTrieNodesMax) - - # There is no point in fetching too many nodes as it will be rejected. So - # rest of the `missingNodes` list is ignored to be picked up later. - fetchNodes = missingNodes[0 ..< nFetchNodes] - # Initalise for fetching nodes from the network via `getTrieNodes()` var nodeKey: Table[Blob,NodeKey] # Temporary `path -> key` mapping pathList: seq[SnapTriePaths] # Function argument for `getTrieNodes()` - for w in fetchNodes: - pathList.add SnapTriePaths(accPath: w.partialPath) - nodeKey[w.partialPath] = w.nodeKey - # Fetch nodes from the network. - let rc = await buddy.getTrieNodes(stateRoot, pathList, pivot) - if rc.isOk: - # Reset error counts for detecting repeated timeouts, network errors, etc. - buddy.only.errors.resetComError() + # There is no point in fetching too many nodes as the request would be + # rejected. So the rest of the `missingNodes` list is ignored, to be picked up later. + for w in missingNodes: + if w.partialPath notin ignore and not nodeKey.hasKey(w.partialPath): + pathList.add SnapTriePaths(accPath: w.partialPath) + nodeKey[w.partialPath] = w.nodeKey + if fetchRequestTrieNodesMax <= pathList.len: + break - # Forget about unfetched missing nodes, will be picked up later - return rc.value.nodes.mapIt(NodeSpecs( - partialPath: it.partialPath, - nodeKey: nodeKey[it.partialPath], - data: it.data)) + if 0 < pathList.len: + # Fetch nodes from the network. + let rc = await buddy.getTrieNodes(rootHash, pathList, pivot) + if rc.isOk: + # Reset error counts for detecting repeated timeouts, network errors, etc. + buddy.only.errors.resetComError() - # Process error ...
- let - error = rc.error - ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors) - when extraTraceMessages: - if ok: - trace logTxt "fetch nodes error => stop", peer, - ctx=buddy.healingCtx(env), error - else: - trace logTxt "fetch nodes error", peer, - ctx=buddy.healingCtx(env), error + # Forget about unfetched missing nodes, will be picked up later + return rc.value.nodes.mapIt(NodeSpecs( + partialPath: it.partialPath, + nodeKey: nodeKey[it.partialPath], + data: it.data)) + + # Process error ... + let + error = rc.error + ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors) + when extraTraceMessages: + trace logTxt "reply error", peer, ctx=buddy.healingCtx(env), + error, stop=ok return @[] @@ -235,27 +239,44 @@ proc registerAccountLeaf( ## Process single account node as would be done with an interval by ## the `storeAccounts()` function let - peer {.used.} = buddy.peer + ctx = buddy.ctx + peer = buddy.peer + rootKey = env.stateHeader.stateRoot.to(NodeKey) + getFn = ctx.pool.snapDb.getAccountFn pt = accKey.to(NodeTag) + # Extend interval [pt,pt] if possible + var iv: NodeTagRange + try: + iv = getFn.hexaryRangeInflate(rootKey, pt) + except CatchableError as e: + error logTxt "inflating interval oops", peer, ctx=buddy.healingCtx(env), + accKey, name=($e.name), msg=e.msg + iv = NodeTagRange.new(pt,pt) + # Register isolated leaf node - if 0 < env.fetchAccounts.processed.merge(pt,pt) : + if 0 < env.fetchAccounts.processed.merge iv: env.nAccounts.inc - env.fetchAccounts.unprocessed.reduce(pt,pt) - discard buddy.ctx.pool.coveredAccounts.merge(pt,pt) + env.fetchAccounts.unprocessed.reduce iv + discard buddy.ctx.pool.coveredAccounts.merge iv # Update storage slots batch if acc.storageRoot != emptyRlpHash: env.storageQueueAppendFull(acc.storageRoot, accKey) + #when extraTraceMessages: + # trace logTxt "registered single account", peer, ctx=buddy.healingCtx(env), + # leftSlack=(iv.minPt < pt), rightSlack=(pt < iv.maxPt) + # ------------------------------------------------------------------------------ # Private functions: do the healing for one round # ------------------------------------------------------------------------------ proc accountsHealingImpl( buddy: SnapBuddyRef; + ignore: HashSet[Blob]; env: SnapPivotRef; - ): Future[int] + ): Future[(int,HashSet[Blob])] {.async.} = ## Fetching and merging missing account trie database nodes. It returns the ## number of nodes fetched from the network, and -1 upon error. 
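`registerAccountLeaf()` above widens the single-point interval `[pt,pt]` with `hexaryRangeInflate()`, so one healed leaf can settle its whole empty neighbourhood. The widening idea over a sorted key list (toy stand-in, not the trie code; `int.low`/`int.high` play `low(NodeTag)`/`high(NodeTag)`):

import std/algorithm

proc inflate(keys: seq[int]; key: int): (int, int) =
  ## Largest interval around `key` containing no other key of `keys`.
  let i = keys.lowerBound(key)    # assumes `key` is an element of `keys`
  let left = if i == 0: int.low else: keys[i-1] + 1
  let right = if i == keys.high: int.high else: keys[i+1] - 1
  (left, right)

when isMainModule:
  doAssert @[3, 7, 11].inflate(7) == (4, 10)
  doAssert @[3, 7, 11].inflate(3) == (int.low, 6)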
@@ -269,16 +290,16 @@ proc accountsHealingImpl( discard # Update for changes since last visit - let missingNodes = buddy.compileMissingNodesList(env) + let missingNodes = await buddy.compileMissingNodesList(env) if missingNodes.len == 0: # Nothing to do trace logTxt "nothing to do", peer, ctx=buddy.healingCtx(env) - return 0 # nothing to do + return (0,EmptyBlobSet) # nothing to do # Get next batch of nodes that need to be merged it into the database - let fetchedNodes = await buddy.fetchMissingNodes(missingNodes, env) + let fetchedNodes = await buddy.fetchMissingNodes(missingNodes, ignore, env) if fetchedNodes.len == 0: - return 0 + return (0,EmptyBlobSet) # Store nodes onto disk let @@ -289,23 +310,20 @@ proc accountsHealingImpl( # Storage error, just run the next lap (not much else that can be done) error logTxt "error updating persistent database", peer, ctx=buddy.healingCtx(env), nFetchedNodes, error=report[^1].error - return -1 + return (-1,EmptyBlobSet) # Filter out error and leaf nodes var - nIgnored = 0 nLeafNodes = 0 # for logging + rejected: HashSet[Blob] for w in report: if w.slot.isSome: # non-indexed entries appear typically at the end, though let inx = w.slot.unsafeGet - if w.kind.isNone: - # Error report without node referenece - discard - - elif w.error != NothingSerious: - # Node error, will need to pick up later and download again - nIgnored.inc + # Node error, will need to pick up later and download again. Note that + # there need not be explicit node specs (so `kind` is opted out.) + if w.kind.isNone or w.error != HexaryError(0): + rejected.incl fetchedNodes[inx].partialPath elif w.kind.unsafeGet == Leaf: # Leaf node has been stored, double check @@ -316,10 +334,10 @@ proc accountsHealingImpl( nLeafNodes.inc when extraTraceMessages: - trace logTxt "merged into database", peer, - ctx=buddy.healingCtx(env), nFetchedNodes, nLeafNodes + trace logTxt "merged into database", peer, ctx=buddy.healingCtx(env), + nFetchedNodes, nLeafNodes, nRejected=rejected.len - return nFetchedNodes - nIgnored + return (nFetchedNodes - rejected.len, rejected) # ------------------------------------------------------------------------------ # Public functions @@ -330,27 +348,32 @@ proc healAccounts*( env: SnapPivotRef; ) {.async.} = ## Fetching and merging missing account trie database nodes.
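The reworked healing loop below threads a `HashSet[Blob]` of rejected partial paths through the rounds, so a path that failed to import is not requested again. The accumulation pattern on its own (toy round function with simulated import errors, not project code):

import std/sets

proc healRound(missing: seq[string];
               ignore: HashSet[string]): (int, HashSet[string]) =
  ## Returns (number imported, paths rejected this round).
  var
    nOk = 0
    rejected: HashSet[string]
  for p in missing:
    if p notin ignore:
      if p.len mod 2 == 1: rejected.incl p    # simulated import error
      else: nOk.inc
  (nOk, rejected)

when isMainModule:
  var ignore: HashSet[string]
  for round in 1 .. 2:
    let (nOk, rejected) = healRound(@["ab", "abc", "abcd"], ignore)
    ignore = ignore + rejected                # same accumulation as below
    doAssert nOk == 2
  doAssert ignore.len == 1                    # "abc" stays blacklisted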
- let - ctx {.used.} = buddy.ctx - peer {.used.} = buddy.peer - when extraTraceMessages: + let + ctx {.used.} = buddy.ctx + peer {.used.} = buddy.peer trace logTxt "started", peer, ctx=buddy.healingCtx(env) + let + fa = env.fetchAccounts var nNodesFetched = 0 nFetchLoop = 0 - # Stop after `healAccountsBatchMax` nodes have been fetched - while nNodesFetched < healAccountsBatchMax: - var nNodes = await buddy.accountsHealingImpl(env) + ignore: HashSet[Blob] + + while not fa.processed.isFull() and + buddy.ctrl.running and + not env.archived: + var (nNodes, rejected) = await buddy.accountsHealingImpl(ignore, env) if nNodes <= 0: break + ignore = ignore + rejected nNodesFetched.inc(nNodes) nFetchLoop.inc when extraTraceMessages: trace logTxt "job done", peer, ctx=buddy.healingCtx(env), - nNodesFetched, nFetchLoop, runState=buddy.ctrl.state + nNodesFetched, nFetchLoop, nIgnore=ignore.len, runState=buddy.ctrl.state # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim b/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim index 789446bc8..1d87fc791 100644 --- a/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim +++ b/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim @@ -123,7 +123,8 @@ proc compileMissingNodesList( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair; env: SnapPivotRef; - ): seq[NodeSpecs] = + ): Future[seq[NodeSpecs]] + {.async.} = ## Find some missing glue nodes in storage slots database. let ctx = buddy.ctx @@ -133,17 +134,18 @@ proc compileMissingNodesList( getFn = ctx.pool.snapDb.getStorageSlotsFn(kvp.data.accKey) if not slots.processed.isFull: - noExceptionOops("compileMissingNodesList"): - let (missing, nLevel {.used.}, nVisited {.used.}) = - slots.findMissingNodes( - rootKey, getFn, healStorageSlotsInspectionPlanBLevel) + let mlv = await slots.findMissingNodes( + rootKey, getFn, + healStorageSlotsInspectionPlanBLevel, + healStorageSlotsInspectionPlanBRetryMax, + healStorageSlotsInspectionPlanBRetryNapMSecs) - when extraTraceMessages: - trace logTxt "missing nodes", peer, - ctx=buddy.healingCtx(env), nLevel, nVisited, - nResult=missing.len, result=missing.toPC + when extraTraceMessages: + trace logTxt "missing nodes", peer, + ctx=buddy.healingCtx(env), nLevel=mlv.level, nVisited=mlv.visited, + nResult=mlv.missing.len, result=mlv.missing.toPC - result = missing + return mlv.missing proc getNodesFromNetwork( @@ -221,7 +223,7 @@ proc storageSlotsHealing( ctx = buddy.ctx db = ctx.pool.snapDb peer = buddy.peer - missing = buddy.compileMissingNodesList(kvp, env) + missing = await buddy.compileMissingNodesList(kvp, env) if missing.len == 0: trace logTxt "nothing to do", peer, ctx=buddy.healingCtx(kvp,env) diff --git a/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim b/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim index ad96674e7..26798ef5f 100644 --- a/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim +++ b/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim @@ -41,6 +41,8 @@ ## the account including administrative data is queued in ## `env.fetchStorageFull`. 
## +{.push raises: [].} + import chronicles, chronos, @@ -54,8 +56,6 @@ import ../db/[hexary_envelope, snapdb_accounts], "."/[storage_queue_helper, swap_in] -{.push raises: [].} - logScope: topics = "snap-range" @@ -132,7 +132,7 @@ proc accountsRangefetchImpl( pivot = "#" & $env.stateHeader.blockNumber rc = await buddy.getAccountRange(stateRoot, iv, pivot) if rc.isErr: - fa.unprocessed.merge iv # fail => interval back to pool + fa.unprocessed.mergeSplit iv # fail => interval back to pool let error = rc.error if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors): when extraTraceMessages: @@ -151,7 +151,7 @@ proc accountsRangefetchImpl( # Now, we fully own the scheduler. The original interval will savely be placed # back for a moment (the `unprocessed` range set to be corrected below.) - fa.unprocessed.merge iv + fa.unprocessed.mergeSplit iv # Processed accounts hashes are set up as a set of intervals which is needed # if the data range returned from the network contains holes. @@ -232,13 +232,21 @@ proc rangeFetchAccounts*( when extraTraceMessages: trace logTxt "start", peer, ctx=buddy.fetchCtx(env) - var nFetchAccounts = 0 # for logging + static: + doAssert 0 <= accountsFetchRetryMax + var + nFetchAccounts = 0 # for logging + nRetry = 0 while not fa.processed.isFull() and buddy.ctrl.running and - not env.archived: - nFetchAccounts.inc - if not await buddy.accountsRangefetchImpl(env): - break + not env.archived and + nRetry <= accountsFetchRetryMax: + # May repeat fetching with re-arranged request intervals + if await buddy.accountsRangefetchImpl(env): + nFetchAccounts.inc + nRetry = 0 + else: + nRetry.inc # Clean up storage slots queue first it it becomes too large let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len diff --git a/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim b/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim index bc44e3114..9c12b7de6 100644 --- a/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim +++ b/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim @@ -110,7 +110,7 @@ proc storageQueueAppendPartialBisect*( env.fetchStoragePart.del acc.storageRoot # Oops, nothing to do return # Done let halfTag = rc.value.minPt + ((rc.value.maxPt - rc.value.minPt) div 2) - data.slots.unprocessed.merge(rc.value.minPt, halfTag) + data.slots.unprocessed.merge NodeTagRange.new(rc.value.minPt, halfTag) proc storageQueueAppend*( diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index e12662dca..fca2503ff 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -165,9 +165,20 @@ proc merge*(q: var SnapTodoRanges; iv: NodeTagRange) = discard q[0].merge(iv) discard q[1].reduce(iv) -proc merge*(q: var SnapTodoRanges; minPt, maxPt: NodeTag) = - ## Variant of `merge()` - q.merge NodeTagRange.new(minPt, maxPt) +proc mergeSplit*(q: var SnapTodoRanges; iv: NodeTagRange) = + ## Ditto w/priorities partially reversed + if 1 < iv.len: + let + midPt = iv.minPt + ((iv.maxPt - iv.minPt) shr 1) + iv1 = NodeTagRange.new(iv.minPt, midPt) + iv2 = NodeTagRange.new(midPt + 1.u256, iv.maxPt) + discard q[0].reduce iv1 + discard q[1].merge iv1 + discard q[0].merge iv2 + discard q[1].reduce iv2 + else: + discard q[0].reduce iv + discard q[1].merge iv proc reduce*(q: var SnapTodoRanges; iv: NodeTagRange) = @@ -175,10 +186,6 @@ proc reduce*(q: var SnapTodoRanges; iv: NodeTagRange) = discard q[0].reduce(iv) discard q[1].reduce(iv) -proc reduce*(q: var SnapTodoRanges; minPt, maxPt: NodeTag) = - ## Variant of 
`reduce()` - q.reduce NodeTagRange.new(minPt, maxPt) - iterator ivItems*(q: var SnapTodoRanges): NodeTagRange = ## Iterator over all list entries diff --git a/tests/test_sync_snap/test_node_range.nim b/tests/test_sync_snap/test_node_range.nim index b70cd31e2..e2f7b4182 100644 --- a/tests/test_sync_snap/test_node_range.nim +++ b/tests/test_sync_snap/test_node_range.nim @@ -207,6 +207,7 @@ proc verifyRangeProof( leafs: seq[RangeLeaf]; proof: seq[SnapProof]; dbg = HexaryTreeDbRef(nil); + leafBeforeBase = true; ): Result[void,HexaryError] = ## Re-build temporary database and prove or disprove let @@ -217,6 +218,7 @@ proc verifyRangeProof( result = ok() block verify: + let leaf0Tag = leafs[0].key.to(NodeTag) # Import proof nodes result = xDb.mergeProofs(rootKey, proof) @@ -234,11 +236,60 @@ proc verifyRangeProof( break verify # Left proof - result = xDb.verifyLowerBound(rootKey, baseTag, leafs[0].key.to(NodeTag)) + result = xDb.verifyLowerBound(rootKey, baseTag, leaf0Tag) if result.isErr: check result == Result[void,HexaryError].ok() break verify + # Inflated interval around first point + block: + let iv0 = xDb.hexaryRangeInflate(rootKey, leaf0Tag) + # Verify left end + if baseTag == low(NodeTag): + if iv0.minPt != low(NodeTag): + check iv0.minPt == low(NodeTag) + result = Result[void,HexaryError].err(NearbyFailed) + break verify + elif leafBeforeBase: + check iv0.minPt < baseTag + # Verify right end + if 1 < leafs.len: + if iv0.maxPt + 1.u256 != leafs[1].key.to(NodeTag): + check iv0.maxPt + 1.u256 == leafs[1].key.to(NodeTag) + result = Result[void,HexaryError].err(NearbyFailed) + break verify + + # Inflated interval around last point + if 1 < leafs.len: + let + uPt = leafs[^1].key.to(NodeTag) + ivX = xDb.hexaryRangeInflate(rootKey, uPt) + # Verify left end + if leafs[^2].key.to(NodeTag) != ivX.minPt - 1.u256: + check leafs[^2].key.to(NodeTag) == ivX.minPt - 1.u256 + result = Result[void,HexaryError].err(NearbyFailed) + break verify + # Verify right end + if uPt < high(NodeTag): + let + uPt1 = uPt + 1.u256 + rx = uPt1.hexaryPath(rootKey,xDb).hexaryNearbyRightMissing(xDb) + ry = uPt1.hexaryNearbyRight(rootKey, xDb) + if rx.isErr: + if ry.isOk: + check rx.isErr and ry.isErr + result = Result[void,HexaryError].err(NearbyFailed) + break verify + elif rx.value != ry.isErr: + check rx.value == ry.isErr + result = Result[void,HexaryError].err(NearbyFailed) + break verify + if rx.get(otherwise=false): + if ivX.minPt + 1.u256 != high(NodeTag): + check ivX.minPt + 1.u256 == high(NodeTag) + result = Result[void,HexaryError].err(NearbyFailed) + break verify + return ok() if noisy: @@ -396,7 +447,7 @@ proc test_NodeRangeProof*( # This is needed as the range extractor needs the node before the `base` # (if ateher is any) in order to assemble the proof. But this node might # not be present in the partial database. - (base, start) = if w.base == 0.to(NodeTag): (w.base, 0) + (base, start) = if w.base == low(NodeTag): (w.base, 0) else: (first + delta, 1) # Assemble accounts list starting at the second item accounts = w.data.accounts[start ..< min(w.data.accounts.len,maxLen)]
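A closing note on `mergeSplit()` from `worker_desc.nim` above: a failed interval is handed back in two halves with their queue priorities swapped, so the next attempt starts from re-arranged request ranges. The midpoint arithmetic in isolation (plain `uint64` standing in for `NodeTag`; illustrative only):

proc splitHalves(minPt, maxPt: uint64): ((uint64, uint64), (uint64, uint64)) =
  ## Halve [minPt, maxPt]; assumes minPt < maxPt.
  let midPt = minPt + ((maxPt - minPt) shr 1)
  ((minPt, midPt), (midPt + 1, maxPt))

when isMainModule:
  doAssert splitHalves(0, 9) == ((0u64, 4u64), (5u64, 9u64))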