From 5e865edec002f46644284d09fb7f0abfda7ef89a Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Tue, 4 Apr 2023 14:36:18 +0100 Subject: [PATCH] Update snap client storage slots download and healing (#1529) * Fix fringe condition for `GetStorageRanges` message handler why: Receiving a proved empty range was not considered at all. This lead to inconsistencies of the return value which led to subsequent errors. * Update storage range bulk download details; Mainly re-org of storage queue processing in `storage_queue_helper.nim` * Update logging variables/messages * Update storage slots healing details: Mainly clean up after improved helper functions from the sources `find_missing_nodes.nim` and `storage_queue_helper.nim`. * Simplify account fetch why: To much fuss made tolerating some errors. There will be an overall strategy implemented where the concert of download and healing function is orchestrated. * Add error resilience to the concert of download and healing. why: The idea is that a peer might stop serving snap/1 accounts and storage slot downloads while still able to support fetching nodes for healing. --- nimbus/sync/handlers/snap.nim | 19 +- nimbus/sync/snap/constants.nim | 60 +- nimbus/sync/snap/range_desc.nim | 100 ++- nimbus/sync/snap/worker.nim | 19 +- .../snap/worker/com/get_storage_ranges.nim | 102 ++-- nimbus/sync/snap/worker/pivot.nim | 24 +- .../snap/worker/pivot/find_missing_nodes.nim | 18 +- .../sync/snap/worker/pivot/heal_accounts.nim | 39 +- .../snap/worker/pivot/heal_storage_slots.nim | 316 ++++++---- .../worker/pivot/range_fetch_accounts.nim | 51 +- .../pivot/range_fetch_storage_slots.nim | 179 +++--- .../worker/pivot/storage_queue_helper.nim | 567 ++++++++++++------ nimbus/sync/snap/worker/pivot/swap_in.nim | 4 +- nimbus/sync/snap/worker_desc.nim | 38 +- nimbus/sync/sync_desc.nim | 13 +- 15 files changed, 959 insertions(+), 590 deletions(-) diff --git a/nimbus/sync/handlers/snap.nim b/nimbus/sync/handlers/snap.nim index 91c7269ca..842ae48e5 100644 --- a/nimbus/sync/handlers/snap.nim +++ b/nimbus/sync/handlers/snap.nim @@ -123,7 +123,7 @@ proc getSlotsSpecs( # Ignore missing account entry if accData.len == 0: when extraTraceMessages: - trace logTxt "getSlotsSpecs: no such account", accKey + trace logTxt "getSlotsSpecs: no such account", accKey, rootKey return err() # Ignore empty storage list @@ -169,7 +169,8 @@ iterator doTrieNodeSpecs( # Fail on this group when extraTraceMessages: - trace logTxt "doTrieNodeSpecs (blind)", nBlind=w.slotPaths.len + trace logTxt "doTrieNodeSpecs (blind)", accPath=w.accPath.toHex, + nBlind=w.slotPaths.len, nBlind0=w.slotPaths[0].toHex yield (NodeKey.default, nil, EmptyBlob, w.slotPaths.len) @@ -416,14 +417,9 @@ method getStorageRanges*( dataAllocated += rangeProof.leafsSize when extraTraceMessages: - if accounts.len == 1: - trace logTxt "getStorageRanges: single account", iv, - accKey=accHash.to(NodeKey), stoRoot=sp.stoRoot - - #when extraTraceMessages: - # trace logTxt "getStorageRanges: data slots", iv, sizeMax, dataAllocated, - # accKey, stoRoot, nSlots=rangeProof.leafs.len, - # nProof=rangeProof.proof.len + trace logTxt "getStorageRanges: data slots", iv, sizeMax, dataAllocated, + nAccounts=accounts.len, accKey=accHash.to(NodeKey), stoRoot=sp.stoRoot, + nSlots=rangeProof.leafs.len, nProof=rangeProof.proof.len slotLists.add rangeProof.leafs.mapIt(it.to(SnapStorage)) if 0 < rangeProof.proof.len: @@ -494,8 +490,7 @@ method getTrieNodes*( let steps = partPath.hexPrefixDecode[1].hexaryPath(stateKey, getFn) if 0 < steps.path.len and 
steps.tail.len == 0 and steps.path[^1].nibble < 0: - let data = steps.path[^1].node.convertTo(Blob) - data + steps.path[^1].node.convertTo(Blob) else: EmptyBlob diff --git a/nimbus/sync/snap/constants.nim b/nimbus/sync/snap/constants.nim index aacbac4ab..9bf13ea0a 100644 --- a/nimbus/sync/snap/constants.nim +++ b/nimbus/sync/snap/constants.nim @@ -11,12 +11,16 @@ {.push raises: [].} import + std/sets, eth/[common, trie/nibbles] const EmptyBlob* = seq[byte].default ## Useful shortcut + EmptyBlobSet* = HashSet[Blob].default + ## Useful shortcut + EmptyBlobSeq* = seq[Blob].default ## Useful shortcut @@ -63,11 +67,6 @@ const # -------------- - accountsFetchRetryMax* = 2 - ## The request intervals will be slightly re-arranged after failure. - ## So re-trying to fetch another range might be successful (set to 0 - ## for disabling retries.) - accountsSaveProcessedChunksMax* = 1000 ## Recovery data are stored if the processed ranges list contains no more ## than this many range *chunks*. @@ -82,6 +81,14 @@ const ## If there are too many dangling nodes, no data will be saved and restart ## has to perform from scratch or an earlier checkpoint. + # -------------- + + storageSlotsFetchFailedFullMax* = fetchRequestStorageSlotsMax + 100 + ## Maximal number of failures when fetching full range storage slots. + ## These failed slot ranges are only called for once in the same cycle. + + storageSlotsFetchFailedPartialMax* = 300 + ## Ditto for partial range storage slots. storageSlotsTrieInheritPerusalMax* = 30_000 ## Maximal number of nodes to visit in order to find out whether this @@ -108,26 +115,33 @@ const healAccountsInspectionPlanBLevel* = 4 ## Search this level deep for missing nodes if `hexaryEnvelopeDecompose()` ## only produces existing nodes. - ## - ## The maximal number of nodes visited at level 3 is *4KiB* and at level 4 - ## is *64Kib*. healAccountsInspectionPlanBRetryMax* = 2 - ## Retry inspection if this may times unless there is at least one dangling - ## node found. + ## Retry inspection with depth level argument starting at + ## `healAccountsInspectionPlanBLevel-1` and counting down at most this + ## many times until there is at least one dangling node found and the + ## depth level argument remains positive. The cumulative depth of the + ## iterated seach is + ## :: + ## b 1 + ## Σ ν = --- (b - a + 1) (a + b) + ## a 2 + ## for + ## :: + ## b = healAccountsInspectionPlanBLevel + ## a = b - healAccountsInspectionPlanBRetryMax + ## healAccountsInspectionPlanBRetryNapMSecs* = 2 ## Sleep beween inspection retrys to allow thread switch. If this constant ## is set `0`, `1`ns wait is used. - healSlorageSlotsTrigger* = 0.70 - ## Consider per account storage slost healing if a per-account hexary - ## sub-trie has reached this factor of completeness. + # -------------- - healStorageSlotsInspectionPlanBLevel* = 4 + healStorageSlotsInspectionPlanBLevel* = 5 ## Similar to `healAccountsInspectionPlanBLevel` - healStorageSlotsInspectionPlanBRetryMax* = 2 + healStorageSlotsInspectionPlanBRetryMax* = 99 # 5 + 4 + .. + 1 => 15 ## Similar to `healAccountsInspectionPlanBRetryMax` healStorageSlotsInspectionPlanBRetryNapMSecs* = 2 @@ -138,6 +152,9 @@ const ## this many items will be removed from the batch queue. These items will ## then be processed one by one. + healStorageSlotsFailedMax* = 300 + ## Ditto for partial range storage slots. 
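# A minimal illustrative sketch of the plan-B retry schedule described in the
# constant doc comments above (first attempt at the configured depth level,
# then counting down while retries remain and the depth stays positive), plus
# the closed-form cumulative depth from the same comment.  `planBDepths` and
# `cumulativeDepth` are hypothetical helper names used only for this example,
# not part of the snap sources.
proc planBDepths(level, retryMax: int): seq[int] =
  ## Depth arguments visited: `level`, then at most `retryMax` retries
  ## counting down while the depth argument remains positive.
  var depth = level
  for _ in 0 .. retryMax:
    if depth < 1:
      break
    result.add depth
    dec depth

proc cumulativeDepth(b, a: int): int =
  ## Sum of a..b, i.e. (b - a + 1) * (a + b) / 2 as in the doc comment.
  (b - a + 1) * (a + b) div 2

when isMainModule:
  # With `healStorageSlotsInspectionPlanBLevel = 5` the visited depths are
  # 5,4,3,2,1 regardless of the large retry limit, matching the
  # `5 + 4 + .. + 1 => 15` note above.
  doAssert planBDepths(5, 99) == @[5, 4, 3, 2, 1]
  doAssert cumulativeDepth(5, 1) == 15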
+ # -------------- comErrorsTimeoutMax* = 3 @@ -167,17 +184,8 @@ const static: doAssert storageSlotsQuPrioThresh < accountsSaveStorageSlotsMax - - -# Deprecated, to be expired -const - healInspectionBatch* = 10_000 - ## Number of nodes to inspect in a single batch. In between batches, a - ## task/thread switch is allowed. - - healInspectionBatchWaitNanoSecs* = 500 - ## Wait some time asynchroneously after processing `healInspectionBatch` - ## nodes to allow for a pseudo -task switch. + doAssert 0 <= storageSlotsFetchFailedFullMax + doAssert 0 <= storageSlotsFetchFailedPartialMax # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/range_desc.nim b/nimbus/sync/snap/range_desc.nim index ab026aedc..c2b838300 100644 --- a/nimbus/sync/snap/range_desc.nim +++ b/nimbus/sync/snap/range_desc.nim @@ -13,9 +13,10 @@ import std/[math, sequtils, strutils, hashes], eth/common, - stew/[byteutils, interval_set], + stew/interval_set, stint, ../../constants, + ../../utils/prettify, ../protocol, ../types @@ -71,6 +72,11 @@ type storageRoot*: Hash256 ## Start of storage tree subRange*: Option[NodeTagRange] ## Sub-range of slot range covered + AccountSlotsChanged* = object + ## Variant of `AccountSlotsHeader` representing some transition + account*: AccountSlotsHeader ## Account header + newRange*: Option[NodeTagRange] ## New sub-range (if-any) + AccountStorageRange* = object ## List of storage descriptors, the last `AccountSlots` storage data might ## be incomplete and the `proof` is needed for proving validity. @@ -83,6 +89,8 @@ type account*: AccountSlotsHeader data*: seq[SnapStorage] +# See below for definition of constant `FullNodeTagRange` + # ------------------------------------------------------------------------------ # Public helpers # ------------------------------------------------------------------------------ @@ -189,6 +197,10 @@ proc digestTo*(data: Blob; T: type NodeTag): T = ## Hash the `data` argument keccakHash(data).to(T) +const + # Cannot be defined earlier: `NodeTag` operations needed + FullNodeTagRange* = NodeTagRange.new(low(NodeTag),high(NodeTag)) + # ------------------------------------------------------------------------------ # Public functions: `NodeTagRange` helpers # ------------------------------------------------------------------------------ @@ -205,11 +217,34 @@ proc isEmpty*(lrs: openArray[NodeTagRangeSet]): bool = return false true +proc isEmpty*(iv: NodeTagRange): bool = + ## Ditto for an interval range. + false # trivially by definition + + proc isFull*(lrs: NodeTagRangeSet): bool = ## Returns `true` if the argument set `lrs` contains of the single ## interval [low(NodeTag),high(NodeTag)]. lrs.total == 0 and 0 < lrs.chunks +proc isFull*(lrs: openArray[NodeTagRangeSet]): bool = + ## Variant of `isFull()` where intervals are distributed across several + ## sets. This function makes sense only if the interval sets are mutually + ## disjunct. + var accu: NodeTag + for ivSet in lrs: + if 0 < ivSet.total: + if high(NodeTag) - ivSet.total < accu: + return true + accu = accu + ivSet.total + elif 0 < ivSet.chunks: + # number of points in `ivSet` is `2^256 + 1` + return true + +proc isFull*(iv: NodeTagRange): bool = + ## Ditto for an interval range. + iv == FullNodeTagRange + proc emptyFactor*(lrs: NodeTagRangeSet): float = ## Relative uncovered total, i.e. 
`#points-not-covered / 2^256` to be used @@ -235,9 +270,11 @@ proc emptyFactor*(lrs: openArray[NodeTagRangeSet]): float = discard else: # number of points in `ivSet` is `2^256 + 1` return 0.0 + # Calculate: (2^256 - accu) / 2^256 if accu == 0.to(NodeTag): - return 1.0 - ((high(NodeTag) - accu).u256 + 1).to(float) / (2.0^256) + 1.0 + else: + ((high(NodeTag) - accu).u256 + 1).to(float) / (2.0^256) proc fullFactor*(lrs: NodeTagRangeSet): float = @@ -250,6 +287,22 @@ proc fullFactor*(lrs: NodeTagRangeSet): float = else: 1.0 # number of points in `lrs` is `2^256 + 1` +proc fullFactor*(lrs: openArray[NodeTagRangeSet]): float = + ## Variant of `fullFactor()` where intervals are distributed across several + ## sets. This function makes sense only if the interval sets are mutually + ## disjunct. + var accu: NodeTag + for ivSet in lrs: + if 0 < ivSet.total: + if high(NodeTag) - ivSet.total < accu: + return 1.0 + accu = accu + ivSet.total + elif ivSet.chunks == 0: + discard + else: # number of points in `ivSet` is `2^256 + 1` + return 1.0 + accu.u256.to(float) / (2.0^256) + proc fullFactor*(iv: NodeTagRange): float = ## Relative covered length of an inetrval, i.e. `#points-covered / 2^256` if 0 < iv.len: @@ -266,8 +319,16 @@ proc `$`*(nodeTag: NodeTag): string = "2^256-1" elif nodeTag == 0.u256.NodeTag: "0" + elif nodeTag == 2.u256.pow(255).NodeTag: + "2^255" # 800... + elif nodeTag == 2.u256.pow(254).NodeTag: + "2^254" # 400.. + elif nodeTag == 2.u256.pow(253).NodeTag: + "2^253" # 200... + elif nodeTag == 2.u256.pow(251).NodeTag: + "2^252" # 100... else: - nodeTag.to(Hash256).data.toHex + nodeTag.UInt256.toHex proc `$`*(nodeKey: NodeKey): string = $nodeKey.to(NodeTag) @@ -293,6 +354,37 @@ proc `$`*(iv: NodeTagRange): string = leafRangePp iv +proc fullPC3*(w: NodeTagRangeSet|NodeTagRange): string = + ## Pretty print fill state of range sets. + if w.isEmpty: + "0%" + elif w.isFull: + "100%" + else: + let ff = w.fullFactor + if ff <= 0.99999: + ff.toPC(3) + else: + "99.999" + +proc fullPC3*(w: openArray[NodeTagRangeSet]): string = + ## Variant of `fullPC3()` where intervals are distributed across several + ## sets. This function makes sense only if the interval sets are mutually + ## disjunct. + if w.isEmpty: + "0%" + else: + let partition = "~" & $w.mapIt(it.chunks).foldl(a+b) + if w.isFull: + "100%" & partition + else: + let ff = w.fullFactor + if ff <= 0.99999: + ff.toPC(3) & partition + else: + "99.999" & partition + + proc dump*( ranges: openArray[NodeTagRangeSet]; moan: proc(overlap: UInt256; iv: NodeTagRange) {.gcsafe.}; diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index bd365f8d5..d84a14b6e 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -9,14 +9,13 @@ # except according to those terms. 
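# A minimal sketch of the saturation guard used by the `isFull()` and
# `fullFactor()` variants for several range sets above: a full
# `NodeTagRangeSet` has more points than fit into 256 bits (its `total`
# reads as 0 while `chunks` is non-zero), so per-set totals are accumulated
# with an explicit overflow check.  Toy version on `uint8` instead of
# `NodeTag`; `saturatingAdd` is a hypothetical helper, not part of the snap
# sources.
proc saturatingAdd(accu, total: uint8): (uint8, bool) =
  ## Returns the new accumulator and whether adding `total` would overflow,
  ## i.e. whether the union of the sets must already cover the key space.
  if high(uint8) - total < accu:
    (high(uint8), true)
  else:
    (accu + total, false)

when isMainModule:
  doAssert saturatingAdd(200'u8, 100'u8) == (255'u8, true)
  doAssert saturatingAdd(100'u8, 100'u8) == (200'u8, false)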
import - std/[options, sets, strutils], + std/[options, sets], chronicles, chronos, eth/[common, p2p], stew/[interval_set, keyed_queue], ../../common as nimcom, ../../db/select_backend, - ../../utils/prettify, ".."/[handlers, protocol, sync_desc], ./worker/[pivot, ticker], ./worker/com/com_error, @@ -251,11 +250,9 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = when extraTraceMessages: block: - let - nAccounts {.used.} = env.nAccounts - nSlotLists {.used.} = env.nSlotLists - processed {.used.} = env.fetchAccounts.processed.fullFactor.toPC(2) - trace "Multi sync runner", peer, pivot, nAccounts, nSlotLists, processed, + trace "Multi sync runner", peer, pivot, nAccounts=env.nAccounts, + nSlotLists=env.nSlotLists, + processed=env.fetchAccounts.processed.fullPC3, nStoQu=nStorQuAtStart # This one is the syncing work horse which downloads the database @@ -263,10 +260,10 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = # Various logging entries (after accounts and storage slots download) let - nAccounts = env.nAccounts - nSlotLists = env.nSlotLists - processed = env.fetchAccounts.processed.fullFactor.toPC(2) - nStoQuLater = env.fetchStorageFull.len + env.fetchStoragePart.len + nAccounts {.used.} = env.nAccounts + nSlotLists {.used.} = env.nSlotLists + processed {.used.} = env.fetchAccounts.processed.fullPC3 + nStoQuLater {.used.} = env.fetchStorageFull.len + env.fetchStoragePart.len if env.archived: # Archive pivot if it became stale diff --git a/nimbus/sync/snap/worker/com/get_storage_ranges.nim b/nimbus/sync/snap/worker/com/get_storage_ranges.nim index 603b2e175..d234a7f60 100644 --- a/nimbus/sync/snap/worker/com/get_storage_ranges.nim +++ b/nimbus/sync/snap/worker/com/get_storage_ranges.nim @@ -33,9 +33,12 @@ type # proof*: seq[SnapProof] GetStorageRanges* = object - leftOver*: seq[AccountSlotsHeader] + leftOver*: seq[AccountSlotsChanged] data*: AccountStorageRange +const + extraTraceMessages = false or true + # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ @@ -68,9 +71,8 @@ proc getStorageRangesReq( return ok(reply) except CatchableError as e: - let error {.used.} = e.msg trace trSnapRecvError & "waiting for GetStorageRanges reply", peer, pivot, - error + name=($e.name), error=(e.msg) return err() # ------------------------------------------------------------------------------ @@ -84,25 +86,28 @@ proc getStorageRanges*( pivot: string; ## For logging, instead of `stateRoot` ): Future[Result[GetStorageRanges,ComError]] {.async.} = - ## Fetch data using the `snap#` protocol, returns the range covered. + ## Fetch data using the `snap/1` protocol, returns the range covered. ## - ## If the first `accounts` argument sequence item has the `firstSlot` field - ## set non-zero, only this account is fetched with a range. Otherwise all - ## accounts are asked for without a range (non-zero `firstSlot` fields are - ## ignored of later sequence items.) - let - peer {.used.} = buddy.peer - var - nAccounts = accounts.len - + ## If the first `accounts` argument sequence item has the optional `subRange` + ## field set, only this account is fetched with for the range `subRange`. + ## Otherwise all accounts are asked for without a range (`subRange` fields + ## are ignored for later accounts list items.) 
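# A minimal sketch of the request shape implied by the `getStorageRanges()`
# doc comment above: all account keys are sent, but only the first entry's
# optional `subRange` is honoured.  `ToyAccount` and `buildRequest` are
# hypothetical stand-ins for `AccountSlotsHeader` and the real
# `getStorageRangesReq()` call.
import std/[options, sequtils]

type
  ToyAccount = object
    accKey: string
    subRange: Option[(uint64, uint64)]

proc buildRequest(accounts: seq[ToyAccount]):
    (seq[string], Option[(uint64, uint64)]) =
  doAssert 0 < accounts.len
  # Keys of all accounts, plus the optional slot range of the first one only
  (accounts.mapIt(it.accKey), accounts[0].subRange)

when isMainModule:
  let req = buildRequest(@[
    ToyAccount(accKey: "acc-0", subRange: some((0'u64, 99'u64))),
    ToyAccount(accKey: "acc-1", subRange: some((5'u64, 7'u64)))]) # ignored
  doAssert req[0] == @["acc-0", "acc-1"]
  doAssert req[1] == some((0'u64, 99'u64))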
+ var nAccounts = accounts.len if nAccounts == 0: return err(ComEmptyAccountsArguments) - if trSnapTracePacketsOk: - trace trSnapSendSending & "GetStorageRanges", peer, pivot, nAccounts + let + peer {.used.} = buddy.peer + iv = accounts[0].subRange + + when trSnapTracePacketsOk: + when extraTraceMessages: + trace trSnapSendSending & "GetStorageRanges", peer, pivot, nAccounts, + iv=iv.get(otherwise=FullNodeTagRange) + else: + trace trSnapSendSending & "GetStorageRanges", peer, pivot, nAccounts let - iv = accounts[0].subRange snStoRanges = block: let rc = await buddy.getStorageRangesReq(stateRoot, accounts.mapIt(it.accKey.to(Hash256)), iv, pivot) @@ -119,7 +124,6 @@ proc getStorageRanges*( return err(ComTooManyStorageSlots) rc.value.get - let nSlotLists = snStoRanges.slotLists.len nProof = snStoRanges.proof.nodes.len @@ -148,40 +152,52 @@ proc getStorageRanges*( # Filter remaining `slots` responses: # * Accounts for empty ones go back to the `leftOver` list. for n in 0 ..< nSlotLists: - # Empty data for a slot indicates missing data - if snStoRanges.slotLists[n].len == 0: - dd.leftOver.add accounts[n] - else: + if 0 < snStoRanges.slotLists[n].len or (n == nSlotLists-1 and 0 < nProof): + # Storage slot data available. The last storage slots list may + # be a proved empty sub-range. dd.data.storages.add AccountSlots( account: accounts[n], # known to be no fewer accounts than slots data: snStoRanges.slotLists[n]) - # Complete the part that was not answered by the peer - if nProof == 0: - # assigning empty slice is ok - dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts] + else: # if n < nSlotLists-1 or nProof == 0: + # Empty data here indicate missing data + dd.leftOver.add AccountSlotsChanged( + account: accounts[n]) - else: - # Ok, we have a proof now - if 0 < snStoRanges.slotLists[^1].len: - # If the storage data for the last account comes with a proof, then the - # data set is incomplete. So record the missing part on the `dd.leftOver` - # list. + if 0 < nProof: + # Ok, we have a proof now. In that case, there is always a duplicate + # of the proved entry on the `dd.leftOver` list. + # + # Note that `storages[^1]` exists due to the clause + # `(n==nSlotLists-1 and 0 key` mapping - req = SnapTriePaths(accPath: accpath) - for w in fetchNodes: - req.slotPaths.add w.partialPath - nodeKey[w.partialPath] = w.nodeKey - - # Fetch nodes from the network. - let + rootHash = env.stateHeader.stateRoot pivot = "#" & $env.stateHeader.blockNumber # for logging - rc = await buddy.getTrieNodes(storageRoot, @[req], pivot) - if rc.isOk: - # Reset error counts for detecting repeated timeouts, network errors, etc. - buddy.only.errors.resetComError() - return rc.value.nodes.mapIt(NodeSpecs( - partialPath: it.partialPath, - nodeKey: nodeKey[it.partialPath], - data: it.data)) + # Initalise for fetching nodes from the network via `getTrieNodes()` + var + nodeKey: Table[Blob,NodeKey] # Temporary `path -> key` mapping + req = SnapTriePaths(accPath: accPath) # Argument for `getTrieNodes()` - let error = rc.error - if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors): + # There is no point in fetching too many nodes as it will be rejected. So + # rest of the `missingNodes` list is ignored to be picked up later. 
+ for w in missingNodes: + if w.partialPath notin ignore and not nodeKey.hasKey(w.partialPath): + req.slotPaths.add w.partialPath + nodeKey[w.partialPath] = w.nodeKey + if fetchRequestTrieNodesMax <= req.slotPaths.len: + break + + if 0 < req.slotPaths.len: + # Fetch nodes from the network. + let rc = await buddy.getTrieNodes(rootHash, @[req], pivot) + if rc.isOk: + # Reset error counts for detecting repeated timeouts, network errors, etc. + buddy.only.errors.resetComError() + + return rc.value.nodes.mapIt(NodeSpecs( + partialPath: it.partialPath, + nodeKey: nodeKey[it.partialPath], + data: it.data)) + + # Process error ... + let + error = rc.error + ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors) when extraTraceMessages: - trace logTxt "fetch nodes error => stop", peer, - ctx=buddy.healingCtx(kvp,env), error + trace logTxt "reply error", peer, ctx=buddy.healingCtx(kvp,env), + error, stop=ok + + return @[] -proc slotKey(node: NodeSpecs): (bool,NodeKey) = - ## Read leaf node from persistent database (if any) - try: +proc kvStoSlotsLeaf( + buddy: SnapBuddyRef; + node: NodeSpecs; # Node data fetched from network + kvp: StoQuSlotsKVP; # For logging + env: SnapPivotRef; # For logging + ): (bool,NodeKey) = + ## Re-read leaf node from persistent database (if any) + var nNibbles = -1 + discardRlpError("kvStorageSlotsLeaf"): let nodeRlp = rlpFromBytes node.data - (_,prefix) = hexPrefixDecode node.partialPath - (_,segment) = hexPrefixDecode nodeRlp.listElem(0).toBytes + prefix = (hexPrefixDecode node.partialPath)[1] + segment = (hexPrefixDecode nodeRlp.listElem(0).toBytes)[1] nibbles = prefix & segment - if nibbles.len == 64: + + nNibbles = nibbles.len + if nNibbles == 64: return (true, nibbles.getBytes.convertTo(NodeKey)) - except CatchableError: - discard + + when extraTraceMessages: + trace logTxt "non-leaf node path or corrupt data", peer=buddy.peer, + ctx=buddy.healingCtx(kvp,env), nNibbles + + +proc registerStoSlotsLeaf( + buddy: SnapBuddyRef; + slotKey: NodeKey; + kvp: StoQuSlotsKVP; + env: SnapPivotRef; + ) = + ## Process single account node as would be done with an interval by + ## the `storeAccounts()` function + let + ctx = buddy.ctx + peer = buddy.peer + rootKey = kvp.key.to(NodeKey) + getSlotFn = ctx.pool.snapDb.getStorageSlotsFn kvp.data.accKey + pt = slotKey.to(NodeTag) + + # Extend interval [pt,pt] if possible + var iv: NodeTagRange + try: + iv = getSlotFn.hexaryRangeInflate(rootKey, pt) + except CatchableError as e: + error logTxt "inflating interval oops", peer, ctx=buddy.healingCtx(kvp,env), + accKey=kvp.data.accKey, slotKey, name=($e.name), msg=e.msg + iv = NodeTagRange.new(pt,pt) + + # Register isolated leaf node + if 0 < kvp.data.slots.processed.merge iv: + kvp.data.slots.unprocessed.reduce iv + + when extraTraceMessages: + trace logTxt "registered single slot", peer, ctx=buddy.healingCtx(env), + leftSlack=(iv.minPt < pt), rightSlack=(pt < iv.maxPt) # ------------------------------------------------------------------------------ # Private functions: do the healing for one work item (sub-trie) # ------------------------------------------------------------------------------ -proc storageSlotsHealing( +proc stoSlotsHealingImpl( buddy: SnapBuddyRef; - kvp: SnapSlotsQueuePair; + ignore: HashSet[Blob]; # Except for these partial paths listed + kvp: StoQuSlotsKVP; env: SnapPivotRef; - ) {.async.} = + ): Future[(int,HashSet[Blob])] + {.async.} = ## Returns `true` is the sub-trie is complete (probably inherited), and ## `false` if there are nodes left to be 
completed. let @@ -227,51 +287,53 @@ proc storageSlotsHealing( if missing.len == 0: trace logTxt "nothing to do", peer, ctx=buddy.healingCtx(kvp,env) - return - - when extraTraceMessages: - trace logTxt "started", peer, ctx=buddy.healingCtx(kvp,env) + return (0,EmptyBlobSet) # nothing to do # Get next batch of nodes that need to be merged it into the database - let nodeSpecs = await buddy.getNodesFromNetwork(kvp, missing, env) - if nodeSpecs.len == 0: - return + let fetchedNodes = await buddy.getNodesFromNetwork(missing, ignore, kvp, env) + if fetchedNodes.len == 0: + when extraTraceMessages: + trace logTxt "node set unavailable", nMissing=missing.len + return (0,EmptyBlobSet) # Store nodes onto disk - let report = db.importRawStorageSlotsNodes(peer, kvp.data.accKey, nodeSpecs) + let + nFetchedNodes = fetchedNodes.len + report = db.importRawStorageSlotsNodes(peer, kvp.data.accKey, fetchedNodes) + if 0 < report.len and report[^1].slot.isNone: # Storage error, just run the next lap (not much else that can be done) error logTxt "database error", peer, ctx=buddy.healingCtx(kvp,env), - nNodes=nodeSpecs.len, error=report[^1].error - return - - when extraTraceMessages: - trace logTxt "nodes merged into database", peer, - ctx=buddy.healingCtx(kvp,env), nNodes=nodeSpecs.len + nFetchedNodes, error=report[^1].error + return (-1,EmptyBlobSet) # Filter out leaf nodes - var nLeafNodes = 0 # for logging + var + nLeafNodes = 0 # for logging + rejected: HashSet[Blob] + trace logTxt "importRawStorageSlotsNodes", nReport=report.len ######### for w in report: - if w.slot.isSome and w.kind.get(otherwise = Branch) == Leaf: + if w.slot.isSome: # non-indexed entries appear typically at the end, though + let inx = w.slot.unsafeGet - # Leaf Node has been stored, so register it - let - inx = w.slot.unsafeGet - (isLeaf, slotKey) = nodeSpecs[inx].slotKey - if isLeaf: - let - slotTag = slotKey.to(NodeTag) - iv = NodeTagRange.new(slotTag,slotTag) - kvp.data.slots.unprocessed.reduce iv - discard kvp.data.slots.processed.merge iv - nLeafNodes.inc + # Node error, will need to pick up later and download again. Node that + # there need not be an expicit node specs (so `kind` is opted out.) + if w.kind.isNone or w.error != HexaryError(0): + rejected.incl fetchedNodes[inx].partialPath - when extraTraceMessages: - trace logTxt "stored slot", peer, - ctx=buddy.healingCtx(kvp,env), slotKey=slotTag + elif w.kind.unsafeGet == Leaf: + # Leaf node has been stored, double check + let (isLeaf, key) = buddy.kvStoSlotsLeaf(fetchedNodes[inx], kvp, env) + if isLeaf: + # Update `unprocessed` registry, collect storage roots (if any) + buddy.registerStoSlotsLeaf(key, kvp, env) + nLeafNodes.inc when extraTraceMessages: - trace logTxt "job done", peer, ctx=buddy.healingCtx(kvp,env), nLeafNodes + trace logTxt "merged into database", peer, ctx=buddy.healingCtx(kvp,env), + nLeafNodes + + return (nFetchedNodes - rejected.len, rejected) # ------------------------------------------------------------------------------ # Public functions @@ -282,48 +344,48 @@ proc healStorageSlots*( env: SnapPivotRef; ) {.async.} = ## Fetching and merging missing slorage slots trie database nodes. 
- let - ctx {.used.} = buddy.ctx - peer {.used.} = buddy.peer + when extraTraceMessages: + let peer {.used.} = buddy.peer + trace logTxt "started", peer, ctx=buddy.healingCtx(env) - # Extract healing slot items from partial slots list - var toBeHealed: seq[SnapSlotsQueuePair] - for kvp in env.fetchStoragePart.nextPairs: - # Delete from queue and process this entry - env.fetchStoragePart.del kvp.key + var + nNodesFetched = 0 + nFetchLoop = 0 + ignore: HashSet[Blob] + visited: HashSet[NodeKey] - # Move to returned list unless duplicated in full slots list - if env.fetchStorageFull.eq(kvp.key).isErr: - toBeHealed.add kvp - env.parkedStorage.incl kvp.data.accKey # temporarily parked - if healStorageSlotsBatchMax <= toBeHealed.len: + while buddy.ctrl.running and + visited.len <= healStorageSlotsBatchMax and + ignore.len <= healStorageSlotsFailedMax and + not env.archived: + # Pull out the next request list from the queue + let kvp = block: + let rc = env.storageQueueUnlinkPartialItem visited + if rc.isErr: + when extraTraceMessages: + trace logTxt "queue exhausted", peer, ctx=buddy.healingCtx(env), + nIgnore=ignore.len, nVisited=visited.len break + rc.value - # Run against local batch - let nHealerQueue = toBeHealed.len - if 0 < nHealerQueue: - when extraTraceMessages: - trace logTxt "processing", peer, ctx=buddy.healingCtx(env), nHealerQueue + nFetchLoop.inc - for n in 0 ..< toBeHealed.len: - # Stop processing, hand back the rest - if buddy.ctrl.stopped: - for m in n ..< toBeHealed.len: - let kvp = toBeHealed[n] - discard env.fetchStoragePart.append(kvp.key, kvp.data) - env.parkedStorage.excl kvp.data.accKey - break + # Process request range for healing + let (nNodes, rejected) = await buddy.stoSlotsHealingImpl(ignore, kvp, env) + if kvp.data.slots.processed.isFull: + env.nSlotLists.inc + env.parkedStorage.excl kvp.data.accKey + else: + # Re-queue again, to be re-processed in another cycle + visited.incl kvp.data.accKey + env.storageQueueAppend kvp - let kvp = toBeHealed[n] - await buddy.storageSlotsHealing(kvp, env) - - # Re-queue again unless ready - env.parkedStorage.excl kvp.data.accKey # un-register - if not kvp.data.slots.processed.isFull: - discard env.fetchStoragePart.append(kvp.key, kvp.data) + ignore = ignore + rejected + nNodesFetched.inc(nNodes) when extraTraceMessages: - trace logTxt "done", peer, ctx=buddy.healingCtx(env), nHealerQueue + trace logTxt "done", peer, ctx=buddy.healingCtx(env), + nNodesFetched, nFetchLoop, nIgnore=ignore.len, nVisited=visited.len # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim b/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim index 26798ef5f..d54dd7441 100644 --- a/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim +++ b/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim @@ -57,7 +57,7 @@ import "."/[storage_queue_helper, swap_in] logScope: - topics = "snap-range" + topics = "snap-acc" const extraTraceMessages = false or true @@ -70,19 +70,19 @@ const template logTxt(info: static[string]): static[string] = "Accounts range " & info -#proc `$`(rs: NodeTagRangeSet): string = -# rs.fullFactor.toPC(0) +proc `$`(rs: NodeTagRangeSet): string = + rs.fullPC3 proc `$`(iv: NodeTagRange): string = - iv.fullFactor.toPC(3) + iv.fullPC3 proc fetchCtx( buddy: SnapBuddyRef; env: SnapPivotRef; ): string {.used.} = "{" & - "pivot=" & "#" & $env.stateHeader.blockNumber & "," & - "runState=" & $buddy.ctrl.state & "," & + "piv=" & "#" & 
$env.stateHeader.blockNumber & "," & + "ctl=" & $buddy.ctrl.state & "," & "nStoQu=" & $env.storageQueueTotal() & "," & "nSlotLists=" & $env.nSlotLists & "}" @@ -133,12 +133,10 @@ proc accountsRangefetchImpl( rc = await buddy.getAccountRange(stateRoot, iv, pivot) if rc.isErr: fa.unprocessed.mergeSplit iv # fail => interval back to pool - let error = rc.error - if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors): + if await buddy.ctrl.stopAfterSeriousComError(rc.error, buddy.only.errors): when extraTraceMessages: - let reqLen {.used.} = $iv trace logTxt "fetch error", peer, ctx=buddy.fetchCtx(env), - reqLen, error + reqLen=iv, error=rc.error return rc.value @@ -169,9 +167,8 @@ proc accountsRangefetchImpl( # Bad data, just try another peer buddy.ctrl.zombie = true when extraTraceMessages: - let reqLen {.used.} = $iv trace logTxt "import failed", peer, ctx=buddy.fetchCtx(env), - gotAccounts, gotStorage, reqLen, covered, error=rc.error + gotAccounts, gotStorage, reqLen=iv, covered, error=rc.error return rc.value @@ -221,32 +218,21 @@ proc rangeFetchAccounts*( env: SnapPivotRef; ) {.async.} = ## Fetch accounts and store them in the database. - let - fa = env.fetchAccounts - + let fa = env.fetchAccounts if not fa.processed.isFull(): - let - ctx {.used.} = buddy.ctx - peer {.used.} = buddy.peer when extraTraceMessages: - trace logTxt "start", peer, ctx=buddy.fetchCtx(env) + trace logTxt "start", peer=buddy.peer, ctx=buddy.fetchCtx(env) - static: - doAssert 0 <= accountsFetchRetryMax - var - nFetchAccounts = 0 # for logging - nRetry = 0 + var nFetchAccounts = 0 # for logging while not fa.processed.isFull() and buddy.ctrl.running and - not env.archived and - nRetry <= accountsFetchRetryMax: + not env.archived: # May repeat fetching with re-arranged request intervals - if await buddy.accountsRangefetchImpl(env): - nFetchAccounts.inc - nRetry = 0 - else: - nRetry.inc + if not await buddy.accountsRangefetchImpl(env): + break + + nFetchAccounts.inc # Clean up storage slots queue first it it becomes too large let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len @@ -254,7 +240,8 @@ proc rangeFetchAccounts*( break when extraTraceMessages: - trace logTxt "done", peer, ctx=buddy.fetchCtx(env), nFetchAccounts + trace logTxt "done", peer=buddy.peer, ctx=buddy.fetchCtx(env), + nFetchAccounts # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim b/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim index 60bbfeae2..d854d7d73 100644 --- a/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim +++ b/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim @@ -65,19 +65,20 @@ {.push raises: [].} import + std/sets, chronicles, chronos, eth/[common, p2p], stew/[interval_set, keyed_queue], stint, ../../../sync_desc, - "../.."/[range_desc, worker_desc], + "../.."/[constants, range_desc, worker_desc], ../com/[com_error, get_storage_ranges], ../db/[hexary_error, snapdb_storage_slots], ./storage_queue_helper logScope: - topics = "snap-range" + topics = "snap-slot" const extraTraceMessages = false or true @@ -93,27 +94,26 @@ proc fetchCtx( buddy: SnapBuddyRef; env: SnapPivotRef; ): string = - let - nStoQu = (env.fetchStorageFull.len + - env.fetchStoragePart.len + - env.parkedStorage.len) "{" & - "pivot=" & "#" & $env.stateHeader.blockNumber & "," & - "runState=" & $buddy.ctrl.state & "," & - "nStoQu=" & $nStoQu & "," & + "piv=" & "#" & $env.stateHeader.blockNumber & "," & 
+ "ctl=" & $buddy.ctrl.state & "," & + "nQuFull=" & $env.fetchStorageFull.len & "," & + "nQuPart=" & $env.fetchStoragePart.len & "," & + "nParked=" & $env.parkedStorage.len & "," & "nSlotLists=" & $env.nSlotLists & "}" # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc storeStoragesSingleBatch( +proc fetchStorageSlotsImpl( buddy: SnapBuddyRef; req: seq[AccountSlotsHeader]; env: SnapPivotRef; - ): Future[bool] + ): Future[Result[HashSet[NodeKey],void]] {.async.} = - ## Fetch account storage slots and store them in the database. + ## Fetch account storage slots and store them in the database, returns + ## number of error or -1 for total failure. let ctx = buddy.ctx peer = buddy.peer @@ -124,29 +124,28 @@ proc storeStoragesSingleBatch( var stoRange = block: let rc = await buddy.getStorageRanges(stateRoot, req, pivot) if rc.isErr: - let error = rc.error - if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors): - trace logTxt "fetch error => stop", peer, ctx=buddy.fetchCtx(env), - nReq=req.len, error - return false # all of `req` failed + if await buddy.ctrl.stopAfterSeriousComError(rc.error, buddy.only.errors): + trace logTxt "fetch error", peer, ctx=buddy.fetchCtx(env), + nReq=req.len, error=rc.error + return err() # all of `req` failed rc.value # Reset error counts for detecting repeated timeouts, network errors, etc. buddy.only.errors.resetComError() - var gotSlotLists = stoRange.data.storages.len - if 0 < gotSlotLists: + var + nSlotLists = stoRange.data.storages.len + reject: HashSet[NodeKey] + if 0 < nSlotLists: # Verify/process storages data and save it to disk let report = ctx.pool.snapDb.importStorageSlots(peer, stoRange.data) if 0 < report.len: if report[^1].slot.isNone: # Failed to store on database, not much that can be done here - gotSlotLists.dec(report.len - 1) # for logging only - error logTxt "import failed", peer, ctx=buddy.fetchCtx(env), - nSlotLists=gotSlotLists, nReq=req.len, error=report[^1].error - return false # all of `req` failed + nSlotLists=0, nReq=req.len, error=report[^1].error + return err() # all of `req` failed # Push back error entries to be processed later for w in report: @@ -155,17 +154,15 @@ proc storeStoragesSingleBatch( let inx = w.slot.get acc = stoRange.data.storages[inx].account + splitOk = w.error in {RootNodeMismatch,RightBoundaryProofFailed} - if w.error == RootNodeMismatch: - # Some pathological case, needs further investigation. For the - # moment, provide partial fetches. - env.storageQueueAppendPartialBisect acc + reject.incl acc.accKey - elif w.error == RightBoundaryProofFailed and - acc.subRange.isSome and 1 < acc.subRange.unsafeGet.len: - # Some pathological case, needs further investigation. For the - # moment, provide a partial fetches. - env.storageQueueAppendPartialBisect acc + if splitOk: + # Some pathological cases need further investigation. For the + # moment, provide partial split requeue. So a different range + # will be unqueued and processed, next time. + env.storageQueueAppendPartialSplit acc else: # Reset any partial result (which would be the last entry) to @@ -173,33 +170,24 @@ proc storeStoragesSingleBatch( # re-fetched completely for this account. 
env.storageQueueAppendFull acc - # Last entry might be partial (if any) - # - # Forget about partial result processing if the last partial entry - # was reported because - # * either there was an error processing it - # * or there were some gaps reprored as dangling links - stoRange.data.proof = @[] - - # Update local statistics counter for `nSlotLists` counter update - gotSlotLists.dec - - error logTxt "processing error", peer, ctx=buddy.fetchCtx(env), - nSlotLists=gotSlotLists, nReqInx=inx, nReq=req.len, + error logTxt "import error", peer, ctx=buddy.fetchCtx(env), splitOk, + nSlotLists, nRejected=reject.len, nReqInx=inx, nReq=req.len, nDangling=w.dangling.len, error=w.error - # Update statistics - if gotSlotLists == 1 and - req[0].subRange.isSome and - env.fetchStoragePart.hasKey req[0].storageRoot: - # Successful partial request, but not completely done with yet. - gotSlotLists = 0 + # Return unprocessed left overs to batch queue. The `req[^1].subRange` is + # the original range requested for the last item (if any.) + let (_,removed) = env.storageQueueUpdate(stoRange.leftOver, reject) - env.nSlotLists.inc(gotSlotLists) + # Update statistics. The variable removed is set if the queue for a partial + # slot range was logically removed. A partial slot range list has one entry. + # So the correction factor for the slot lists statistics is `removed - 1`. + env.nSlotLists.inc(nSlotLists - reject.len + (removed - 1)) - # Return unprocessed left overs to batch queue - env.storageQueueAppend(stoRange.leftOver, req[^1].subRange) - return true + # Clean up, un-park successful slots (if any) + for w in stoRange.data.storages: + env.parkedStorage.excl w.account.accKey + + return ok(reject) # ------------------------------------------------------------------------------ # Public functions @@ -214,63 +202,58 @@ proc rangeFetchStorageSlots*( ## each work item on the queue at least once.For partial partial slot range ## items this means in case of success that the outstanding range has become ## at least smaller. + when extraTraceMessages: + trace logTxt "start", peer=buddy.peer, ctx=buddy.fetchCtx(env) # Fetch storage data and save it on disk. Storage requests are managed by # request queues for handling full/partial replies and re-fetch issues. For # all practical puroses, this request queue should mostly be empty. - if 0 < env.fetchStorageFull.len or 0 < env.fetchStoragePart.len: - let - ctx = buddy.ctx - peer {.used.} = buddy.peer - - when extraTraceMessages: - trace logTxt "start", peer, ctx=buddy.fetchCtx(env) + for (fetchFn, failMax) in [ + (storageQueueFetchFull, storageSlotsFetchFailedFullMax), + (storageQueueFetchPartial, storageSlotsFetchFailedPartialMax)]: + var + ignored: HashSet[NodeKey] + rc = Result[HashSet[NodeKey],void].ok(ignored) # set ok() start value # Run batch even if `archived` flag is set in order to shrink the queues. 
- var delayed: seq[AccountSlotsHeader] - while buddy.ctrl.running: + while buddy.ctrl.running and + rc.isOk and + ignored.len <= failMax: + # Pull out the next request list from the queue - let (req, nComplete {.used.}, nPartial {.used.}) = - ctx.storageQueueFetchFull(env) - if req.len == 0: + let reqList = buddy.ctx.fetchFn(env, ignored) + if reqList.len == 0: + when extraTraceMessages: + trace logTxt "queue exhausted", peer=buddy.peer, + ctx=buddy.fetchCtx(env), + isPartQueue=(fetchFn==storageQueueFetchPartial) break - when extraTraceMessages: - trace logTxt "fetch full", peer, ctx=buddy.fetchCtx(env), - nStorageQuFull=env.fetchStorageFull.len, nReq=req.len, - nPartial, nComplete - - if await buddy.storeStoragesSingleBatch(req, env): - for w in req: - env.parkedStorage.excl w.accKey # Done with these items + # Process list, store in database. The `reqList` is re-queued accordingly + # in the `fetchStorageSlotsImpl()` function unless there is an error. In + # the error case, the whole argument list `reqList` is left untouched. + rc = await buddy.fetchStorageSlotsImpl(reqList, env) + if rc.isOk: + for w in rc.value: + ignored.incl w # Ignoring bogus response items else: - delayed &= req - env.storageQueueAppend delayed - - # Ditto for partial queue - delayed.setLen(0) - while buddy.ctrl.running: - # Pull out the next request item from the queue - let rc = env.storageQueueFetchPartial() - if rc.isErr: - break + # Push back unprocessed jobs after error + env.storageQueueAppendPartialSplit reqList when extraTraceMessages: - let - subRange {.used.} = rc.value.subRange.get - account {.used.} = rc.value.accKey - trace logTxt "fetch partial", peer, ctx=buddy.fetchCtx(env), - nStorageQuPart=env.fetchStoragePart.len, subRange, account + trace logTxt "processed", peer=buddy.peer, ctx=buddy.fetchCtx(env), + isPartQueue=(fetchFn==storageQueueFetchPartial), + nReqList=reqList.len, + nIgnored=ignored.len, + subRange0=reqList[0].subRange.get(otherwise=FullNodeTagRange), + account0=reqList[0].accKey, + rc=(if rc.isOk: rc.value.len else: -1) + # End `while` + # End `for` - if await buddy.storeStoragesSingleBatch(@[rc.value], env): - env.parkedStorage.excl rc.value.accKey # Done with this item - else: - delayed.add rc.value - env.storageQueueAppend delayed - - when extraTraceMessages: - trace logTxt "done", peer, ctx=buddy.fetchCtx(env) + when extraTraceMessages: + trace logTxt "done", peer=buddy.peer, ctx=buddy.fetchCtx(env) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim b/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim index 9c12b7de6..207e0b1f4 100644 --- a/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim +++ b/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim @@ -8,19 +8,46 @@ # at your option. This file may not be copied, modified, or distributed # except according to those terms. 
+{.push raises: [].} + import + std/sets, + chronicles, eth/[common, p2p], stew/[interval_set, keyed_queue], ../../../sync_desc, "../.."/[constants, range_desc, worker_desc], ../db/[hexary_inspect, snapdb_storage_slots] -{.push raises: [].} +logScope: + topics = "snap-slots" + +type + StoQuSlotsKVP* = KeyedQueuePair[Hash256,SnapSlotsQueueItemRef] + ## Key-value return code from `SnapSlotsQueue` handler + + StoQuPartialSlotsQueue = object + ## Return type for `getOrMakePartial()` + stoQu: SnapSlotsQueueItemRef + isCompleted: bool + +const + extraTraceMessages = false # or true + ## Enabled additional logging noise # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ +template logTxt(info: static[string]): static[string] = + "Storage queue " & info + +proc `$`(rs: NodeTagRangeSet): string = + rs.fullPC3 + +proc `$`(tr: SnapTodoRanges): string = + tr.fullPC3 + template noExceptionOops(info: static[string]; code: untyped) = try: code @@ -32,23 +59,151 @@ template noExceptionOops(info: static[string]; code: untyped) = # Private functions # ------------------------------------------------------------------------------ -proc getOrMakePartial( - env: SnapPivotRef; - stoRoot: Hash256; - accKey: NodeKey; - ): (SnapSlotsQueueItemRef, bool) = - ## Create record on `fetchStoragePart` or return existing one - let rc = env.fetchStoragePart.lruFetch stoRoot - if rc.isOk: - result = (rc.value, true) # Value exists - else: - result = (SnapSlotsQueueItemRef(accKey: accKey), false) # New value - env.parkedStorage.excl accKey # Un-park - discard env.fetchStoragePart.append(stoRoot, result[0]) +proc updatePartial( + env: SnapPivotRef; # Current pivot environment + req: AccountSlotsChanged; # Left over account data + ): bool = # List entry was added + ## Update the range of account argument `req` to the partial slot ranges + ## queue. + ## + ## The function returns `true` if a new list entry was added. + let + accKey = req.account.accKey + stoRoot = req.account.storageRoot + noFullEntry = env.fetchStorageFull.delete(stoRoot).isErr + iv = req.account.subRange.get(otherwise = FullNodeTagRange) + jv = req.newRange.get(otherwise = FullNodeTagRange) + (slots, newEntry, newPartEntry) = block: + let rc = env.fetchStoragePart.lruFetch stoRoot + if rc.isOk: + (rc.value.slots, false, false) + else: + # New entry + let + stoSlo = SnapRangeBatchRef(processed: NodeTagRangeSet.init()) + stoItem = SnapSlotsQueueItemRef(accKey: accKey, slots: stoSlo) + discard env.fetchStoragePart.append(stoRoot, stoItem) + stoSlo.unprocessed.init(clear = true) - if result[0].slots.isNil: - result[0].slots = SnapRangeBatchRef(processed: NodeTagRangeSet.init()) - result[0].slots.unprocessed.init() + # Initalise ranges + var newItem = false + if iv == FullNodeTagRange: + # New record (probably was a full range, before) + stoSlo.unprocessed.mergeSplit FullNodeTagRange + newItem = noFullEntry + else: + # Restore `processed` range, `iv` was the left over. 
+ discard stoSlo.processed.merge FullNodeTagRange + discard stoSlo.processed.reduce iv + (stoSlo, newItem, true) + + # Remove delta state relative to original state + if iv != jv: + # Calculate `iv - jv` + let ivSet = NodeTagRangeSet.init() + discard ivSet.merge iv # Previous range + discard ivSet.reduce jv # Left over range + + # Update `processed` by delta range + for w in ivSet.increasing: + discard slots.processed.merge w + + # Update left over + slots.unprocessed.merge jv # Left over range + + when extraTraceMessages: + trace logTxt "updated partially", accKey, iv, jv, + processed=slots.processed, unprocessed=slots.unprocessed, + noFullEntry, newEntry, newPartEntry + + env.parkedStorage.excl accKey # Un-park (if any) + newEntry + + +proc appendPartial( + env: SnapPivotRef; # Current pivot environment + acc: AccountSlotsHeader; # Left over account data + splitMerge: bool; # Bisect or straight merge + ): bool = # List entry was added + ## Append to partial queue. The argument range of `acc` is split so that + ## the next request of this range will result in the right most half size + ## of this very range. + ## + ## The function returns `true` if a new list entry was added. + let + accKey = acc.accKey + stoRoot = acc.storageRoot + notFull = env.fetchStorageFull.delete(stoRoot).isErr + iv = acc.subRange.get(otherwise = FullNodeTagRange) + rc = env.fetchStoragePart.lruFetch acc.storageRoot + (slots,newEntry) = block: + if rc.isOk: + (rc.value.slots, false) + else: + # Restore missing range + let + stoSlo = SnapRangeBatchRef(processed: NodeTagRangeSet.init()) + stoItem = SnapSlotsQueueItemRef(accKey: accKey, slots: stoSlo) + discard env.fetchStoragePart.append(stoRoot, stoItem) + stoSlo.unprocessed.init(clear = true) + discard stoSlo.processed.merge FullNodeTagRange + discard stoSlo.processed.reduce iv + (stoSlo, notFull) + + if splitMerge: + slots.unprocessed.mergeSplit iv + else: + slots.unprocessed.merge iv + + when extraTraceMessages: + trace logTxt "merged partial", splitMerge, accKey, iv, + processed=slots.processed, unprocessed=slots.unprocessed, newEntry + + env.parkedStorage.excl accKey # Un-park (if any) + newEntry + + +proc reducePartial( + env: SnapPivotRef; # Current pivot environment + acc: AccountSlotsHeader; # Left over account data + ): bool = # List entry was removed + ## Reduce range from partial ranges list. + ## + ## The function returns `true` if a list entry was removed. + # So `iv` was not the full range in which case all of `iv` was fully + # processed and there is nothing left. + let + accKey = acc.accKey + stoRoot = acc.storageRoot + notFull = env.fetchStorageFull.delete(stoRoot).isErr + iv = acc.subRange.get(otherwise = FullNodeTagRange) + rc = env.fetchStoragePart.lruFetch stoRoot + + var entryRemoved = false + if rc.isErr: + # This was the last missing range anyway. So there is no need to + # re-insert this entry. 
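# A minimal sketch of the `iv - jv` bookkeeping done by `updatePartial()`
# above, on plain integer intervals instead of `NodeTagRange`: `iv` is the
# originally requested range, `jv` the reported left-over, and the
# difference is what was actually downloaded and can be marked `processed`.
# `ToyIv` and `downloadedLength` are hypothetical helpers, assuming the
# left-over is a right-aligned suffix of the request (the usual shape of a
# proved partial range response).
type ToyIv = tuple[minPt, maxPt: uint64]   # closed interval, minPt <= maxPt

proc downloadedLength(iv, jv: ToyIv): uint64 =
  doAssert iv.minPt <= jv.minPt and jv.maxPt == iv.maxPt
  jv.minPt - iv.minPt

when isMainModule:
  # Requested 0..255, got 128..255 back as left over => 128 slots processed
  doAssert downloadedLength(
    (minPt: 0'u64, maxPt: 255'u64),
    (minPt: 128'u64, maxPt: 255'u64)) == 128'u64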
+ entryRemoved = true # Virtually deleted + when extraTraceMessages: + trace logTxt "reduced partial, discarded", accKey, iv, entryRemoved + else: + let slots = rc.value.slots + discard slots.processed.merge iv + + if slots.processed.isFull: + env.fetchStoragePart.del stoRoot + result = true + when extraTraceMessages: + trace logTxt "reduced partial, deleted", accKey, iv, entryRemoved + else: + slots.unprocessed.reduce iv + when extraTraceMessages: + trace logTxt "reduced partial, completed", accKey, iv, + processed=slots.processed, unprocessed=slots.unprocessed, + entryRemoved + + env.parkedStorage.excl accKey # Un-park (if any) + entryRemoved # ------------------------------------------------------------------------------ # Public helpers @@ -66,130 +221,150 @@ proc storageQueueAppendFull*( env: SnapPivotRef; stoRoot: Hash256; accKey: NodeKey; - ) = - ## Append item to `fetchStorageFull` queue - env.fetchStoragePart.del stoRoot # Not a partial item anymore (if any) - env.parkedStorage.excl accKey # Un-park - discard env.fetchStorageFull.append( - stoRoot, SnapSlotsQueueItemRef(accKey: accKey)) + ): bool + {.discardable.} = + ## Append item to `fetchStorageFull` queue. This undoes the effect of the + ## function `storageQueueFetchFull()`. The function returns `true` if + ## a new entry was added. + let + notPart = env.fetchStoragePart.delete(stoRoot).isErr + stoItem = SnapSlotsQueueItemRef(accKey: accKey) + env.parkedStorage.excl accKey # Un-park (if any) + env.fetchStorageFull.append(stoRoot, stoItem) and notPart proc storageQueueAppendFull*( env: SnapPivotRef; acc: AccountSlotsHeader; - ) = - ## variant of `storageQueueAppendFull()` + ): bool + {.discardable.} = + ## Variant of `storageQueueAppendFull()` env.storageQueueAppendFull(acc.storageRoot, acc.accKey) -proc storageQueueAppendFull*( - env: SnapPivotRef; - kvp: SnapSlotsQueuePair; +proc storageQueueAppendPartialSplit*( + env: SnapPivotRef; # Current pivot environment + acc: AccountSlotsHeader; # Left over account data + ): bool + {.discardable.} = + ## Merge slot range back into partial queue. This undoes the effect of the + ## function `storageQueueFetchPartial()` with the additional feature that + ## the argument range of `acc` is split. So some next range request for this + ## account will result in the right most half size of this very range just + ## inserted. + ## + ## The function returns `true` if a new entry was added. + env.appendPartial(acc, splitMerge=true) + +proc storageQueueAppendPartialSplit*( + env: SnapPivotRef; # Current pivot environment + req: openArray[AccountSlotsHeader]; # List of entries to push back ) = - ## variant of `storageQueueAppendFull()` - env.storageQueueAppendFull(kvp.key, kvp.data.accKey) - - -proc storageQueueAppendPartialBisect*( - env: SnapPivotRef; - acc: AccountSlotsHeader; - ) = - ## Append to partial queue so that the next fetch range is half the size of - ## the current next range. 
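# A minimal sketch of the halving effect described for
# `storageQueueAppendPartialSplit()` above: after a split-merge re-queue,
# the next fetch for this account covers only about half of the failed
# range.  Toy 64-bit key space; `splitPoint` is a hypothetical helper
# mirroring the `minPt + (maxPt - minPt) div 2` arithmetic of the replaced
# bisect code.
proc splitPoint(minPt, maxPt: uint64): uint64 =
  minPt + (maxPt - minPt) div 2

when isMainModule:
  # The range [0,0xffff] is split at its midpoint into two halves
  doAssert splitPoint(0'u64, 0xffff'u64) == 0x7fff'u64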
- - # Fetch/rotate queue item - let data = env.getOrMakePartial(acc.storageRoot, acc.accKey)[0] - - # Derive unprocessed ranges => into lower priority queue - data.slots.unprocessed.clear() - discard data.slots.unprocessed[1].merge(low(NodeTag),high(NodeTag)) - for iv in data.slots.processed.increasing: - discard data.slots.unprocessed[1].reduce iv # complements processed ranges - - # Prioritise half of first unprocessed range - let rc = data.slots.unprocessed[1].ge() - if rc.isErr: - env.fetchStoragePart.del acc.storageRoot # Oops, nothing to do - return # Done - let halfTag = rc.value.minPt + ((rc.value.maxPt - rc.value.minPt) div 2) - data.slots.unprocessed.merge NodeTagRange.new(rc.value.minPt, halfTag) - + ## Variant of `storageQueueAppendPartialSplit()` + for w in req: + discard env.appendPartial(w, splitMerge=true) proc storageQueueAppend*( - env: SnapPivotRef; - reqList: openArray[AccountSlotsHeader]; - subRange = none(NodeTagRange); # For a partially fetched slot + env: SnapPivotRef; # Current pivot environment + req: openArray[AccountSlotsHeader]; # List of entries to push back ) = - for n,w in reqList: - env.parkedStorage.excl w.accKey # Un-park - - # Only last item (when `n+1 == reqList.len`) may be registered partial - if w.subRange.isNone or n + 1 < reqList.len: + ## Append a job list of ranges. This undoes the effect of either function + ## `storageQueueFetchFull()` or `storageQueueFetchPartial()`. + for w in req: + let iv = w.subRange.get(otherwise = FullNodeTagRange) + if iv == FullNodeTagRange: env.storageQueueAppendFull w - else: - env.fetchStorageFull.del w.storageRoot + discard env.appendPartial(w, splitMerge=false) +proc storageQueueAppend*( + env: SnapPivotRef; # Current pivot environment + kvp: StoQuSlotsKVP; # List of entries to push back + ) = + ## Insert back a full administrative queue record. This function is typically + ## used after a record was unlinked vis `storageQueueUnlinkPartialItem()`. 
+ let accKey = kvp.data.accKey + env.parkedStorage.excl accKey # Un-park (if any) + + if kvp.data.slots.isNil: + env.fetchStoragePart.del kvp.key # Sanitise data + discard env.fetchStorageFull.append(kvp.key, kvp.data) + + when extraTraceMessages: + trace logTxt "re-queued full", accKey + else: + env.fetchStorageFull.del kvp.key # Sanitise data + + let rc = env.fetchStoragePart.eq kvp.key + if rc.isErr: + discard env.fetchStoragePart.append(kvp.key, kvp.data) + + when extraTraceMessages: + trace logTxt "re-queued partial", + processed=kvp.data.slots.processed, + unprocessed=kvp.data.slots.unprocessed, accKey + else: + # Merge `processed` ranges + for w in kvp.data.slots.processed.increasing: + discard rc.value.slots.processed.merge w + + # Intersect `unprocessed` ranges + for w in kvp.data.slots.unprocessed.ivItems: + rc.value.slots.unprocessed.reduce w + + when extraTraceMessages: + trace logTxt "re-merged partial", + processed=kvp.data.slots.processed, + unprocessed=kvp.data.slots.unprocessed, accKey + +# ------------------------------------------------------------------------------ +# Public functions, modify/update/remove queue items +# ------------------------------------------------------------------------------ + +proc storageQueueUpdate*( + env: SnapPivotRef; # Current pivot environment + req: openArray[AccountSlotsChanged]; # List of entries to push back + ignore: HashSet[NodeKey]; # Ignore accounts with these keys + ): (int,int) = # Added, removed + ## Similar to `storageQueueAppend()`, this functions appends account header + ## entries back into the storage queues. Different to `storageQueueAppend()`, + ## this function is aware of changes after partial downloads from the network. + ## + ## The function returns the tuple `(added, removed)` reflecting the numbers + ## of changed list items (accumulated for partial and full range lists.) + for w in req: + if w.account.accKey notin ignore: let - (data, hasItem) = env.getOrMakePartial(w.storageRoot, w.accKey) - iv = w.subRange.unsafeGet - - # Register partial range - if subRange.isSome: - # The `subRange` is the original request, `iv` the uncompleted part - let reqRange = subRange.unsafeGet - if not hasItem: - # Re-initialise book keeping - discard data.slots.processed.merge(low(NodeTag),high(NodeTag)) - discard data.slots.processed.reduce reqRange - data.slots.unprocessed.clear() - - # Calculate `reqRange - iv` which are the completed ranges - let temp = NodeTagRangeSet.init() - discard temp.merge reqRange - discard temp.reduce iv - - # Update `processed` ranges by adding `reqRange - iv` - for w in temp.increasing: - discard data.slots.processed.merge w - - # Update `unprocessed` ranges - data.slots.unprocessed.merge reqRange - data.slots.unprocessed.reduce iv - - elif hasItem: - # Restore unfetched request - data.slots.unprocessed.merge iv - + iv = w.account.subRange.get(otherwise = FullNodeTagRange) + jv = w.newRange.get(otherwise = FullNodeTagRange) + if jv != FullNodeTagRange: + # So `jv` is some rest after processing. Typically this entry is + # related to partial range response message that came with a proof. 
+ if env.updatePartial w: + result[0].inc + when extraTraceMessages: + trace logTxt "update/append partial", accKey=w.account.accKey, + iv, jv, nAdded=result[0], nRemoved=result[1] + elif jv == iv: + if env.storageQueueAppendFull w.account: + result[0].inc + #when extraTraceMessages: + # trace logTxt "update/append full", accKey=w.account.accKey, + # nAdded=result[0], nRemoved=result[1]t else: - # Makes no sense with a `leftOver` item - env.storageQueueAppendFull w + if env.reducePartial w.account: + result[1].inc + when extraTraceMessages: + trace logTxt "update/reduce partial", accKey=w.account.accKey, + iv, jv, nAdded=result[0], nRemoved=result[1] # ------------------------------------------------------------------------------ -# Public functions, make/create queue items -# ------------------------------------------------------------------------------ - -proc storageQueueGetOrMakePartial*( - env: SnapPivotRef; - stoRoot: Hash256; - accKey: NodeKey; - ): SnapSlotsQueueItemRef = - ## Create record on `fetchStoragePart` or return existing one - env.getOrMakePartial(stoRoot, accKey)[0] - -proc storageQueueGetOrMakePartial*( - env: SnapPivotRef; - acc: AccountSlotsHeader; - ): SnapSlotsQueueItemRef = - ## Variant of `storageQueueGetOrMakePartial()` - env.getOrMakePartial(acc.storageRoot, acc.accKey)[0] - -# ------------------------------------------------------------------------------ -# Public functions, fetch and remove queue items +# Public functions, fetch/remove queue items # ------------------------------------------------------------------------------ proc storageQueueFetchFull*( ctx: SnapCtxRef; # Global context env: SnapPivotRef; # Current pivot environment - ): (seq[AccountSlotsHeader],int,int) = + ignore: HashSet[NodeKey]; # Ignore accounts with these keys + ): seq[AccountSlotsHeader] = ## Fetch a list of at most `fetchRequestStorageSlotsMax` full work items ## from the batch queue. ## @@ -207,84 +382,118 @@ proc storageQueueFetchFull*( ## number of items moved to the partial queue is returned as third item of ## the return code tuple. ## - var - rcList: seq[AccountSlotsHeader] - nComplete = 0 - nPartial = 0 - noExceptionOops("getNextSlotItemsFull"): for kvp in env.fetchStorageFull.nextPairs: - let - getFn = ctx.pool.snapDb.getStorageSlotsFn kvp.data.accKey - rootKey = kvp.key.to(NodeKey) - accItem = AccountSlotsHeader( - accKey: kvp.data.accKey, - storageRoot: kvp.key) + if kvp.data.accKey notin ignore: + let + getFn = ctx.pool.snapDb.getStorageSlotsFn kvp.data.accKey + rootKey = kvp.key.to(NodeKey) + accItem = AccountSlotsHeader( + accKey: kvp.data.accKey, + storageRoot: kvp.key) - # This item will either be returned, discarded, or moved to the partial - # queue subject for healing. So it will be removed from this queue. - env.fetchStorageFull.del kvp.key # OK to delete current link + # This item will eventuallly be returned, discarded, or moved to the + # partial queue (also subject for healing.) So it will be removed from + # the full range lists queue. + env.fetchStorageFull.del kvp.key # OK to delete current link - # Check whether the tree is fully empty - if rootKey.ByteArray32.getFn.len == 0: - # Collect for return - rcList.add accItem - env.parkedStorage.incl accItem.accKey # Registerd as absent + # Check whether the database trie is empty. Otherwise the sub-trie is + # at least partially allocated. 
+        if rootKey.ByteArray32.getFn.len == 0:
+          # Collect for return
+          result.add accItem
+          env.parkedStorage.incl accItem.accKey # Registered as absent

-        # Maximal number of items to fetch
-        if fetchRequestStorageSlotsMax <= rcList.len:
-          break
-      else:
-        # Check how much there is below the top level storage slots node. For
-        # a small storage trie, this check will be exhaustive.
-        let stats = getFn.hexaryInspectTrie(rootKey,
-          suspendAfter = storageSlotsTrieInheritPerusalMax,
-          maxDangling = 1)
-
-        if stats.dangling.len == 0 and stats.resumeCtx.isNil:
-          # This storage trie could be fully searched and there was no dangling
-          # node. So it is complete and can be fully removed from the batch.
-          nComplete.inc # Update for logging
+          # Maximal number of items to fetch
+          if fetchRequestStorageSlotsMax <= result.len:
+            break # stop here
         else:
-          # This item becomes a partially available slot
-          #let data = env.storageQueueGetOrMakePartial accItem -- notused
-          nPartial.inc # Update for logging
+          # Check how much there is below the top level storage slots node. For
+          # a small storage trie, this check will be exhaustive.
+          let stats = getFn.hexaryInspectTrie(rootKey,
+            suspendAfter = storageSlotsTrieInheritPerusalMax,
+            maxDangling = 1)

-  (rcList, nComplete, nPartial)
-
+          if stats.dangling.len == 0 and stats.resumeCtx.isNil:
+            # This storage trie could be fully searched and there was no
+            # dangling node. So it is complete and can be considered done.
+            # It can be left removed from the batch queue.
+            env.nSlotLists.inc # Update for logging
+          else:
+            # This item must be treated as a partially available slot
+            env.storageQueueAppendPartialSplit accItem

 proc storageQueueFetchPartial*(
-    env: SnapPivotRef;
-      ): Result[AccountSlotsHeader,void] =
+    ctx: SnapCtxRef;                   # Global context (unused here)
+    env: SnapPivotRef;                 # Current pivot environment
+    ignore: HashSet[NodeKey];          # Ignore accounts with these keys
+      ): seq[AccountSlotsHeader] =     # At most one item
   ## Get work item from the batch queue. This will typically return the full
   ## work item and remove it from the queue unless the parially completed
   ## range is fragmented.
-  block findItem:
-    for kvp in env.fetchStoragePart.nextPairs:
-      # Extract range and return single item request queue
-      let rc = kvp.data.slots.unprocessed.fetch(maxLen = high(UInt256))
+  for kvp in env.fetchStoragePart.nextPairs:
+    # Extract range and return single item request queue
+    let
+      slots = kvp.data.slots
+      accKey = kvp.data.accKey
+      accepted = accKey notin ignore
+    if accepted:
+      let rc = slots.unprocessed.fetch()
       if rc.isOk:
-        result = ok(AccountSlotsHeader(
-          accKey:      kvp.data.accKey,
+        let reqItem = AccountSlotsHeader(
+          accKey:      accKey,
           storageRoot: kvp.key,
-          subRange:    some rc.value))
+          subRange:    some rc.value)

-        # Delete from batch queue if the `unprocessed` range set becomes empty
-        # and the `processed` set is the complemet of `rc.value`.
-        if kvp.data.slots.unprocessed.isEmpty and
-           high(UInt256) - rc.value.len <= kvp.data.slots.processed.total:
-          env.fetchStoragePart.del kvp.key
-          env.parkedStorage.incl kvp.data.accKey # Temporarily parked
-          return
+        # Delete from batch queue if the `unprocessed` range has become empty.
+        if slots.unprocessed.isEmpty and
+           high(UInt256) - rc.value.len <= slots.processed.total:
+          # If this is all the rest, the record can be deleted from the todo
+          # list. If not fully downloaded at a later stage, a new record will
+          # be created on-the-fly.
+          env.parkedStorage.incl accKey    # Temporarily parked
+          env.fetchStoragePart.del kvp.key # Last one not needed
         else:
-        # Otherwise rotate queue
-        break findItem
-    # End for()
+          # Otherwise accept and update/rotate queue. Note that `lruFetch`
+          # does leave the item on the queue.
+          discard env.fetchStoragePart.lruFetch reqItem.storageRoot

-  return err()
+        when extraTraceMessages:
+          trace logTxt "fetched partial",
+            processed=slots.processed, unprocessed=slots.unprocessed,
+            accKey, iv=rc.value
+        return @[reqItem] # done

-  # Rotate queue item
-  discard env.fetchStoragePart.lruFetch result.value.storageRoot
+    when extraTraceMessages:
+      trace logTxt "rejected partial", accepted,
+        processed=slots.processed, unprocessed=slots.unprocessed, accKey
+  # End for()
+
+proc storageQueueUnlinkPartialItem*(
+    env: SnapPivotRef;               # Current pivot environment
+    ignore: HashSet[NodeKey];        # Ignore accounts with these keys
+      ): Result[StoQuSlotsKVP,void] =
+  ## Fetch an item from the partial list. This item will be removed from the
+  ## list and can be re-queued via `storageQueueAppend()`.
+  for kvp in env.fetchStoragePart.nextPairs:
+    # Extract range and return single item request queue
+    let
+      accKey = kvp.data.accKey
+      accepted = accKey notin ignore
+    if accepted:
+      env.parkedStorage.incl accKey    # Temporarily parked
+      env.fetchStoragePart.del kvp.key # Last one not needed
+
+      when extraTraceMessages:
+        trace logTxt "unlink partial item", processed=kvp.data.slots.processed,
+          unprocessed=kvp.data.slots.unprocessed, accKey
+      return ok(kvp) # done
+
+    when extraTraceMessages:
+      trace logTxt "unlink partial skip", accepted,
+        processed=kvp.data.slots.processed,
+        unprocessed=kvp.data.slots.unprocessed, accKey
+  # End for()

 # ------------------------------------------------------------------------------
 # End
diff --git a/nimbus/sync/snap/worker/pivot/swap_in.nim b/nimbus/sync/snap/worker/pivot/swap_in.nim
index ded88e374..5cb7d0536 100644
--- a/nimbus/sync/snap/worker/pivot/swap_in.nim
+++ b/nimbus/sync/snap/worker/pivot/swap_in.nim
@@ -72,10 +72,10 @@ proc `$`(node: NodeSpecs): string =
   node.partialPath.toHex
 
 proc `$`(rs: NodeTagRangeSet): string =
-  rs.fullFactor.toPC(3)
+  rs.fullPC3
 
 proc `$`(iv: NodeTagRange): string =
-  iv.fullFactor.toPC(3)
+  iv.fullPC3
 
 proc toPC(w: openArray[NodeSpecs]; n: static[int] = 3): string =
   let sumUp = w.mapIt(it.hexaryEnvelope.len).foldl(a+b, 0.u256)
diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim
index fca2503ff..da8f2b178 100644
--- a/nimbus/sync/snap/worker_desc.nim
+++ b/nimbus/sync/snap/worker_desc.nim
@@ -32,9 +32,6 @@ type
     ## there is only a partial list of slots to fetch, the queue entry is
     ## stored left-most for easy access.
 
-  SnapSlotsQueuePair* = KeyedQueuePair[Hash256,SnapSlotsQueueItemRef]
-    ## Key-value return code from `SnapSlotsQueue` handler
-
   SnapSlotsQueueItemRef* = ref object
     ## Storage slots request data.
This entry is similar to `AccountSlotsHeader` ## where the optional `subRange` interval has been replaced by an interval @@ -71,7 +68,7 @@ type nSlotLists*: uint64 ## Imported # of account storage tries # Mothballing, ready to be swapped into newer pivot record - storageAccounts*: SnapAccountsList ## Accounts with missing stortage slots + storageAccounts*: SnapAccountsList ## Accounts with missing storage slots archived*: bool ## Not latest pivot, anymore SnapPivotTable* = KeyedQueue[Hash256,SnapPivotRef] @@ -142,7 +139,7 @@ proc pivotAccountsCoverage100PcRollOver*(ctx: SnapCtxRef) = # Public helpers: SnapTodoRanges # ------------------------------------------------------------------------------ -proc init*(q: var SnapTodoRanges) = +proc init*(q: var SnapTodoRanges; clear = false) = ## Populate node range sets with maximal range in the first range set. This ## kind of pair or interval sets is managed as follows: ## * As long as possible, fetch and merge back intervals on the first set. @@ -152,7 +149,8 @@ proc init*(q: var SnapTodoRanges) = ## is considered after the prioitised intervals are exhausted. q[0] = NodeTagRangeSet.init() q[1] = NodeTagRangeSet.init() - discard q[0].merge(low(NodeTag),high(NodeTag)) + if not clear: + discard q[0].merge FullNodeTagRange proc clear*(q: var SnapTodoRanges) = ## Reset argument range sets empty. @@ -167,8 +165,12 @@ proc merge*(q: var SnapTodoRanges; iv: NodeTagRange) = proc mergeSplit*(q: var SnapTodoRanges; iv: NodeTagRange) = ## Ditto w/priorities partially reversed - if 1 < iv.len: + if iv.len == 1: + discard q[0].reduce iv + discard q[1].merge iv + else: let + # note that (`iv.len` == 0) => (`iv` == `FullNodeTagRange`) midPt = iv.minPt + ((iv.maxPt - iv.minPt) shr 1) iv1 = NodeTagRange.new(iv.minPt, midPt) iv2 = NodeTagRange.new(midPt + 1.u256, iv.maxPt) @@ -176,9 +178,6 @@ proc mergeSplit*(q: var SnapTodoRanges; iv: NodeTagRange) = discard q[1].merge iv1 discard q[0].merge iv2 discard q[1].reduce iv2 - else: - discard q[0].reduce iv - discard q[1].merge iv proc reduce*(q: var SnapTodoRanges; iv: NodeTagRange) = @@ -194,8 +193,9 @@ iterator ivItems*(q: var SnapTodoRanges): NodeTagRange = yield iv -proc fetch*(q: var SnapTodoRanges; maxLen: UInt256): Result[NodeTagRange,void] = - ## Fetch interval from node ranges with maximal size `maxLen` +proc fetch*(q: var SnapTodoRanges; maxLen = 0.u256): Result[NodeTagRange,void] = + ## Fetch interval from node ranges with maximal size `maxLen`, where + ## `0.u256` is interpreted as `2^256`. 
   # Swap batch queues if the first one is empty
   if q[0].isEmpty:
@@ -207,9 +207,17 @@ proc fetch*(q: var SnapTodoRanges; maxLen: UInt256): Result[NodeTagRange,void] =
     return err()
 
   let
-    val = rc.value
-    iv = if 0 < val.len and val.len <= maxLen: val # val.len==0 => 2^256
-         else: NodeTagRange.new(val.minPt, val.minPt + (maxLen - 1.u256))
+    jv = rc.value
+    iv = block:
+      if maxLen == 0 or (0 < jv.len and jv.len <= maxLen):
+        jv
+      else:
+        # Note that either:
+        # (`jv.len` == 0)   => (`jv` == `FullNodeTagRange`) => `jv.minPt` == 0
+        # or
+        # (`maxLen` < `jv.len`) => (`jv.minPt`+`maxLen` <= `jv.maxPt`)
+        NodeTagRange.new(jv.minPt, jv.minPt + maxLen)
+
   discard q[0].reduce(iv)
   ok(iv)
 
diff --git a/nimbus/sync/sync_desc.nim b/nimbus/sync/sync_desc.nim
index cd42ea11e..62a4df243 100644
--- a/nimbus/sync/sync_desc.nim
+++ b/nimbus/sync/sync_desc.nim
@@ -13,19 +13,19 @@
 ##
 ## Public descriptors
 
+{.push raises: [].}
+
 import
   #std/options,
   eth/[common, p2p],
   ../core/chain,
   ../db/db_chain,
-  ./handlers
+  ./handlers/eth
 
 export
   chain,
   db_chain
 
-{.push raises: [].}
-
 type
   BuddyRunState* = enum
     Running = 0             ## Running, default state
@@ -121,6 +121,13 @@ proc `stopped=`*(ctrl: BuddyCtrlRef; value: bool) =
   else:
     discard
 
+proc `forceRun=`*(ctrl: BuddyCtrlRef; value: bool) =
+  ## Setter, gets out of `Zombie` jail/locked state with a `true` argument.
+  if value:
+    ctrl.runState = Running
+  else:
+    ctrl.stopped = true
+
 # ------------------------------------------------------------------------------
 # End
 # ------------------------------------------------------------------------------
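
Reviewer note on the `SnapTodoRanges` changes above (`init()` gaining a `clear` flag, `fetch()` treating the default `maxLen = 0` as 2^256, and `mergeSplit()` reversing priorities for the two halves of an interval): the sketch below illustrates the intended interplay of these helpers. It is not part of the patch; the import paths and the literal clamp size are assumptions made for illustration only.

# Minimal sketch, not part of the patch -- import paths are assumed.
import
  stew/results, stint,
  ../nimbus/sync/snap/range_desc,
  ../nimbus/sync/snap/worker_desc

var q: SnapTodoRanges
q.init()                   # q[0] primed with the full node tag range, q[1] empty

# Default `maxLen = 0` means no clamping: the whole prioritised interval
# is returned and removed from q[0].
let rc = q.fetch()
doAssert rc.isOk

# Re-queue with priorities partially reversed: the lower half of the
# interval is parked on the de-prioritised set q[1], the upper half goes
# back onto q[0].
q.mergeSplit rc.value

# The next fetch serves the upper half first (clamped to an arbitrary size
# here); the lower half is only considered once q[0] has been exhausted.
let next = q.fetch(maxLen = 1000.u256)
doAssert next.isOk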