diff --git a/nimbus/db/storage_types.nim b/nimbus/db/storage_types.nim index 1484bdbda..b94a07dcf 100644 --- a/nimbus/db/storage_types.nim +++ b/nimbus/db/storage_types.nim @@ -19,6 +19,8 @@ type skeletonBlockHashToNumber skeletonBlock skeletonTransaction + snapSyncAccount + snapSyncStorageSlot DbKey* = object # The first byte stores the key type. The rest are key-specific values @@ -104,6 +106,18 @@ proc skeletonTransactionKey*(u: BlockNumber): DbKey {.inline.} = copyMem(addr result.data[1], unsafeAddr u, sizeof(u)) result.dataEndPos = uint8 sizeof(u) +proc snapSyncAccountKey*(h: openArray[byte]): DbKey {.inline.} = + doAssert(h.len == 32) + result.data[0] = byte ord(snapSyncAccount) + result.data[1 .. 32] = h + result.dataEndPos = uint8 sizeof(h) + +proc snapSyncStorageSlotKey*(h: openArray[byte]): DbKey {.inline.} = + doAssert(h.len == 32) + result.data[0] = byte ord(snapSyncStorageSlot) + result.data[1 .. 32] = h + result.dataEndPos = uint8 sizeof(h) + template toOpenArray*(k: DbKey): openArray[byte] = k.data.toOpenArray(0, int(k.dataEndPos)) diff --git a/nimbus/sync/snap/constants.nim b/nimbus/sync/snap/constants.nim index ff1a97d74..8153ef888 100644 --- a/nimbus/sync/snap/constants.nim +++ b/nimbus/sync/snap/constants.nim @@ -51,9 +51,18 @@ const ## Keap on gloing in healing task up until this many nodes have been ## fetched from the network or some error contition therminates the task. + snapNewBuddyStoragesSlotsQuPrioThresh* = 5_000 + ## For a new worker, prioritise processing the storage slots queue over + ## processing accounts if the queue has more than this many items. + + snapAccountsBuddyStoragesSlotsQuPrioThresh* = 30_000 + ## For a running worker processing accounts, stop processing accounts + ## and switch to processing the storage slots queue if the queue has + ## more than this many items. + # -------------- - healAccountsTrigger* = 0.95 + healAccountsTrigger* = 0.99 ## Apply accounts healing if the global snap download coverage factor ## exceeds this setting. The global coverage factor is derived by merging ## all account ranges retrieved for all pivot state roots (see @@ -76,7 +85,7 @@ const # -------------- - comErrorsTimeoutMax* = 4 + comErrorsTimeoutMax* = 3 ## Maximal number of non-resonses accepted in a row. If there are more than ## `comErrorsTimeoutMax` consecutive errors, the worker will be degraded ## as zombie. diff --git a/nimbus/sync/snap/range_desc.nim b/nimbus/sync/snap/range_desc.nim index 411ca3eaa..72cb08976 100644 --- a/nimbus/sync/snap/range_desc.nim +++ b/nimbus/sync/snap/range_desc.nim @@ -9,8 +9,8 @@ # distributed except according to those terms. import - std/[math, sequtils, hashes], - eth/common/eth_types_rlp, + std/[math, sequtils, strutils, hashes], + eth/[common, trie/nibbles], stew/[byteutils, interval_set], stint, ../../constants, @@ -72,15 +72,37 @@ type AccountStorageRange* = object ## List of storage descriptors, the last `AccountSlots` storage data might - ## be incomplete and tthe `proof` is needed for proving validity. + ## be incomplete and the `proof` is needed for proving validity. 
storages*: seq[AccountSlots] ## List of accounts and storage data proof*: SnapStorageProof ## Boundary proofs for last entry + base*: NodeTag ## Lower limit for last entry w/proof AccountSlots* = object ## Account storage descriptor account*: AccountSlotsHeader data*: seq[SnapStorage] +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc padPartialPath(partialPath: NibblesSeq; dblNibble: byte): NodeKey = + ## Extend (or cut) `partialPath` nibbles sequence and generate `NodeKey` + # Pad with zeroes + var padded: NibblesSeq + + let padLen = 64 - partialPath.len + if 0 <= padLen: + padded = partialPath & dblNibble.repeat(padlen div 2).initNibbleRange + if (padLen and 1) == 1: + padded = padded & @[dblNibble].initNibbleRange.slice(1) + else: + let nope = seq[byte].default.initNibbleRange + padded = partialPath.slice(0,63) & nope # nope forces re-alignment + + let bytes = padded.getBytes + (addr result.ByteArray32[0]).copyMem(unsafeAddr bytes[0], bytes.len) + # ------------------------------------------------------------------------------ # Public helpers # ------------------------------------------------------------------------------ @@ -117,6 +139,11 @@ proc to*(n: SomeUnsignedInt|UInt256; T: type NodeTag): T = ## Syntactic sugar n.u256.T +proc min*(partialPath: Blob; T: type NodeKey): T = + (hexPrefixDecode partialPath)[1].padPartialPath(0) + +proc max*(partialPath: Blob; T: type NodeKey): T = + (hexPrefixDecode partialPath)[1].padPartialPath(0xff) proc digestTo*(data: Blob; T: type NodeKey): T = keccakHash(data).data.T @@ -273,7 +300,7 @@ proc fullFactor*(lrs: openArray[NodeTagRangeSet]): float = proc `$`*(nodeTag: NodeTag): string = if nodeTag == high(NodeTag): - "high(NodeTag)" + "2^256-1" elif nodeTag == 0.u256.NodeTag: "0" else: @@ -317,6 +344,61 @@ proc `$`*(n: NodeSpecs): string = result &= "ditto" result &= ")" +proc dump*( + ranges: openArray[NodeTagRangeSet]; + moan: proc(overlap: UInt256; iv: NodeTagRange) {.gcsafe.}; + printRangesMax = high(int); + ): string = + ## Dump/anlalyse range sets + var + cache: NodeTagRangeSet + ivTotal = 0.u256 + ivCarry = false + + if ranges.len == 1: + cache = ranges[0] + ivTotal = cache.total + if ivTotal == 0.u256 and 0 < cache.chunks: + ivCarry = true + else: + cache = NodeTagRangeSet.init() + for ivSet in ranges: + if ivSet.total == 0.u256 and 0 < ivSet.chunks: + ivCarry = true + elif ivTotal <= high(UInt256) - ivSet.total: + ivTotal += ivSet.total + else: + ivCarry = true + for iv in ivSet.increasing(): + let n = cache.merge(iv) + if n != iv.len and not moan.isNil: + moan(iv.len - n, iv) + + if 0 == cache.total and 0 < cache.chunks: + result = "2^256" + if not ivCarry: + result &= ":" & $ivTotal + else: + result = $cache.total + if ivCarry: + result &= ":2^256" + elif ivTotal != cache.total: + result &= ":" & $ivTotal + + result &= ":" + if cache.chunks <= printRangesMax: + result &= toSeq(cache.increasing).mapIt($it).join(",") + else: + result &= toSeq(cache.increasing).mapIt($it)[0 ..< printRangesMax].join(",") + result &= " " & $(cache.chunks - printRangesMax) & " more .." 
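# ---------------------------------------------------------------------------
# Illustration: a minimal, self-contained sketch of the padding idea used by
# `padPartialPath()` and the `min`/`max` helpers above -- a partial path is
# extended to a full 64-nibble (32 byte) key, filling with 0x0 for the lower
# bound and 0xf for the upper bound. Plain byte sequences stand in for
# `NibblesSeq`, and the helper name `padNibbles` is made up for this example.

proc padNibbles(partial: seq[byte]; fill: byte): array[32, byte] =
  ## `partial` holds one nibble (0..15) per byte.
  var nibbles = partial
  while nibbles.len < 64:               # extend to 64 nibbles ...
    nibbles.add fill
  nibbles.setLen(64)                    # ... or cut if it was longer
  for i in 0 ..< 32:                    # pack two nibbles per key byte
    result[i] = (nibbles[2 * i] shl 4) or nibbles[2 * i + 1]

when isMainModule:
  # the partial path [3,a] covers the key range 0x3a00..00 .. 0x3aff..ff
  doAssert padNibbles(@[3'u8, 0xa], 0x0)[0] == 0x3a
  doAssert padNibbles(@[3'u8, 0xa], 0x0)[31] == 0x00
  doAssert padNibbles(@[3'u8, 0xa], 0xf)[31] == 0xff
# ---------------------------------------------------------------------------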
+ +proc dump*( + range: NodeTagRangeSet; + printRangesMax = high(int); + ): string = + ## Ditto + [range].dump(nil, printRangesMax) + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 55420553d..e0ac9709a 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -77,35 +77,38 @@ proc `pivot=`(buddy: SnapBuddyRef; val: BestPivotWorkerRef) = proc init(batch: var SnapTrieRangeBatch; ctx: SnapCtxRef) = ## Returns a pair of account hash range lists with the full range of hashes ## smartly spread across the mutually disjunct interval sets. - for n in 0 ..< batch.unprocessed.len: - batch.unprocessed[n] = NodeTagRangeSet.init() + batch.unprocessed.init() # Initialise accounts range fetch batch, the pair of `fetchAccounts[]` # range sets. - if ctx.data.coveredAccounts.total == 0 and - ctx.data.coveredAccounts.chunks == 1: - # All (i.e. 100%) of accounts hashes are covered by completed range fetch - # processes for all pivot environments. Do a random split distributing the - # full accounts hash range across the pair of range sats. - var nodeKey: NodeKey - ctx.data.rng[].generate(nodeKey.ByteArray32) - - let partition = nodeKey.to(NodeTag) - discard batch.unprocessed[0].merge(partition, high(NodeTag)) - if low(NodeTag) < partition: - discard batch.unprocessed[1].merge(low(NodeTag), partition - 1.u256) + if ctx.data.coveredAccounts.isFull: + # All of accounts hashes are covered by completed range fetch processes + # for all pivot environments. Do a random split distributing the full + # accounts hash range across the pair of range sets. + for _ in 0 .. 5: + var nodeKey: NodeKey + ctx.data.rng[].generate(nodeKey.ByteArray32) + let top = nodeKey.to(NodeTag) + if low(NodeTag) < top and top < high(NodeTag): + # Move covered account ranges (aka intervals) to the second set. + batch.unprocessed.merge NodeTagRange.new(low(NodeTag), top) + break + # Otherwise there is a full single range in `unprocessed[0]` else: # Not all account hashes are covered, yet. So keep the uncovered # account hashes in the first range set, and the other account hashes # in the second range set. - - # Pre-filled with the first range set with largest possible interval - discard batch.unprocessed[0].merge(low(NodeTag),high(NodeTag)) - - # Move covered account ranges (aka intervals) to the second set. for iv in ctx.data.coveredAccounts.increasing: - discard batch.unprocessed[0].reduce(iv) - discard batch.unprocessed[1].merge(iv) + # Move covered account ranges (aka intervals) to the second set. 
+ batch.unprocessed.merge(iv) + + if batch.unprocessed[0].isEmpty: + doAssert batch.unprocessed[1].isFull + elif batch.unprocessed[1].isEmpty: + doAssert batch.unprocessed[0].isFull + else: + doAssert((batch.unprocessed[0].total - 1) + + batch.unprocessed[1].total == high(UInt256)) proc appendPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) = @@ -198,7 +201,8 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = pivotBlock = if env.isNil: none(BlockNumber) else: some(env.stateHeader.blockNumber) stoQuLen = if env.isNil: none(uint64) - else: some(env.fetchStorage.len.uint64) + else: some(env.fetchStorageFull.len.uint64 + + env.fetchStoragePart.len.uint64) accCoverage = ctx.data.coveredAccounts.fullFactor accFill = meanStdDev(uSum, uSqSum, count) @@ -329,7 +333,8 @@ proc runPool*(buddy: SnapBuddyRef, last: bool) = env.accountsState = HealerDone # Check whether storage slots are complete - if env.fetchStorage.len == 0: + if env.fetchStorageFull.len == 0 and + env.fetchStoragePart.len == 0: env.storageDone = true if extraTraceMessages: @@ -389,12 +394,19 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = block: let rc = ctx.data.pivotTable.beforeLastValue if rc.isOk: - let nFetchStorage = rc.value.fetchStorage.len + let nFetchStorage = + rc.value.fetchStorageFull.len + rc.value.fetchStoragePart.len if 0 < nFetchStorage: trace "Cleaning up previous pivot", peer, pivot, nFetchStorage - rc.value.fetchStorage.clear() - rc.value.fetchAccounts.checkNodes.setLen(0) - rc.value.fetchAccounts.missingNodes.setLen(0) + rc.value.fetchStorageFull.clear() + rc.value.fetchStoragePart.clear() + rc.value.fetchAccounts.checkNodes.setLen(0) + rc.value.fetchAccounts.missingNodes.setLen(0) + + # Clean up storage slots queue first it it becomes too large + let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len + if snapNewBuddyStoragesSlotsQuPrioThresh < nStoQu: + runAsync buddy.rangeFetchStorageSlots() if env.accountsState != HealerDone: runAsync buddy.rangeFetchAccounts() @@ -412,10 +424,6 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = runAsync buddy.healStorageSlots() - # Debugging log: analyse pivot against database - discard buddy.checkAccountsListOk(env) - discard buddy.checkStorageSlotsTrieIsComplete(env) - # Check whether there are more accounts to fetch. # # Note that some other process might have temporarily borrowed from the @@ -423,11 +431,20 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = # if only a single buddy is active. S be it. if env.fetchAccounts.unprocessed.isEmpty(): + # Debugging log: analyse pivot against database + warn "Analysing accounts database -- might be slow", peer, pivot + discard buddy.checkAccountsListOk(env) + # Check whether pivot download is complete. 
- if env.fetchStorage.len == 0: + if env.fetchStorageFull.len == 0 and + env.fetchStoragePart.len == 0: trace "Running pool mode for verifying completeness", peer, pivot buddy.ctx.poolMode = true + # Debugging log: analyse pivot against database + warn "Analysing storage slots database -- might be slow", peer, pivot + discard buddy.checkStorageSlotsTrieIsComplete(env) + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/com/com_error.nim b/nimbus/sync/snap/worker/com/com_error.nim index b338561e1..0db41cf76 100644 --- a/nimbus/sync/snap/worker/com/com_error.nim +++ b/nimbus/sync/snap/worker/com/com_error.nim @@ -28,11 +28,9 @@ type ComEmptyAccountsArguments ComEmptyPartialRange ComEmptyRequestArguments - ComMissingProof ComNetworkProblem ComNoAccountsForStateRoot ComNoByteCodesAvailable - ComNoDataForProof #ComNoHeaderAvailable -- unused, see get_block_header.nim ComNoStorageForAccounts ComNoTrieNodesAvailable @@ -91,8 +89,7 @@ proc stopAfterSeriousComError*( # Otherwise try again some time later. await sleepAsync(comErrorsNoDataSleepMSecs.milliseconds) - of ComMissingProof, - ComAccountsMinTooSmall, + of ComAccountsMinTooSmall, ComAccountsMaxTooLarge, ComTooManyByteCodes, ComTooManyHeaders, @@ -105,7 +102,6 @@ proc stopAfterSeriousComError*( of ComEmptyAccountsArguments, ComEmptyRequestArguments, ComEmptyPartialRange, - ComNoDataForProof, ComNothingSerious: discard diff --git a/nimbus/sync/snap/worker/com/get_account_range.nim b/nimbus/sync/snap/worker/com/get_account_range.nim index c553610b3..581a7cf66 100644 --- a/nimbus/sync/snap/worker/com/get_account_range.nim +++ b/nimbus/sync/snap/worker/com/get_account_range.nim @@ -28,7 +28,6 @@ logScope: type GetAccountRange* = object - consumed*: NodeTagRange ## Real accounts interval covered data*: PackedAccountRange ## Re-packed reply data withStorage*: seq[AccountSlotsHeader] ## Accounts with non-idle storage root @@ -79,7 +78,6 @@ proc getAccountRange*( return err(ComResponseTimeout) let snAccRange = rc.value.get GetAccountRange( - consumed: iv, data: PackedAccountRange( proof: snAccRange.proof, accounts: snAccRange.accounts @@ -114,31 +112,15 @@ proc getAccountRange*( nAccounts, nProof, accRange="n/a", reqRange=iv return err(ComNoAccountsForStateRoot) - # So there is no data, otherwise an account beyond the interval end - # `iv.maxPt` would have been returned. - dd.consumed = NodeTagRange.new(iv.minPt, high(NodeTag)) - trace trSnapRecvReceived & "terminal AccountRange", peer, pivot, - nAccounts, nProof, accRange=dd.consumed, reqRange=iv + # So there is no data and a proof. + trace trSnapRecvReceived & "terminal AccountRange", peer, pivot, nAccounts, + nProof, accRange=NodeTagRange.new(iv.minPt, high(NodeTag)), reqRange=iv return ok(dd) let (accMinPt, accMaxPt) = ( dd.data.accounts[0].accKey.to(NodeTag), dd.data.accounts[^1].accKey.to(NodeTag)) - if nProof == 0: - # github.com/ethereum/devp2p/blob/master/caps/snap.md#accountrange-0x01 - # Notes: - # * If the account range is the entire state (requested origin was 0x00..0 - # and all accounts fit into the response), no proofs should be sent along - # the response. This is unlikely for accounts, but since it's a common - # situation for storage slots, this clause keeps the behavior the same - # across both. 
- if 0.to(NodeTag) < iv.minPt: - trace trSnapRecvProtocolViolation & "proof-less AccountRange", peer, - pivot, nAccounts, nProof, accRange=NodeTagRange.new(iv.minPt, accMaxPt), - reqRange=iv - return err(ComMissingProof) - if accMinPt < iv.minPt: # Not allowed trace trSnapRecvProtocolViolation & "min too small in AccountRange", peer, @@ -157,15 +139,14 @@ proc getAccountRange*( # Geth always seems to allow the last account to be larger than the # limit (seen with Geth/v1.10.18-unstable-4b309c70-20220517.) if iv.maxPt < dd.data.accounts[^2].accKey.to(NodeTag): - # The segcond largest should not excceed the top one requested. + # The second largest should not excceed the top one requested. trace trSnapRecvProtocolViolation & "AccountRange top exceeded", peer, pivot, nAccounts, nProof, accRange=NodeTagRange.new(iv.minPt, accMaxPt), reqRange=iv return err(ComAccountsMaxTooLarge) - dd.consumed = NodeTagRange.new(iv.minPt, max(iv.maxPt,accMaxPt)) - trace trSnapRecvReceived & "AccountRange", peer, pivot, - nAccounts, nProof, accRange=dd.consumed, reqRange=iv + trace trSnapRecvReceived & "AccountRange", peer, pivot, nAccounts, nProof, + accRange=NodeTagRange.new(accMinPt, accMaxPt), reqRange=iv return ok(dd) diff --git a/nimbus/sync/snap/worker/com/get_storage_ranges.nim b/nimbus/sync/snap/worker/com/get_storage_ranges.nim index 825254eca..1302a9d31 100644 --- a/nimbus/sync/snap/worker/com/get_storage_ranges.nim +++ b/nimbus/sync/snap/worker/com/get_storage_ranges.nim @@ -104,19 +104,21 @@ proc getStorageRanges*( trace trSnapSendSending & "GetStorageRanges", peer, pivot, nAccounts, bytesLimit=snapRequestBytesLimit - let snStoRanges = block: - let rc = await buddy.getStorageRangesReq(stateRoot, - accounts.mapIt(it.accKey.to(Hash256)), accounts[0].subRange, pivot) - if rc.isErr: - return err(ComNetworkProblem) - if rc.value.isNone: - trace trSnapRecvTimeoutWaiting & "for StorageRanges", peer, pivot, - nAccounts - return err(ComResponseTimeout) - if nAccounts < rc.value.get.slotLists.len: - # Ooops, makes no sense - return err(ComTooManyStorageSlots) - rc.value.get + let + iv = accounts[0].subRange + snStoRanges = block: + let rc = await buddy.getStorageRangesReq(stateRoot, + accounts.mapIt(it.accKey.to(Hash256)), iv, pivot) + if rc.isErr: + return err(ComNetworkProblem) + if rc.value.isNone: + trace trSnapRecvTimeoutWaiting & "for StorageRanges", peer, pivot, + nAccounts + return err(ComResponseTimeout) + if nAccounts < rc.value.get.slotLists.len: + # Ooops, makes no sense + return err(ComTooManyStorageSlots) + rc.value.get let nSlotLists = snStoRanges.slotLists.len @@ -138,6 +140,10 @@ proc getStorageRanges*( # Assemble return structure for given peer response var dd = GetStorageRanges(data: AccountStorageRange(proof: snStoRanges.proof)) + # Set the left proof boundary (if any) + if 0 < nProof and iv.isSome: + dd.data.base = iv.unsafeGet.minPt + # Filter remaining `slots` responses: # * Accounts for empty ones go back to the `leftOver` list. for n in 0 ..< nSlotLists: @@ -154,23 +160,23 @@ proc getStorageRanges*( # assigning empty slice is ok dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts] - # Ok, we have a proof now. What was it to be proved? - elif snStoRanges.slotLists[^1].len == 0: - return err(ComNoDataForProof) # Now way to prove an empty node set - else: - # If the storage data for the last account comes with a proof, then the data - # set is incomplete. So record the missing part on the `dd.leftOver` list. 
- let - reqTop = if accounts[0].subRange.isNone: high(NodeTag) - else: accounts[0].subRange.unsafeGet.maxPt - respTop = dd.data.storages[^1].data[^1].slotHash.to(NodeTag) - if respTop < reqTop: - dd.leftOver.add AccountSlotsHeader( - subRange: some(NodeTagRange.new(respTop + 1.u256, reqTop)), - accKey: accounts[nSlotLists-1].accKey, - storageRoot: accounts[nSlotLists-1].storageRoot) - # assigning empty slice isa ok + # Ok, we have a proof now + if 0 < snStoRanges.slotLists[^1].len: + # If the storage data for the last account comes with a proof, then the + # data set is incomplete. So record the missing part on the `dd.leftOver` + # list. + let + reqTop = if accounts[0].subRange.isNone: high(NodeTag) + else: accounts[0].subRange.unsafeGet.maxPt + respTop = dd.data.storages[^1].data[^1].slotHash.to(NodeTag) + if respTop < reqTop: + dd.leftOver.add AccountSlotsHeader( + subRange: some(NodeTagRange.new(respTop + 1.u256, reqTop)), + accKey: accounts[nSlotLists-1].accKey, + storageRoot: accounts[nSlotLists-1].storageRoot) + + # Do thew rest (assigning empty slice is ok) dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts] trace trSnapRecvReceived & "StorageRanges", peer, pivot, nAccounts, diff --git a/nimbus/sync/snap/worker/db/hexary_desc.nim b/nimbus/sync/snap/worker/db/hexary_desc.nim index 7b4be440c..ce60e403e 100644 --- a/nimbus/sync/snap/worker/db/hexary_desc.nim +++ b/nimbus/sync/snap/worker/db/hexary_desc.nim @@ -155,6 +155,7 @@ type ## Return code for single node operations slot*: Option[int] ## May refer to indexed argument slots kind*: Option[NodeKind] ## Node type (if any) + dangling*: seq[NodeSpecs] ## Missing inner sub-tries error*: HexaryDbError ## Error code, or `NothingSerious` const diff --git a/nimbus/sync/snap/worker/db/hexary_error.nim b/nimbus/sync/snap/worker/db/hexary_error.nim index 790756c9e..034048e59 100644 --- a/nimbus/sync/snap/worker/db/hexary_error.nim +++ b/nimbus/sync/snap/worker/db/hexary_error.nim @@ -13,9 +13,10 @@ type NothingSerious = 0 AccountNotFound - AccountSmallerThanBase AccountsNotSrictlyIncreasing AccountRangesOverlap + LowerBoundAfterFirstEntry + LowerBoundProofError NodeNotFound RlpEncoding SlotsNotSrictlyIncreasing diff --git a/nimbus/sync/snap/worker/db/hexary_paths.nim b/nimbus/sync/snap/worker/db/hexary_paths.nim index 19cdedd5f..c11abeba7 100644 --- a/nimbus/sync/snap/worker/db/hexary_paths.nim +++ b/nimbus/sync/snap/worker/db/hexary_paths.nim @@ -29,7 +29,7 @@ proc pp(w: Blob; db: HexaryTreeDbRef): string = # Private helpers # ------------------------------------------------------------------------------ -proc getNibblesImpl(path: XPath; start = 0): NibblesSeq = +proc getNibblesImpl(path: XPath|RPath; start = 0): NibblesSeq = ## Re-build the key path for n in start ..< path.path.len: let it = path.path[n] @@ -63,16 +63,26 @@ proc toExtensionNode( {.gcsafe, raises: [Defect,RlpError]} = XNodeObj(kind: Extension, ePfx: pSegm, eLink: rlp.listElem(1).toBytes) -# not now ... -when false: - proc `[]`(path: XPath; n: int): XPathStep = - path.path[n] - proc `[]`(path: XPath; s: Slice[int]): XPath = - XPath(path: path.path[s.a .. 
s.b], tail: path.getNibbles(s.b+1)) +proc `<=`(a, b: NibblesSeq): bool = + ## Compare nibbles, different lengths are padded to the right with zeros + let abMin = min(a.len, b.len) + for n in 0 ..< abMin: + if a[n] < b[n]: + return true + if b[n] < a[n]: + return false + # otherwise a[n] == b[n] - proc len(path: XPath): int = - path.path.len + # Assuming zero for missing entries + if b.len < a.len: + for n in abMin + 1 ..< a.len: + if 0 < a[n]: + return false + true + +proc `<`(a, b: NibblesSeq): bool = + not (b <= a) # ------------------------------------------------------------------------------ # Private functions @@ -173,6 +183,48 @@ proc pathExtend( # notreached +proc completeLeast( + path: RPath; + key: RepairKey; + db: HexaryTreeDbRef; + pathLenMax = 64; + ): RPath + {.gcsafe, raises: [Defect,KeyError].} = + ## Extend path using least nodes without recursion. + result.path = path.path + if db.tab.hasKey(key): + var + key = key + node = db.tab[key] + + while result.path.len < pathLenMax: + case node.kind: + of Leaf: + result.path.add RPathStep(key: key, node: node, nibble: -1) + return # done + + of Extension: + block useExtensionLink: + let newKey = node.eLink + if not newkey.isZero and db.tab.hasKey(newKey): + result.path.add RPathStep(key: key, node: node, nibble: -1) + key = newKey + node = db.tab[key] + break useExtensionLink + return # Oops, no way + + of Branch: + block findBranchLink: + for inx in 0 .. 15: + let newKey = node.bLink[inx] + if not newkey.isZero and db.tab.hasKey(newKey): + result.path.add RPathStep(key: key, node: node, nibble: inx.int8) + key = newKey + node = db.tab[key] + break findBranchLink + return # Oops, no way + + proc pathLeast( path: XPath; key: Blob; @@ -357,7 +409,7 @@ proc pathMost( # Public helpers # ------------------------------------------------------------------------------ -proc getNibbles*(path: XPath; start = 0): NibblesSeq = +proc getNibbles*(path: XPath|RPath; start = 0): NibblesSeq = ## Re-build the key path path.getNibblesImpl(start) @@ -434,6 +486,117 @@ proc hexaryPath*( ## Variant of `hexaryPath`. XPath(tail: partialPath).pathExtend(root.to(Blob), getFn) + +proc right*( + path: RPath; + db: HexaryTreeDbRef; + ): RPath + {.gcsafe, raises: [Defect,KeyError]} = + ## Extends the maximally extended argument nodes `path` to the right (with + ## path value not decreasing). This is similar to `next()`, only that the + ## algorithm does not backtrack if there are dangling links in between. + ## + ## This code is intended be used for verifying a left-bound proof. 
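# ---------------------------------------------------------------------------
# Illustration: the `<=` comparison above treats nibble sequences of unequal
# length as if the shorter one were right-padded with zero nibbles, so that
# e.g. [3] <= [3,0] while [3,1] is greater than [3]. Below is a minimal sketch
# of that ordering with plain byte sequences (one nibble per byte); the helper
# name `lePadded` is made up for this example.

proc lePadded(a, b: seq[byte]): bool =
  for n in 0 ..< min(a.len, b.len):
    if a[n] < b[n]: return true
    if b[n] < a[n]: return false
  # equal on the common prefix: `a` can only be larger if it has a non-zero
  # nibble beyond the end of `b`
  for n in b.len ..< a.len:
    if 0'u8 < a[n]: return false
  true

when isMainModule:
  doAssert lePadded(@[3'u8], @[3'u8, 0])
  doAssert not lePadded(@[3'u8, 1], @[3'u8])
# ---------------------------------------------------------------------------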
+ + # Some easy cases + if path.path.len == 0: + return RPath() # error + if path.path[^1].node.kind == Leaf: + return path + + var rPath = path + while 0 < rPath.path.len: + let top = rPath.path[^1] + if top.node.kind != Branch or + top.nibble < 0 or + rPath.tail.len == 0: + return RPath() # error + + let topLink = top.node.bLink[top.nibble] + if topLink.isZero or not db.tab.hasKey(topLink): + return RPath() # error + + let nextNibble = rPath.tail[0].int8 + if nextNibble < 15: + let + nextNode = db.tab[topLink] + rPathLen = rPath.path.len # in case of backtracking + case nextNode.kind + of Leaf: + if rPath.tail <= nextNode.lPfx: + return rPath.completeLeast(topLink, db) + of Extension: + if rPath.tail <= nextNode.ePfx: + return rPath.completeLeast(topLink, db) + of Branch: + # Step down and complete with a branch link on the child node + rPath.path = rPath.path & RPathStep( + key: topLink, + node: nextNode, + nibble: nextNibble) + + # Find the next item to the right of the new top entry + let step = rPath.path[^1] + for inx in (step.nibble + 1) .. 15: + let link = step.node.bLink[inx] + if not link.isZero: + rPath.path[^1].nibble = inx.int8 + return rPath.completeLeast(link, db) + + # Restore `rPath` and backtrack + rPath.path.setLen(rPathLen) + + # Pop `Branch` node on top and append nibble to `tail` + rPath.tail = @[top.nibble.byte].initNibbleRange.slice(1) & rPath.tail + rPath.path.setLen(rPath.path.len - 1) + + # Pathological case: nfffff.. for n < f + var step = path.path[0] + for inx in (step.nibble + 1) .. 15: + let link = step.node.bLink[inx] + if not link.isZero: + step.nibble = inx.int8 + rPath.path = @[step] + return rPath.completeLeast(link, db) + + RPath() # error + + +proc rightStop*( + path: RPath; + db: HexaryTreeDbRef; + ): bool + {.gcsafe, raises: [Defect,KeyError]} = + ## Returns `true` if the maximally extended argument nodes `path` is the + ## rightmost on the hexary trie database. It verifies that there is no more + ## leaf entry to the right of the argument `path`. + ## + ## This code is intended be used for verifying a left-bound proof. + if 0 < path.path.len and 0 < path.tail.len: + let top = path.path[^1] + if top.node.kind == Branch and 0 <= top.nibble: + + let topLink = top.node.bLink[top.nibble] + if not topLink.isZero and db.tab.hasKey(topLink): + let + nextNibble = path.tail[0] + nextNode = db.tab[topLink] + + case nextNode.kind + of Leaf: + return nextNode.lPfx < path.tail + + of Extension: + return nextNode.ePfx < path.tail + + of Branch: + # Step down and verify that there is no branch link + for inx in nextNibble .. 15: + if not nextNode.bLink[inx].isZero: + return false + return true + + proc next*( path: XPath; getFn: HexaryGetFn; diff --git a/nimbus/sync/snap/worker/db/snapdb_accounts.nim b/nimbus/sync/snap/worker/db/snapdb_accounts.nim index f71823e85..12348e84d 100644 --- a/nimbus/sync/snap/worker/db/snapdb_accounts.nim +++ b/nimbus/sync/snap/worker/db/snapdb_accounts.nim @@ -9,7 +9,7 @@ # except according to those terms. import - std/[algorithm, sequtils, strutils, tables], + std/[algorithm, sequtils, tables], chronicles, eth/[common, p2p, rlp, trie/nibbles], stew/byteutils, @@ -97,23 +97,12 @@ proc collectAccounts( ## can be used for validating the argument account data. 
var rcAcc: seq[RLeafSpecs] - if acc.len != 0: + if 0 < acc.len: let pathTag0 = acc[0].accKey.to(NodeTag) # Verify lower bound if pathTag0 < base: - let error = HexaryDbError.AccountSmallerThanBase - trace "collectAccounts()", peer, base, accounts=acc.len, error - return err(error) - - # Add base for the records (no payload). Note that the assumption - # holds: `rcAcc[^1].tag <= base` - if base < pathTag0: - rcAcc.add RLeafSpecs(pathTag: base) - - # Check for the case that accounts are appended - elif 0 < rcAcc.len and pathTag0 <= rcAcc[^1].pathTag: - let error = HexaryDbError.AccountsNotSrictlyIncreasing + let error = LowerBoundAfterFirstEntry trace "collectAccounts()", peer, base, accounts=acc.len, error return err(error) @@ -179,17 +168,56 @@ proc importAccounts*( base: NodeTag; ## before or at first account entry in `data` data: PackedAccountRange; ## re-packed `snap/1 ` reply data persistent = false; ## store data on disk - ): Result[void,HexaryDbError] = + noBaseBoundCheck = false; ## Ignore left boundary proof check if `true` + ): Result[seq[NodeSpecs],HexaryDbError] = ## Validate and import accounts (using proofs as received with the snap ## message `AccountRange`). This function accumulates data in a memory table ## which can be written to disk with the argument `persistent` set `true`. The ## memory table is held in the descriptor argument`ps`. ## + ## On success, the function returns a list of dangling node links from the + ## argument `proof` list of nodes after the populating with accounts. The + ## following example may illustrate the case: + ## + ## Assume an accounts hexary trie + ## :: + ## | 0 1 2 3 4 5 6 7 8 9 a b c d e f -- nibble positions + ## | root -> (a, .. b, .. c, .. d, .. ,) -- root branch node + ## | | | | | + ## | ... v v v + ## | (x,X) (y,Y) (z,Z) + ## + ## with `a`,`b`,`c`,`d` node hashes, `x`,`y`,`z` partial paths and account + ## hashes `3&x`,`7&y`,`b&z` for account values `X`,`Y`,`Z`. All other + ## links in the *root branch node* are assumed nil. + ## + ## The passing to this function + ## * base: `3&x` + ## * data.proof: *root branch node* + ## * data.accounts: `(3&x,X)`, `(7&y,Y)`, `(b&z,Z)` + ## a partial tree can be fully constructed and boundary proofs succeed. + ## The return value will be an empty list. + ## + ## Leaving out `(7&y,Y)` the boundary proofs still succeed but the + ## return value will be @[`(7&y,c)`]. + ## + ## The left boundary proof might be omitted by passing `true` for the + ## `noBaseBoundCheck` argument. In this case, the boundary check must be + ## performed on the return code as + ## * if `data.accounts` is empty, the return value must be an empty list + ## * otherwise, all type `NodeSpecs` items `w` of the return code must + ## satisfy + ## :: + ## let leastAccountPath = data.accounts[0].accKey.to(NodeTag) + ## leastAccountPath <= w.partialPath.max(NodeKey).to(NodeTag) + ## ## Note that the `peer` argument is for log messages, only. - var accounts: seq[RLeafSpecs] + var + accounts: seq[RLeafSpecs] + dangling: seq[NodeSpecs] try: if 0 < data.proof.len: - let rc = ps.mergeProofs(ps.peer, ps.root, data.proof) + let rc = ps.mergeProofs(ps.peer, data.proof) if rc.isErr: return err(rc.error) block: @@ -197,16 +225,50 @@ proc importAccounts*( if rc.isErr: return err(rc.error) accounts = rc.value - block: + + if 0 < accounts.len: + var innerSubTrie: seq[NodeSpecs] + if 0 < data.proof.len: + # Inspect trie for dangling nodes. This is not a big deal here as the + # proof data is typically small. 
+ let + proofStats = ps.hexaDb.hexaryInspectTrie(ps.root, @[]) + topTag = accounts[^1].pathTag + for w in proofStats.dangling: + if base <= w.partialPath.max(NodeKey).to(NodeTag) and + w.partialPath.min(NodeKey).to(NodeTag) <= topTag: + # Extract dangling links which are inside the accounts range + innerSubTrie.add w + + # Build partial hexary trie let rc = ps.hexaDb.hexaryInterpolate( ps.root, accounts, bootstrap = (data.proof.len == 0)) if rc.isErr: return err(rc.error) - if persistent and 0 < ps.hexaDb.tab.len: - let rc = ps.hexaDb.persistentAccounts(ps) - if rc.isErr: - return err(rc.error) + # Collect missing inner sub-trees in the reconstructed partial hexary + # trie (if any). + let bottomTag = accounts[0].pathTag + for w in innerSubTrie: + if ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)): + continue + # Verify that `base` is to the left of the first account and there is + # nothing in between. Without proof, there can only be a complete + # set/list of accounts. There must be a proof for an empty list. + if not noBaseBoundCheck and + w.partialPath.max(NodeKey).to(NodeTag) < bottomTag: + return err(LowerBoundProofError) + # Otherwise register left over entry + dangling.add w + + if persistent: + let rc = ps.hexaDb.persistentAccounts(ps) + if rc.isErr: + return err(rc.error) + + elif data.proof.len == 0: + # There must be a proof for an empty argument list. + return err(LowerBoundProofError) except RlpError: return err(RlpEncoding) @@ -217,21 +279,24 @@ proc importAccounts*( return err(OSErrorException) #when extraTraceMessages: - # trace "Accounts imported", peer=ps.peer, - # root=ps.root.ByteArray32.toHex, - # proof=data.proof.len, base, accounts=data.accounts.len - ok() + # trace "Accounts imported", peer=ps.peer, root=ps.root.ByteArray32.toHex, + # proof=data.proof.len, base, accounts=data.accounts.len, + # top=accounts[^1].pathTag, danglingLen=dangling.len + + ok(dangling) proc importAccounts*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer, ## for log messages - root: Hash256; ## state root - base: NodeTag; ## before or at first account entry in `data` - data: PackedAccountRange; ## re-packed `snap/1 ` reply data - ): Result[void,HexaryDbError] = + peer: Peer; ## For log messages + root: Hash256; ## State root + base: NodeTag; ## Before or at first account entry in `data` + data: PackedAccountRange; ## Re-packed `snap/1 ` reply data + noBaseBoundCheck = false; ## Ignore left bound proof check if `true` + ): Result[seq[NodeSpecs],HexaryDbError] = ## Variant of `importAccounts()` SnapDbAccountsRef.init( - pv, root, peer).importAccounts(base, data, persistent=true) + pv, root, peer).importAccounts( + base, data, persistent=true, noBaseBoundCheck) proc importRawAccountsNodes*( diff --git a/nimbus/sync/snap/worker/db/snapdb_check.nim b/nimbus/sync/snap/worker/db/snapdb_check.nim index 656e9b2b3..c8ab4dc8d 100644 --- a/nimbus/sync/snap/worker/db/snapdb_check.nim +++ b/nimbus/sync/snap/worker/db/snapdb_check.nim @@ -61,7 +61,10 @@ proc storageSlotsCtx( ): string = let ctx = buddy.ctx - rc = env.fetchStorage.eq(storageRoot) + rc = block: + let rcx = env.fetchStorageFull.eq(storageRoot) + if rcx.isOk: rcx + else: env.fetchStoragePart.eq(storageRoot) if rc.isErr: return "n/a" let @@ -221,8 +224,8 @@ proc checkStorageSlotsTrieIsComplete*( return rc.value when extraTraceMessages: - debug logTxt "atorage slots health check failed", peer, - nStoQueue=env.fetchStorage.len, + let nStoQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + debug logTxt "atorage slots 
health check failed", peer, nStoQueue, ctx=buddy.storageSlotsCtx(storageRoot, env), error=rc.error proc checkStorageSlotsTrieIsComplete*( @@ -240,8 +243,9 @@ proc checkStorageSlotsTrieIsComplete*( for (accKey,accData,error) in buddy.accountsWalk(env): if error != NothingSerious: - error logTxt "atorage slots accounts loop stopped", peer, - nStoQueue=env.fetchStorage.len, accounts, incomplete, complete, error + let nStoQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + error logTxt "atorage slots accounts loop stopped", peer, nStoQueue, + accounts, incomplete, complete, error return false accounts.inc @@ -256,8 +260,9 @@ proc checkStorageSlotsTrieIsComplete*( incomplete.inc when extraTraceMessages: + let nStoQueue = env.fetchStorageFull.len + env.fetchStoragePart.len debug logTxt "storage slots report", peer, ctx=buddy.accountsCtx(env), - nStoQueue=env.fetchStorage.len, accounts, incomplete, complete + nStoQueue, accounts, incomplete, complete 0 < accounts and incomplete == 0 diff --git a/nimbus/sync/snap/worker/db/snapdb_desc.nim b/nimbus/sync/snap/worker/db/snapdb_desc.nim index 5739aa9e1..86a94ba11 100644 --- a/nimbus/sync/snap/worker/db/snapdb_desc.nim +++ b/nimbus/sync/snap/worker/db/snapdb_desc.nim @@ -11,7 +11,7 @@ import std/[sequtils, tables], chronicles, - eth/[common, p2p, trie/db], + eth/[common, p2p, trie/db, trie/nibbles], ../../../../db/[select_backend, storage_types], ../../range_desc, "."/[hexary_desc, hexary_error, hexary_import, hexary_paths, rocky_bulk_load] @@ -22,6 +22,8 @@ logScope: topics = "snap-db" const + extraTraceMessages = false or true + RockyBulkCache* = "accounts.sst" ## Name of temporary file to accomodate SST records for `rocksdb` @@ -34,7 +36,7 @@ type SnapDbBaseRef* = ref object of RootRef ## Session descriptor - xDb: HexaryTreeDbRef ## Hexary database + xDb: HexaryTreeDbRef ## Hexary database, memory based base: SnapDbRef ## Back reference to common parameters root*: NodeKey ## Session DB root node key @@ -70,6 +72,9 @@ proc toKey(a: NodeKey; ps: SnapDbBaseRef): uint = proc toKey(a: NodeTag; ps: SnapDbBaseRef): uint = a.to(NodeKey).toKey(ps) +proc ppImpl(a: RepairKey; pv: SnapDbRef): string = + if a.isZero: "ΓΈ" else:"$" & $a.toKey(pv) + # ------------------------------------------------------------------------------ # Debugging, pretty printing # ------------------------------------------------------------------------------ @@ -122,7 +127,7 @@ proc init*( ): T = ## Constructor for inner hexary trie database let xDb = HexaryTreeDbRef() - xDb.keyPp = proc(key: RepairKey): string = key.pp(xDb) # will go away + xDb.keyPp = proc(key: RepairKey): string = key.ppImpl(pv) # will go away return xDb proc init*( @@ -138,7 +143,7 @@ proc init*( ps: SnapDbBaseRef; pv: SnapDbRef; root: NodeKey; - peer: Peer = nil) = + ) = ## Session base constructor ps.base = pv ps.root = root @@ -148,7 +153,7 @@ proc init*( T: type SnapDbBaseRef; ps: SnapDbBaseRef; root: NodeKey; - peer: Peer = nil): T = + ): T = ## Variant of session base constructor new result result.init(ps.base, root) @@ -177,11 +182,11 @@ proc kvDb*(pv: SnapDbRef): TrieDatabaseRef = # Public functions, select sub-tables for persistent storage # ------------------------------------------------------------------------------ -proc toAccountsKey*(a: NodeKey): ByteArray32 = - a.ByteArray32 +proc toAccountsKey*(a: NodeKey): ByteArray33 = + a.ByteArray32.snapSyncAccountKey.data proc toStorageSlotsKey*(a: NodeKey): ByteArray33 = - a.ByteArray32.slotHashToSlotKey.data + 
a.ByteArray32.snapSyncStorageSlotKey.data template toOpenArray*(k: ByteArray32): openArray[byte] = k.toOpenArray(0, 31) @@ -204,7 +209,6 @@ proc dbBackendRocksDb*(ps: SnapDbBaseRef): bool = proc mergeProofs*( ps: SnapDbBaseRef; ## Session database peer: Peer; ## For log messages - root: NodeKey; ## Root for checking nodes proof: seq[Blob]; ## Node records freeStandingOk = false; ## Remove freestanding nodes ): Result[void,HexaryDbError] @@ -216,7 +220,7 @@ proc mergeProofs*( db = ps.hexaDb var nodes: HashSet[RepairKey] - refs = @[root.to(RepairKey)].toHashSet + refs = @[ps.root.to(RepairKey)].toHashSet for n,rlpRec in proof: let report = db.hexaryImport(rlpRec, nodes, refs) @@ -240,6 +244,52 @@ proc mergeProofs*( ok() + +proc verifyLowerBound*( + ps: SnapDbBaseRef; ## Database session descriptor + peer: Peer; ## For log messages + base: NodeTag; ## Before or at first account entry in `data` + first: NodeTag; ## First account key + ): Result[void,HexaryDbError] + {.gcsafe, raises: [Defect, KeyError].} = + ## Verify that `base` is to the left of the first leaf entry and there is + ## nothing in between. + proc convertTo(data: openArray[byte]; T: type Hash256): T = + discard result.data.NodeKey.init(data) # size error => zero + + let + root = ps.root.to(RepairKey) + base = base.to(NodeKey) + next = base.hexaryPath(root, ps.hexaDb).right(ps.hexaDb).getNibbles + if next.len == 64: + if first == next.getBytes.convertTo(Hash256).to(NodeTag): + return ok() + + let error = LowerBoundProofError + when extraTraceMessages: + trace "verifyLowerBound()", peer, base=base.pp, + first=first.to(NodeKey).pp, error + err(error) + +proc verifyNoMoreRight*( + ps: SnapDbBaseRef; ## Database session descriptor + peer: Peer; ## For log messages + base: NodeTag; ## Before or at first account entry in `data` + ): Result[void,HexaryDbError] + {.gcsafe, raises: [Defect, KeyError].} = + ## Verify that there is are no more leaf entries to the right of and + ## including `base`. + let + root = ps.root.to(RepairKey) + base = base.to(NodeKey) + if base.hexaryPath(root, ps.hexaDb).rightStop(ps.hexaDb): + return ok() + + let error = LowerBoundProofError + when extraTraceMessages: + trace "verifyLeftmostBound()", peer, base=base.pp, error + err(error) + # ------------------------------------------------------------------------------ # Debugging (and playing with the hexary database) # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/snapdb_persistent.nim b/nimbus/sync/snap/worker/db/snapdb_persistent.nim index e8e780765..ec710c619 100644 --- a/nimbus/sync/snap/worker/db/snapdb_persistent.nim +++ b/nimbus/sync/snap/worker/db/snapdb_persistent.nim @@ -40,7 +40,7 @@ proc convertTo(key: RepairKey; T: type NodeTag): T = ## Might be lossy, check before use UInt256.fromBytesBE(key.ByteArray33[1 .. 
32]).T -proc toAccountsKey(a: RepairKey): ByteArray32 = +proc toAccountsKey(a: RepairKey): ByteArray33 = a.convertTo(NodeKey).toAccountsKey proc toStorageSlotsKey(a: RepairKey): ByteArray33 = diff --git a/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim b/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim index 44ceb36f6..31a2e2387 100644 --- a/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim +++ b/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim @@ -94,7 +94,8 @@ proc persistentStorageSlots( proc collectStorageSlots( - peer: Peer; + peer: Peer; ## for log messages + base: NodeTag; ## before or at first account entry in `data` slotLists: seq[SnapStorage]; ): Result[seq[RLeafSpecs],HexaryDbError] {.gcsafe, raises: [Defect, RlpError].} = @@ -102,6 +103,15 @@ proc collectStorageSlots( var rcSlots: seq[RLeafSpecs] if slotLists.len != 0: + let pathTag0 = slotLists[0].slotHash.to(NodeTag) + + # Verify lower bound + if pathTag0 < base: + let error = LowerBoundAfterFirstEntry + trace "collectStorageSlots()", peer, base, item=0, + nSlotLists=slotLists.len, error + return err(error) + # Add initial account rcSlots.add RLeafSpecs( pathTag: slotLists[0].slotHash.to(NodeTag), @@ -126,38 +136,75 @@ proc collectStorageSlots( proc importStorageSlots( ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor + base: NodeTag; ## before or at first account entry in `data` data: AccountSlots; ## Account storage descriptor - proof: SnapStorageProof; ## Account storage proof - ): Result[void,HexaryDbError] + proof: SnapStorageProof; ## Storage slots proof data + noBaseBoundCheck = false; ## Ignore left boundary proof check if `true` + ): Result[seq[NodeSpecs],HexaryDbError] {.gcsafe, raises: [Defect,RlpError,KeyError].} = - ## Preocess storage slots for a particular storage root + ## Process storage slots for a particular storage root. See `importAccounts()` + ## for comments on the return value. let - root = data.account.storageRoot.to(NodeKey) - tmpDb = SnapDbBaseRef.init(ps, ps.root, ps.peer) + tmpDb = SnapDbBaseRef.init(ps, data.account.storageRoot.to(NodeKey)) var slots: seq[RLeafSpecs] + dangling: seq[NodeSpecs] if 0 < proof.len: - let rc = tmpDb.mergeProofs(ps.peer, root, proof) + let rc = tmpDb.mergeProofs(ps.peer, proof) if rc.isErr: return err(rc.error) block: - let rc = ps.peer.collectStorageSlots(data.data) + let rc = ps.peer.collectStorageSlots(base, data.data) if rc.isErr: return err(rc.error) slots = rc.value - block: + + if 0 < slots.len: + var innerSubTrie: seq[NodeSpecs] + if 0 < proof.len: + # Inspect trie for dangling nodes. This is not a big deal here as the + # proof data is typically small. + let + proofStats = ps.hexaDb.hexaryInspectTrie(ps.root, @[]) + topTag = slots[^1].pathTag + for w in proofStats.dangling: + if base <= w.partialPath.max(NodeKey).to(NodeTag) and + w.partialPath.min(NodeKey).to(NodeTag) <= topTag: + # Extract dangling links which are inside the accounts range + innerSubTrie.add w + + # Build partial hexary trie let rc = tmpDb.hexaDb.hexaryInterpolate( - root, slots, bootstrap = (proof.len == 0)) + tmpDb.root, slots, bootstrap = (proof.len == 0)) if rc.isErr: return err(rc.error) - # Commit to main descriptor - for k,v in tmpDb.hexaDb.tab.pairs: - if not k.isNodeKey: - return err(UnresolvedRepairNode) - ps.hexaDb.tab[k] = v + # Collect missing inner sub-trees in the reconstructed partial hexary + # trie (if any). 
+ let bottomTag = slots[0].pathTag + for w in innerSubTrie: + if ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)): + continue + # Verify that `base` is to the left of the first slot and there is + # nothing in between. Without proof, there can only be a complete + # set/list of slots. There must be a proof for an empty list. + if not noBaseBoundCheck and + w.partialPath.max(NodeKey).to(NodeTag) < bottomTag: + return err(LowerBoundProofError) + # Otherwise register left over entry + dangling.add w - ok() + # Commit to main descriptor + for k,v in tmpDb.hexaDb.tab.pairs: + if not k.isNodeKey: + return err(UnresolvedRepairNode) + ps.hexaDb.tab[k] = v + + elif proof.len == 0: + # There must be a proof for an empty argument list. + return err(LowerBoundProofError) + + ok(dangling) # ------------------------------------------------------------------------------ # Public constructor @@ -187,6 +234,7 @@ proc importStorageSlots*( ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor data: AccountStorageRange; ## Account storage reply from `snap/1` protocol persistent = false; ## store data on disk + noBaseBoundCheck = false; ## Ignore left boundary proof check if `true` ): seq[HexaryNodeReport] = ## Validate and import storage slots (using proofs as received with the snap ## message `StorageRanges`). This function accumulates data in a memory table @@ -213,9 +261,10 @@ proc importStorageSlots*( if 0 <= sTop: try: for n in 0 ..< sTop: - # These ones never come with proof data + # These ones always come without proof data => `NodeTag.default` itemInx = some(n) - let rc = ps.importStorageSlots(data.storages[n], @[]) + let rc = ps.importStorageSlots( + NodeTag.default, data.storages[n], @[]) if rc.isErr: result.add HexaryNodeReport(slot: itemInx, error: rc.error) trace "Storage slots item fails", peer, itemInx=n, nItems, @@ -225,12 +274,15 @@ proc importStorageSlots*( # Final one might come with proof data block: itemInx = some(sTop) - let rc = ps.importStorageSlots(data.storages[sTop], data.proof) + let rc = ps.importStorageSlots( + data.base, data.storages[sTop], data.proof, noBaseBoundCheck) if rc.isErr: result.add HexaryNodeReport(slot: itemInx, error: rc.error) trace "Storage slots last item fails", peer, itemInx=sTop, nItems, nSlots=data.storages[sTop].data.len, proofs=data.proof.len, error=rc.error, nErrors=result.len + elif 0 < rc.value.len: + result.add HexaryNodeReport(slot: itemInx, dangling: rc.value) # Store to disk if persistent and 0 < ps.hexaDb.tab.len: @@ -260,11 +312,12 @@ proc importStorageSlots*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` peer: Peer; ## For log messages, only data: AccountStorageRange; ## Account storage reply from `snap/1` protocol + noBaseBoundCheck = false; ## Ignore left boundary proof check if `true` ): seq[HexaryNodeReport] = ## Variant of `importStorages()` SnapDbStorageSlotsRef.init( pv, Hash256().to(NodeKey), Hash256(), peer).importStorageSlots( - data, persistent = true) + data, persistent = true, noBaseBoundCheck) proc importRawStorageSlotsNodes*( diff --git a/nimbus/sync/snap/worker/heal_accounts.nim b/nimbus/sync/snap/worker/heal_accounts.nim index cd6990684..9bb1e2241 100644 --- a/nimbus/sync/snap/worker/heal_accounts.nim +++ b/nimbus/sync/snap/worker/heal_accounts.nim @@ -52,7 +52,8 @@ ## the argument list is empty ## * `{leaf-nodes}`: list is optimised out ## * `{check-nodes}`: list implemented as `env.fetchAccounts.checkNodes` -## * `{storage-roots}`: list implemented as `env.fetchStorage` +## * `{storage-roots}`: list implemented as pair 
of queues +## `env.fetchStorageFull` and `env.fetchStoragePart` ## ## Discussion of flow chart ## ------------------------ @@ -132,10 +133,11 @@ const template logTxt(info: static[string]): static[string] = "Accounts healing " & info -proc healingCtx(buddy: SnapBuddyRef): string = - let - ctx = buddy.ctx - env = buddy.data.pivotEnv +proc healingCtx( + buddy: SnapBuddyRef; + env: SnapPivotRef; + ): string = + let ctx = buddy.ctx "{" & "pivot=" & "#" & $env.stateHeader.blockNumber & "," & "nAccounts=" & $env.nAccounts & "," & @@ -148,7 +150,10 @@ proc healingCtx(buddy: SnapBuddyRef): string = # Private functions # ------------------------------------------------------------------------------ -proc updateMissingNodesList(buddy: SnapBuddyRef) = +proc updateMissingNodesList( + buddy: SnapBuddyRef; + env: SnapPivotRef; + ) = ## Check whether previously missing nodes from the `missingNodes` list ## have been magically added to the database since it was checked last ## time. These nodes will me moved to `checkNodes` for further processing. @@ -156,7 +161,6 @@ proc updateMissingNodesList(buddy: SnapBuddyRef) = ctx = buddy.ctx db = ctx.data.snapDb peer = buddy.peer - env = buddy.data.pivotEnv stateRoot = env.stateHeader.stateRoot var delayed: seq[NodeSpecs] @@ -173,7 +177,10 @@ proc updateMissingNodesList(buddy: SnapBuddyRef) = env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & delayed -proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool = +proc appendMoreDanglingNodesToMissingNodesList( + buddy: SnapBuddyRef; + env: SnapPivotRef; + ): bool = ## Starting with a given set of potentially dangling account nodes ## `checkNodes`, this set is filtered and processed. The outcome is ## fed back to the vey same list `checkNodes` @@ -181,7 +188,6 @@ proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool = ctx = buddy.ctx db = ctx.data.snapDb peer = buddy.peer - env = buddy.data.pivotEnv stateRoot = env.stateHeader.stateRoot rc = db.inspectAccountsTrie(peer, stateRoot, env.fetchAccounts.checkNodes) @@ -189,7 +195,7 @@ proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool = if rc.isErr: when extraTraceMessages: error logTxt "failed => stop", peer, - ctx=buddy.healingCtx(), error=rc.error + ctx=buddy.healingCtx(env), error=rc.error # Attempt to switch peers, there is not much else we can do here buddy.ctrl.zombie = true return false @@ -204,6 +210,7 @@ proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool = proc getMissingNodesFromNetwork( buddy: SnapBuddyRef; + env: SnapPivotRef; ): Future[seq[NodeSpecs]] {.async.} = ## Extract from `missingNodes` the next batch of nodes that need @@ -211,7 +218,6 @@ proc getMissingNodesFromNetwork( let ctx = buddy.ctx peer = buddy.peer - env = buddy.data.pivotEnv stateRoot = env.stateHeader.stateRoot pivot = "#" & $env.stateHeader.blockNumber # for logging @@ -257,12 +263,12 @@ proc getMissingNodesFromNetwork( discard when extraTraceMessages: trace logTxt "fetch nodes error => stop", peer, - ctx=buddy.healingCtx(), error + ctx=buddy.healingCtx(env), error else: discard when extraTraceMessages: trace logTxt "fetch nodes error", peer, - ctx=buddy.healingCtx(), error + ctx=buddy.healingCtx(env), error return @[] @@ -270,6 +276,7 @@ proc getMissingNodesFromNetwork( proc kvAccountLeaf( buddy: SnapBuddyRef; node: NodeSpecs; + env: SnapPivotRef; ): (bool,NodeKey,Account) {.gcsafe, raises: [Defect,RlpError]} = ## Re-read leaf node from persistent database (if any) @@ -286,18 +293,19 @@ proc 
kvAccountLeaf( when extraTraceMessages: trace logTxt "non-leaf node path", peer, - ctx=buddy.healingCtx(), nNibbles=nibbles.len + ctx=buddy.healingCtx(env), nNibbles=nibbles.len proc registerAccountLeaf( buddy: SnapBuddyRef; accKey: NodeKey; - acc: Account) = + acc: Account; + env: SnapPivotRef; + ) = ## Process single account node as would be done with an interval by ## the `storeAccounts()` function let peer = buddy.peer - env = buddy.data.pivotEnv pt = accKey.to(NodeTag) # Find range set (from list) containing `pt` @@ -316,7 +324,7 @@ proc registerAccountLeaf( # Update storage slots batch if acc.storageRoot != emptyRlpHash: - env.fetchStorage.merge AccountSlotsHeader( + env.fetchStorageFull.merge AccountSlotsHeader( acckey: accKey, storageRoot: acc.storageRoot) @@ -324,32 +332,35 @@ proc registerAccountLeaf( # Private functions: do the healing for one round # ------------------------------------------------------------------------------ -proc accountsHealingImpl(buddy: SnapBuddyRef): Future[int] {.async.} = +proc accountsHealingImpl( + buddy: SnapBuddyRef; + env: SnapPivotRef; + ): Future[int] + {.async.} = ## Fetching and merging missing account trie database nodes. It returns the ## number of nodes fetched from the network, and -1 upon error. let ctx = buddy.ctx db = ctx.data.snapDb peer = buddy.peer - env = buddy.data.pivotEnv # Update for changes since last visit - buddy.updateMissingNodesList() + buddy.updateMissingNodesList(env) # If `checkNodes` is empty, healing is at the very start or was # postponed in which case `missingNodes` is non-empty. if env.fetchAccounts.checkNodes.len != 0 or env.fetchAccounts.missingNodes.len == 0: - if not buddy.appendMoreDanglingNodesToMissingNodesList(): + if not buddy.appendMoreDanglingNodesToMissingNodesList(env): return 0 # Check whether the trie is complete. 
if env.fetchAccounts.missingNodes.len == 0: - trace logTxt "complete", peer, ctx=buddy.healingCtx() + trace logTxt "complete", peer, ctx=buddy.healingCtx(env) return 0 # nothing to do # Get next batch of nodes that need to be merged it into the database - let nodeSpecs = await buddy.getMissingNodesFromNetwork() + let nodeSpecs = await buddy.getMissingNodesFromNetwork(env) if nodeSpecs.len == 0: return 0 @@ -358,7 +369,7 @@ proc accountsHealingImpl(buddy: SnapBuddyRef): Future[int] {.async.} = if 0 < report.len and report[^1].slot.isNone: # Storage error, just run the next lap (not much else that can be done) error logTxt "error updating persistent database", peer, - ctx=buddy.healingCtx(), nNodes=nodeSpecs.len, error=report[^1].error + ctx=buddy.healingCtx(env), nNodes=nodeSpecs.len, error=report[^1].error env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & nodeSpecs return -1 @@ -380,17 +391,17 @@ proc accountsHealingImpl(buddy: SnapBuddyRef): Future[int] {.async.} = else: # Node has been stored, double check - let (isLeaf, key, acc) = buddy.kvAccountLeaf(nodeSpecs[inx]) + let (isLeaf, key, acc) = buddy.kvAccountLeaf(nodeSpecs[inx], env) if isLeaf: # Update `uprocessed` registry, collect storage roots (if any) - buddy.registerAccountLeaf(key, acc) + buddy.registerAccountLeaf(key, acc, env) nLeafNodes.inc else: env.fetchAccounts.checkNodes.add nodePath when extraTraceMessages: trace logTxt "merged into database", peer, - ctx=buddy.healingCtx(), nNodes=nodeSpecs.len, nLeafNodes + ctx=buddy.healingCtx(env), nNodes=nodeSpecs.len, nLeafNodes return nodeSpecs.len @@ -417,26 +428,26 @@ proc healAccounts*(buddy: SnapBuddyRef) {.async.} = if env.nAccounts == 0 or ctx.data.coveredAccounts.fullFactor < healAccountsTrigger: #when extraTraceMessages: - # trace logTxt "postponed", peer, ctx=buddy.healingCtx() + # trace logTxt "postponed", peer, ctx=buddy.healingCtx(env) return when extraTraceMessages: - trace logTxt "started", peer, ctx=buddy.healingCtx() + trace logTxt "started", peer, ctx=buddy.healingCtx(env) var nNodesFetched = 0 nFetchLoop = 0 # Stop after `snapAccountsHealBatchFetchMax` nodes have been fetched while nNodesFetched < snapAccountsHealBatchFetchMax: - var nNodes = await buddy.accountsHealingImpl() + var nNodes = await buddy.accountsHealingImpl(env) if nNodes <= 0: break nNodesFetched.inc(nNodes) nFetchLoop.inc when extraTraceMessages: - trace logTxt "job done", peer, ctx=buddy.healingCtx(), - nNodesFetched, nFetchLoop + trace logTxt "job done", peer, ctx=buddy.healingCtx(env), + nNodesFetched, nFetchLoop, runState=buddy.ctrl.state # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/heal_storage_slots.nim b/nimbus/sync/snap/worker/heal_storage_slots.nim index 7fbc8f816..f461aa276 100644 --- a/nimbus/sync/snap/worker/heal_storage_slots.nim +++ b/nimbus/sync/snap/worker/heal_storage_slots.nim @@ -13,7 +13,7 @@ ## ## This module works similar to `heal_accounts` applied to each per-account ## storage slots hexary trie. These per-account trie work items are stored in -## the list `env.fetchStorage`. +## the pair of queues `env.fetchStorageFull` and `env.fetchStoragePart`. ## ## There is one additional short cut for speeding up processing. 
If a ## per-account storage slots hexary trie is marked inheritable, it will be @@ -55,10 +55,9 @@ template logTxt(info: static[string]): static[string] = proc healingCtx( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair; + env: SnapPivotRef; ): string = - let - env = buddy.data.pivotEnv - slots = kvp.data.slots + let slots = kvp.data.slots "{" & "pivot=" & "#" & $env.stateHeader.blockNumber & "," & "covered=" & slots.unprocessed.emptyFactor.toPC(0) & "," & @@ -96,7 +95,9 @@ proc acceptWorkItemAsIs( proc updateMissingNodesList( buddy: SnapBuddyRef; - kvp: SnapSlotsQueuePair) = + kvp: SnapSlotsQueuePair; + env: SnapPivotRef; + ) = ## Check whether previously missing nodes from the `missingNodes` list ## have been magically added to the database since it was checked last ## time. These nodes will me moved to `checkNodes` for further processing. @@ -104,7 +105,6 @@ proc updateMissingNodesList( ctx = buddy.ctx db = ctx.data.snapDb peer = buddy.peer - env = buddy.data.pivotEnv accKey = kvp.data.accKey storageRoot = kvp.key slots = kvp.data.slots @@ -126,6 +126,7 @@ proc updateMissingNodesList( proc appendMoreDanglingNodesToMissingNodesList( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair; + env: SnapPivotRef; ): bool = ## Starting with a given set of potentially dangling intermediate trie nodes ## `checkNodes`, this set is filtered and processed. The outcome is fed back @@ -134,7 +135,6 @@ proc appendMoreDanglingNodesToMissingNodesList( ctx = buddy.ctx db = ctx.data.snapDb peer = buddy.peer - env = buddy.data.pivotEnv accKey = kvp.data.accKey storageRoot = kvp.key slots = kvp.data.slots @@ -143,9 +143,9 @@ proc appendMoreDanglingNodesToMissingNodesList( if rc.isErr: when extraTraceMessages: - error logTxt "failed => stop", peer, - itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, - nStorageQueue=env.fetchStorage.len, error=rc.error + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + error logTxt "failed => stop", peer, itCtx=buddy.healingCtx(kvp,env), + nSlotLists=env.nSlotLists, nStorageQueue, error=rc.error # Attempt to switch peers, there is not much else we can do here buddy.ctrl.zombie = true return false @@ -160,6 +160,7 @@ proc appendMoreDanglingNodesToMissingNodesList( proc getMissingNodesFromNetwork( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair; + env: SnapPivotRef; ): Future[seq[NodeSpecs]] {.async.} = ## Extract from `missingNodes` the next batch of nodes that need @@ -167,7 +168,6 @@ proc getMissingNodesFromNetwork( let ctx = buddy.ctx peer = buddy.peer - env = buddy.data.pivotEnv accKey = kvp.data.accKey storageRoot = kvp.key pivot = "#" & $env.stateHeader.blockNumber # for logging @@ -217,15 +217,17 @@ proc getMissingNodesFromNetwork( if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): discard when extraTraceMessages: + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len trace logTxt "fetch nodes error => stop", peer, - itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, - nStorageQueue=env.fetchStorage.len, error + itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists, + nStorageQueue, error else: discard when extraTraceMessages: + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len trace logTxt "fetch nodes error", peer, - itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, - nStorageQueue=env.fetchStorage.len, error + itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists, + nStorageQueue, error return @[] @@ -234,12 +236,12 @@ proc kvStorageSlotsLeaf( buddy: SnapBuddyRef; kvp: 
SnapSlotsQueuePair; node: NodeSpecs; + env: SnapPivotRef; ): (bool,NodeKey) {.gcsafe, raises: [Defect,RlpError]} = ## Read leaf node from persistent database (if any) let peer = buddy.peer - env = buddy.data.pivotEnv nodeRlp = rlpFromBytes node.data (_,prefix) = hexPrefixDecode node.partialPath @@ -252,12 +254,13 @@ proc kvStorageSlotsLeaf( proc registerStorageSlotsLeaf( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair; - slotKey: NodeKey) = + slotKey: NodeKey; + env: SnapPivotRef; + ) = ## Process single trie node as would be done with an interval by ## the `storeStorageSlots()` function let peer = buddy.peer - env = buddy.data.pivotEnv slots = kvp.data.slots pt = slotKey.to(NodeTag) @@ -274,9 +277,60 @@ proc registerStorageSlotsLeaf( discard ivSet.reduce(pt,pt) when extraTraceMessages: + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len trace logTxt "single node", peer, - itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, - nStorageQueue=env.fetchStorage.len, slotKey=pt + itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists, + nStorageQueue, slotKey=pt + + +proc assembleWorkItemsQueue( + buddy: SnapBuddyRef; + env: SnapPivotRef; + ): (seq[SnapSlotsQueuePair],int) = + ## Collect work items that need healing and count the ones accepted as-is. + var + toBeHealed: seq[SnapSlotsQueuePair] + nAcceptedAsIs = 0 + + # Search the current slot item batch list for items to complete via healing + for kvp in env.fetchStorageFull.nextPairs: + # Marked items indicate that a partial sub-trie exists which might have + # been inherited from an earlier storage root. + if kvp.data.inherit: + + # Remove `kvp` work item from the queue object (which is allowed within a + # `for` loop over a `KeyedQueue` object type.) + env.fetchStorageFull.del(kvp.key) + + # With some luck, the `kvp` work item refers to a complete storage trie + # that can be accepted as-is in which case `kvp` can be just dropped.
+ let rc = buddy.acceptWorkItemAsIs(kvp) + if rc.isOk and rc.value: + env.nSlotLists.inc + nAcceptedAsIs.inc # for logging + continue # dropping `kvp` + + toBeHealed.add kvp + if healStoragesSlotsBatchMax <= toBeHealed.len: + return (toBeHealed, nAcceptedAsIs) + + # Ditto for partial items queue + for kvp in env.fetchStoragePart.nextPairs: + if healSlorageSlotsTrigger <= kvp.data.slots.unprocessed.emptyFactor: + env.fetchStoragePart.del(kvp.key) + + let rc = buddy.acceptWorkItemAsIs(kvp) + if rc.isOk and rc.value: + env.nSlotLists.inc + nAcceptedAsIs.inc # for logging + continue # dropping `kvp` + + # Add to local batch to be processed, below + toBeHealed.add kvp + if healStoragesSlotsBatchMax <= toBeHealed.len: + break + + (toBeHealed, nAcceptedAsIs) # ------------------------------------------------------------------------------ # Private functions: do the healing for one work item (sub-trie) @@ -285,6 +339,7 @@ proc registerStorageSlotsLeaf( proc storageSlotsHealing( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair; + env: SnapPivotRef; ): Future[bool] {.async.} = ## Returns `true` is the sub-trie is complete (probably inherited), and @@ -293,29 +348,30 @@ proc storageSlotsHealing( ctx = buddy.ctx db = ctx.data.snapDb peer = buddy.peer - env = buddy.data.pivotEnv accKey = kvp.data.accKey slots = kvp.data.slots when extraTraceMessages: - trace logTxt "started", peer, itCtx=buddy.healingCtx(kvp), - nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len + block: + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + trace logTxt "started", peer, itCtx=buddy.healingCtx(kvp,env), + nSlotLists=env.nSlotLists, nStorageQueue # Update for changes since last visit - buddy.updateMissingNodesList(kvp) + buddy.updateMissingNodesList(kvp, env) # ??? if slots.checkNodes.len != 0: - if not buddy.appendMoreDanglingNodesToMissingNodesList(kvp): + if not buddy.appendMoreDanglingNodesToMissingNodesList(kvp,env): return false # Check whether the trie is complete. 
if slots.missingNodes.len == 0: - trace logTxt "complete", peer, itCtx=buddy.healingCtx(kvp) + trace logTxt "complete", peer, itCtx=buddy.healingCtx(kvp,env) return true # Get next batch of nodes that need to be merged it into the database - let nodeSpecs = await buddy.getMissingNodesFromNetwork(kvp) + let nodeSpecs = await buddy.getMissingNodesFromNetwork(kvp,env) if nodeSpecs.len == 0: return @@ -323,17 +379,19 @@ proc storageSlotsHealing( let report = db.importRawStorageSlotsNodes(peer, accKey, nodeSpecs) if 0 < report.len and report[^1].slot.isNone: # Storage error, just run the next lap (not much else that can be done) + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len error logTxt "error updating persistent database", peer, - itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, - nStorageQueue=env.fetchStorage.len, nNodes=nodeSpecs.len, - error=report[^1].error + itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists, + nStorageQueue, nNodes=nodeSpecs.len, error=report[^1].error slots.missingNodes = slots.missingNodes & nodeSpecs return false when extraTraceMessages: - trace logTxt "nodes merged into database", peer, - itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, - nStorageQueue=env.fetchStorage.len, nNodes=nodeSpecs.len + block: + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + trace logTxt "nodes merged into database", peer, + itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists, + nStorageQueue, nNodes=nodeSpecs.len # Filter out error and leaf nodes var nLeafNodes = 0 # for logging @@ -354,23 +412,24 @@ proc storageSlotsHealing( else: # Node has been stored, double check let (isLeaf, slotKey) = - buddy.kvStorageSlotsLeaf(kvp, nodeSpecs[inx]) + buddy.kvStorageSlotsLeaf(kvp, nodeSpecs[inx], env) if isLeaf: # Update `uprocessed` registry, collect storage roots (if any) - buddy.registerStorageSlotsLeaf(kvp, slotKey) + buddy.registerStorageSlotsLeaf(kvp, slotKey, env) nLeafNodes.inc else: slots.checkNodes.add nodePath when extraTraceMessages: - trace logTxt "job done", peer, - itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, - nStorageQueue=env.fetchStorage.len, nLeafNodes + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + trace logTxt "job done", peer, itCtx=buddy.healingCtx(kvp,env), + nSlotLists=env.nSlotLists, nStorageQueue, nLeafNodes proc healingIsComplete( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair; + env: SnapPivotRef; ): Future[bool] {.async.} = ## Check whether the storage trie can be completely inherited and prepare for @@ -382,7 +441,6 @@ proc healingIsComplete( ctx = buddy.ctx db = ctx.data.snapDb peer = buddy.peer - env = buddy.data.pivotEnv accKey = kvp.data.accKey storageRoot = kvp.key @@ -392,9 +450,9 @@ proc healingIsComplete( if rc.isErr: # Oops, not much we can do here (looping trie?) + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len error logTxt "problem inspecting storage trie", peer, - nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len, - storageRoot, error=rc.error + nSlotLists=env.nSlotLists, nStorageQueue, storageRoot, error=rc.error return false # Check whether the hexary trie can be inherited as-is. 
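# A minimal sketch (assuming the queue `merge` helpers introduced above) of how
# an unfinished work item is expected to travel between the two storage slots
# queues; it mirrors the re-queueing logic of `healStorageSlots()` further below:
#
#   if kvp.data.slots.isNil:
#     env.fetchStorageFull.merge kvp    # no partial slots descriptor => fetch whole trie
#   else:
#     env.fetchStoragePart.merge kvp    # partially fetched => complete/heal later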
@@ -413,7 +471,7 @@ proc healingIsComplete( NodeTagRange.new(low(NodeTag),high(NodeTag))) # Proceed with healing - return await buddy.storageSlotsHealing(kvp) + return await buddy.storageSlotsHealing(kvp, env) # ------------------------------------------------------------------------------ # Public functions @@ -427,72 +485,40 @@ proc healStorageSlots*(buddy: SnapBuddyRef) {.async.} = peer = buddy.peer env = buddy.data.pivotEnv var - toBeHealed: seq[SnapSlotsQueuePair] - nAcceptedAsIs = 0 - - # Search the current slot item batch list for items to complete via healing - for kvp in env.fetchStorage.nextPairs: - - # Marked items indicate that a partial sub-trie existsts which might have - # been inherited from an earlier storage root. - if not kvp.data.inherit: - let slots = kvp.data.slots - - # Otherwise check partally fetched sub-tries only if they have a certain - # degree of completeness. - if slots.isNil or slots.unprocessed.emptyFactor < healSlorageSlotsTrigger: - continue - - # Remove `kvp` work item from the queue object (which is allowed within a - # `for` loop over a `KeyedQueue` object type.) - env.fetchStorage.del(kvp.key) - - # With some luck, the `kvp` work item refers to a complete storage trie - # that can be be accepted as-is in wich case `kvp` can be just dropped. - block: - let rc = buddy.acceptWorkItemAsIs(kvp) - if rc.isOk and rc.value: - env.nSlotLists.inc - nAcceptedAsIs.inc # for logging - continue # dropping `kvp` - - # Add to local batch to be processed, below - toBeHealed.add kvp - if healStoragesSlotsBatchMax <= toBeHealed.len: - break + (toBeHealed, nAcceptedAsIs) = buddy.assembleWorkItemsQueue(env) # Run against local batch let nHealerQueue = toBeHealed.len if 0 < nHealerQueue: when extraTraceMessages: - trace logTxt "processing", peer, - nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len, - nHealerQueue, nAcceptedAsIs + block: + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + trace logTxt "processing", peer, + nSlotLists=env.nSlotLists, nStorageQueue, nHealerQueue, nAcceptedAsIs for n in 0 ..< toBeHealed.len: - let - kvp = toBeHealed[n] - isComplete = await buddy.healingIsComplete(kvp) - if isComplete: - env.nSlotLists.inc - nAcceptedAsIs.inc - else: - env.fetchStorage.merge kvp + let kvp = toBeHealed[n] - if buddy.ctrl.stopped: - # Oops, peer has gone - env.fetchStorage.merge toBeHealed[n+1 ..< toBeHealed.len] - break + if buddy.ctrl.running: + if await buddy.healingIsComplete(kvp,env): + env.nSlotLists.inc + nAcceptedAsIs.inc + continue + + if kvp.data.slots.isNil: + env.fetchStorageFull.merge kvp # should be the exception + else: + env.fetchStoragePart.merge kvp when extraTraceMessages: - trace logTxt "done", peer, - nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len, - nHealerQueue, nAcceptedAsIs + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + trace logTxt "done", peer, nSlotLists=env.nSlotLists, nStorageQueue, + nHealerQueue, nAcceptedAsIs, runState=buddy.ctrl.state elif 0 < nAcceptedAsIs: - trace logTxt "work items", peer, - nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len, - nHealerQueue, nAcceptedAsIs + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + trace logTxt "work items", peer, nSlotLists=env.nSlotLists, + nStorageQueue, nHealerQueue, nAcceptedAsIs, runState=buddy.ctrl.state # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/range_fetch_accounts.nim 
b/nimbus/sync/snap/worker/range_fetch_accounts.nim index d5562da98..dde5a5384 100644 --- a/nimbus/sync/snap/worker/range_fetch_accounts.nim +++ b/nimbus/sync/snap/worker/range_fetch_accounts.nim @@ -37,7 +37,7 @@ import stew/[interval_set, keyed_queue], stint, ../../sync_desc, - ".."/[range_desc, worker_desc], + ".."/[constants, range_desc, worker_desc], ./com/[com_error, get_account_range], ./db/snapdb_accounts @@ -50,12 +50,6 @@ const extraTraceMessages = false or true ## Enabled additional logging noise - numChunksMax = 2000 - ## Bound for `numChunks()` (some fancy number) - - addToFetchLoopMax = 4 - ## Add some extra when calculating number of fetch/store rounds - # ------------------------------------------------------------------------------ # Private logging helpers # ------------------------------------------------------------------------------ @@ -63,75 +57,51 @@ const template logTxt(info: static[string]): static[string] = "Accounts range " & info -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc numChunks(buddy: SnapBuddyRef): int = - var total = 0u64 - for ivSet in buddy.data.pivotEnv.fetchAccounts.unprocessed: - total += ivSet.chunks.uint64 - min(numChunksMax.uint64, total).int - -proc withMaxLen( +proc dumpUnprocessed( buddy: SnapBuddyRef; - iv: NodeTagRange; - maxlen: UInt256; - ): NodeTagRange = - ## Reduce accounts interval to maximal size - if 0 < iv.len and iv.len <= maxLen: - iv - else: - NodeTagRange.new(iv.minPt, iv.minPt + (maxLen - 1.u256)) + env: SnapPivotRef; + ): string = + ## Debugging ... + let + peer = buddy.peer + pivot = "#" & $env.stateHeader.blockNumber # for logging + moan = proc(overlap: UInt256; iv: NodeTagRange) = + trace logTxt "unprocessed => overlap", peer, pivot, overlap, iv + + env.fetchAccounts.unprocessed.dump(moan, 5) # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc getUnprocessed(buddy: SnapBuddyRef): Result[NodeTagRange,void] = +proc getUnprocessed( + buddy: SnapBuddyRef; + env: SnapPivotRef; + ): Result[NodeTagRange,void] = ## Fetch an interval from one of the account range lists. 
- let - env = buddy.data.pivotEnv - accountRangeMax = high(UInt256) div buddy.ctx.buddiesMax.u256 + let accountRangeMax = high(UInt256) div buddy.ctx.buddiesMax.u256 - for ivSet in env.fetchAccounts.unprocessed: - let rc = ivSet.ge() - if rc.isOk: - let iv = buddy.withMaxLen(rc.value, accountRangeMax) - discard ivSet.reduce(iv) - return ok(iv) - - err() - -proc putUnprocessed(buddy: SnapBuddyRef; iv: NodeTagRange) = - ## Shortcut - discard buddy.data.pivotEnv.fetchAccounts.unprocessed[1].merge(iv) - -proc delUnprocessed(buddy: SnapBuddyRef; iv: NodeTagRange) = - ## Shortcut - discard buddy.data.pivotEnv.fetchAccounts.unprocessed[1].reduce(iv) - -proc markGloballyProcessed(buddy: SnapBuddyRef; iv: NodeTagRange) = - ## Shortcut - discard buddy.ctx.data.coveredAccounts.merge(iv) + env.fetchAccounts.unprocessed.fetch accountRangeMax # ------------------------------------------------------------------------------ # Private functions: do the account fetching for one round # ------------------------------------------------------------------------------ -proc accountsRagefetchImpl(buddy: SnapBuddyRef): Future[bool] {.async.} = +proc accountsRangefetchImpl( + buddy: SnapBuddyRef; + env: SnapPivotRef; + ): Future[bool] {.async.} = ## Fetch accounts and store them in the database. Returns true while more ## data can probably be fetched. let ctx = buddy.ctx peer = buddy.peer - env = buddy.data.pivotEnv stateRoot = env.stateHeader.stateRoot pivot = "#" & $env.stateHeader.blockNumber # for logging # Get a range of accounts to fetch from let iv = block: - let rc = buddy.getUnprocessed() + let rc = buddy.getUnprocessed(env) if rc.isErr: when extraTraceMessages: trace logTxt "currently all processed", peer, pivot @@ -142,7 +112,7 @@ proc accountsRagefetchImpl(buddy: SnapBuddyRef): Future[bool] {.async.} = let dd = block: let rc = await buddy.getAccountRange(stateRoot, iv, pivot) if rc.isErr: - buddy.putUnprocessed(iv) # fail => interval back to pool + env.fetchAccounts.unprocessed.merge iv # fail => interval back to pool let error = rc.error if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): when extraTraceMessages: @@ -161,44 +131,56 @@ proc accountsRagefetchImpl(buddy: SnapBuddyRef): Future[bool] {.async.} = # trace logTxt "fetched", peer, gotAccounts, gotStorage, # pivot, reqLen=iv.len, gotLen=dd.consumed.len - block: - let rc = ctx.data.snapDb.importAccounts(peer, stateRoot, iv.minPt, dd.data) + # Now, as we fully own the scheduler, the original interval can safely be + # placed back for a moment -- to be corrected below. + env.fetchAccounts.unprocessed.merge iv + + # Processed account hashes are set up as a set of intervals which is needed + # if the data range returned from the network contains holes. + let processed = NodeTagRangeSet.init() + if 0 < dd.data.accounts.len: + discard processed.merge(iv.minPt, dd.data.accounts[^1].accKey.to(NodeTag)) + else: + discard processed.merge iv + + let dangling = block: + # No left boundary check needed. If there is a gap, the partial path for + # that gap is returned by the import function to be registered, below.
+ let rc = ctx.data.snapDb.importAccounts( + peer, stateRoot, iv.minPt, dd.data, noBaseBoundCheck = true) if rc.isErr: # Bad data, just try another peer - buddy.putUnprocessed(iv) buddy.ctrl.zombie = true when extraTraceMessages: trace logTxt "import failed => stop", peer, gotAccounts, gotStorage, - pivot, reqLen=iv.len, gotLen=dd.consumed.len, error=rc.error + pivot, reqLen=iv.len, gotLen=processed.total, error=rc.error return + rc.value # Statistics env.nAccounts.inc(gotAccounts) - # Register consumed intervals on the accumulator over all state roots - buddy.markGloballyProcessed(dd.consumed) + # Punch holes into the reported range from the network if it contains gaps. + for w in dangling: + discard processed.reduce( + w.partialPath.min(NodeKey).to(NodeTag), + w.partialPath.max(NodeKey).to(NodeTag)) - # Register consumed and bulk-imported (well, not yet) accounts range - block registerConsumed: - block: - # Both intervals `min(iv)` and `min(dd.consumed)` are equal - let rc = iv - dd.consumed - if rc.isOk: - # Now, `dd.consumed` < `iv`, return some unused range - buddy.putUnprocessed(rc.value) - break registerConsumed - block: - # The processed interval might be a bit larger - let rc = dd.consumed - iv - if rc.isOk: - # Remove from unprocessed data. If it is not unprocessed, anymore - # then it was doubly processed which is ok. - buddy.delUnprocessed(rc.value) - break registerConsumed - # End registerConsumed + # Update bookkeeping + for w in processed.increasing: + # Remove the processed range from the batch of unprocessed ones. + env.fetchAccounts.unprocessed.reduce w + # Register consumed intervals on the accumulator over all state roots. + discard buddy.ctx.data.coveredAccounts.merge w - # Store accounts on the storage TODO list. - env.fetchStorage.merge dd.withStorage + # Register accounts with storage slots on the storage TODO list. + env.fetchStorageFull.merge dd.withStorage + + when extraTraceMessages: + trace logTxt "request done", peer, pivot, + nCheckNodes=env.fetchAccounts.checkNodes.len, + nMissingNodes=env.fetchAccounts.missingNodes.len, + imported=processed.dump(), unprocessed=buddy.dumpUnprocessed(env) return true @@ -208,27 +190,32 @@ proc accountsRagefetchImpl(buddy: SnapBuddyRef): Future[bool] {.async.} = proc rangeFetchAccounts*(buddy: SnapBuddyRef) {.async.} = ## Fetch accounts and store them in the database.
- let numChunks = buddy.numChunks() - if 0 < numChunks: + let env = buddy.data.pivotEnv + if not env.fetchAccounts.unprocessed.isEmpty(): let ctx = buddy.ctx peer = buddy.peer - env = buddy.data.pivotEnv pivot = "#" & $env.stateHeader.blockNumber # for logging - nFetchLoopMax = max(ctx.buddiesMax + 1, numChunks) + addToFetchLoopMax - when extraTraceMessages: - trace logTxt "start", peer, pivot, nFetchLoopMax + trace logTxt "start", peer, pivot var nFetchAccounts = 0 - while nFetchAccounts < nFetchLoopMax: - if not await buddy.accountsRagefetchImpl(): - break + while not env.fetchAccounts.unprocessed.isEmpty() and + buddy.ctrl.running and + env == buddy.data.pivotEnv: nFetchAccounts.inc + if not await buddy.accountsRangefetchImpl(env): + break + + # Clean up storage slots queue first if it becomes too large + let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len + if snapAccountsBuddyStoragesSlotsQuPrioThresh < nStoQu: + break when extraTraceMessages: - trace logTxt "done", peer, pivot, nFetchAccounts, nFetchLoopMax + trace logTxt "done", peer, pivot, nFetchAccounts, + runState=buddy.ctrl.state # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/range_fetch_storage_slots.nim b/nimbus/sync/snap/worker/range_fetch_storage_slots.nim index a78c812f5..410efe281 100644 --- a/nimbus/sync/snap/worker/range_fetch_storage_slots.nim +++ b/nimbus/sync/snap/worker/range_fetch_storage_slots.nim @@ -34,7 +34,8 @@ ## ## Legend: ## * `<..>`: some action, process, etc. -## * `{missing-storage-slots}`: list implemented as `env.fetchStorage` +## * `{missing-storage-slots}`: list implemented as pair of queues +## `env.fetchStorageFull` and `env.fetchStoragePart` ## * `(storage-slots}`: list is optimised out ## * `{completed}`: list is optimised out ## * `{partial}`: list is optimised out @@ -78,76 +79,24 @@ template logTxt(info: static[string]): static[string] = "Storage slots range " & info # ------------------------------------------------------------------------------ -# Private functions +# Private helpers # ------------------------------------------------------------------------------ -proc getNextSlotItems( +proc getNextSlotItemsFull( buddy: SnapBuddyRef; - noSubRange = false; + env: SnapPivotRef; ): seq[AccountSlotsHeader] = - ## Get list of work item from the batch queue. - ## - ## * If the storage slots requested come with an explicit sub-range of slots - ## (i.e. not an implied complete list), then the result has only on work - ## item. An explicit list of slots is only calculated if there was a queue - ## item with a partially completed slots download. - ## - ## * Otherwise, a list of at most `snapStoragesSlotsFetchMax` work items is - ## returned. These work items were checked for that there was no trace of a - ## previously installed (probably partial) storage trie on the database - ## (e.g. inherited from an earlier state root pivot.) - ## - ## If there is an indication that the storage trie may have some data - ## already it is ignored here and marked `inherit` so that it will be - ## picked up by the healing process. + ## Get list of full work items from the batch queue. + ## + ## If there is an indication that the storage trie may have some data + ## already it is ignored here and marked `inherit` so that it will be + ## picked up by the healing process. let ctx = buddy.ctx peer = buddy.peer - env = buddy.data.pivotEnv - - # Assemble first request which might come with a sub-range.
- if not noSubRange: - let (reqKey, reqData) = block: - let rc = env.fetchStorage.first # peek - if rc.isErr: - return - (rc.value.key, rc.value.data) - while not reqData.slots.isNil: - # Extract first interval and return single item request queue - for ivSet in reqData.slots.unprocessed: - let rc = ivSet.ge() - if rc.isOk: - - # Extraxt this interval from the range set - discard ivSet.reduce rc.value - - # Delete from batch queue if the range set becomes empty - if reqData.slots.unprocessed.isEmpty: - env.fetchStorage.del(reqKey) - - when extraTraceMessages: - trace logTxt "prepare fetch partial", peer, - nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len, - nToProcess=1, subRange=rc.value, account=reqData.accKey - - return @[AccountSlotsHeader( - accKey: reqData.accKey, - storageRoot: reqKey, - subRange: some rc.value)] - - # Oops, empty range set? Remove range and move item to the right end - reqData.slots = nil - discard env.fetchStorage.lruFetch(reqKey) - - # Done with partial slot ranges. Assemble maximal request queue. - var nInherit = 0 - for kvp in env.fetchStorage.prevPairs: - if not kvp.data.slots.isNil: - # May happen when `noSubRange` is `true`. As the queue is read from the - # right end and all the partial slot ranges are on the left there will - # be no more non-partial slot ranges on the queue. So this loop is done. - break - + var + nInherit = 0 + for kvp in env.fetchStorageFull.nextPairs: let it = AccountSlotsHeader( accKey: kvp.data.accKey, storageRoot: kvp.key) @@ -160,50 +109,91 @@ proc getNextSlotItems( continue result.add it - env.fetchStorage.del(kvp.key) # ok to delete this item from batch queue + env.fetchStorageFull.del(kvp.key) # ok to delete this item from batch queue # Maximal number of items to fetch if snapStoragesSlotsFetchMax <= result.len: break when extraTraceMessages: - trace logTxt "fetch", peer, nSlotLists=env.nSlotLists, - nStorageQueue=env.fetchStorage.len, nToProcess=result.len, nInherit + trace logTxt "fetch full", peer, nSlotLists=env.nSlotLists, + nStorageQuFull=env.fetchStorageFull.len, nToProcess=result.len, nInherit +proc getNextSlotItemPartial( + buddy: SnapBuddyRef; + env: SnapPivotRef; + ): seq[AccountSlotsHeader] = + ## Get work item from the batch queue. + let + ctx = buddy.ctx + peer = buddy.peer + + for kvp in env.fetchStoragePart.nextPairs: + if not kvp.data.slots.isNil: + # Extract first interval and return single item request queue + for ivSet in kvp.data.slots.unprocessed: + let rc = ivSet.ge() + if rc.isOk: + + # Extraxt this interval from the range set + discard ivSet.reduce rc.value + + # Delete from batch queue if the range set becomes empty + if kvp.data.slots.unprocessed.isEmpty: + env.fetchStoragePart.del(kvp.key) + + when extraTraceMessages: + trace logTxt "fetch partial", peer, + nSlotLists=env.nSlotLists, + nStorageQuPart=env.fetchStoragePart.len, + subRange=rc.value, account=kvp.data.accKey + + return @[AccountSlotsHeader( + accKey: kvp.data.accKey, + storageRoot: kvp.key, + subRange: some rc.value)] + + # Oops, empty range set? 
Remove range and move item to the full requests + kvp.data.slots = nil + env.fetchStorageFull.merge kvp + + +proc backToSlotItemsQueue(env: SnapPivotRef; req: seq[AccountSlotsHeader]) = + if 0 < req.len: + if req[^1].subRange.isSome: + env.fetchStoragePart.merge req[^1] + env.fetchStorageFull.merge req[0 ..< req.len-1] + else: + env.fetchStorageFull.merge req + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + proc storeStoragesSingleBatch( buddy: SnapBuddyRef; - noSubRange = false; + req: seq[AccountSlotsHeader]; + env: SnapPivotRef; ) {.async.} = ## Fetch account storage slots and store them in the database. let ctx = buddy.ctx peer = buddy.peer - env = buddy.data.pivotEnv stateRoot = env.stateHeader.stateRoot pivot = "#" & $env.stateHeader.blockNumber # for logging - # Fetch storage data and save it on disk. Storage requests are managed by - # a request queue for handling partioal replies and re-fetch issues. For - # all practical puroses, this request queue should mostly be empty. - - # Pull out the next request list from the queue - let req = buddy.getNextSlotItems() - if req.len == 0: - return # currently nothing to do - # Get storages slots data from the network var stoRange = block: let rc = await buddy.getStorageRanges(stateRoot, req, pivot) if rc.isErr: - env.fetchStorage.merge req + env.backToSlotItemsQueue req let error = rc.error if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): - discard + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len trace logTxt "fetch error => stop", peer, pivot, - nSlotLists=env.nSlotLists, nReq=req.len, - nStorageQueue=env.fetchStorage.len, error + nSlotLists=env.nSlotLists, nReq=req.len, nStorageQueue, error return rc.value @@ -213,25 +203,28 @@ proc storeStoragesSingleBatch( var gotSlotLists = stoRange.data.storages.len #when extraTraceMessages: - # trace logTxt "fetched", peer, pivot, - # nSlotLists=env.nSlotLists, nSlotLists=gotSlotLists, nReq=req.len, - # nStorageQueue=env.fetchStorage.len, nLeftOvers=stoRange.leftOver.len + # let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + # trace logTxt "fetched", peer, pivot, nSlotLists=env.nSlotLists, + # nSlotLists=gotSlotLists, nReq=req.len, + # nStorageQueue, nLeftOvers=stoRange.leftOver.len if 0 < gotSlotLists: # Verify/process storages data and save it to disk - let report = ctx.data.snapDb.importStorageSlots(peer, stoRange.data) + let report = ctx.data.snapDb.importStorageSlots( + peer, stoRange.data, noBaseBoundCheck = true) if 0 < report.len: let topStoRange = stoRange.data.storages.len - 1 if report[^1].slot.isNone: # Failed to store on database, not much that can be done here - env.fetchStorage.merge req + env.backToSlotItemsQueue req gotSlotLists.dec(report.len - 1) # for logging only + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len error logTxt "import failed", peer, pivot, nSlotLists=env.nSlotLists, nSlotLists=gotSlotLists, nReq=req.len, - nStorageQueue=env.fetchStorage.len, error=report[^1].error + nStorageQueue, error=report[^1].error return # Push back error entries to be processed later @@ -246,33 +239,37 @@ proc storeStoragesSingleBatch( # Reset any partial result (which would be the last entry) to # requesting the full interval. So all the storage slots are # re-fetched completely for this account. 
- env.fetchStorage.merge AccountSlotsHeader( + env.fetchStorageFull.merge AccountSlotsHeader( accKey: stoRange.data.storages[inx].account.accKey, storageRoot: stoRange.data.storages[inx].account.storageRoot) # Last entry might be partial if inx == topStoRange: - # No partial result processing anymore to consider + # Forget about partial result processing if the last partial entry + # was reported because + # * either there was an error processing it + # * or there were some gaps reported as dangling links stoRange.data.proof = @[] # Update local statistics counter for `nSlotLists` counter update gotSlotLists.dec - trace logTxt "processing error", peer, pivot, nSlotLists=env.nSlotLists, + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + error logTxt "processing error", peer, pivot, nSlotLists=env.nSlotLists, nSlotLists=gotSlotLists, nReqInx=inx, nReq=req.len, - nStorageQueue=env.fetchStorage.len, error=report[inx].error + nStorageQueue, nDangling=w.dangling.len, error=w.error # Update statistics if gotSlotLists == 1 and req[0].subRange.isSome and - env.fetchStorage.hasKey req[0].storageRoot: + env.fetchStoragePart.hasKey req[0].storageRoot: # Successful partial request, but not completely done with yet. gotSlotLists = 0 env.nSlotLists.inc(gotSlotLists) # Return unprocessed left overs to batch queue - env.fetchStorage.merge stoRange.leftOver + env.backToSlotItemsQueue stoRange.leftOver # ------------------------------------------------------------------------------ # Public functions @@ -287,43 +284,46 @@ proc rangeFetchStorageSlots*(buddy: SnapBuddyRef) {.async.} = let env = buddy.data.pivotEnv peer = buddy.peer + fullRangeLen = env.fetchStorageFull.len + partRangeLen = env.fetchStoragePart.len - if 0 < env.fetchStorage.len: - # Run at most the minimum number of times to get the batch queue cleaned up. - var - fullRangeLoopCount = - 1 + (env.fetchStorage.len - 1) div snapStoragesSlotsFetchMax - subRangeLoopCount = 0 - - # Add additional counts for partial slot range items - for reqData in env.fetchStorage.nextValues: - if reqData.slots.isNil: - break - subRangeLoopCount.inc + # Fetch storage data and save it on disk. Storage requests are managed by + # request queues for handling full/partial replies and re-fetch issues. For + # all practical purposes, these request queues should mostly be empty. when extraTraceMessages: trace logTxt "start", peer, nSlotLists=env.nSlotLists, - nStorageQueue=env.fetchStorage.len, fullRangeLoopCount, - subRangeLoopCount + nStorageQueue=(fullRangeLen+partRangeLen) # Processing the full range will implicitely handle inheritable storage - # slots first wich each batch item (see `getNextSlotItems()`.) - while 0 < fullRangeLoopCount and - 0 < env.fetchStorage.len and - not buddy.ctrl.stopped: - fullRangeLoopCount.dec - await buddy.storeStoragesSingleBatch(noSubRange = true) + # slots first with each batch item (see `getNextSlotItemsFull()`.)
+ var fullRangeItemsleft = 1 + (fullRangeLen-1) div snapStoragesSlotsFetchMax + while 0 < fullRangeItemsleft and + buddy.ctrl.running and + env == buddy.data.pivotEnv: + # Pull out the next request list from the queue + let req = buddy.getNextSlotItemsFull(env) + if req.len == 0: + break + fullRangeItemsleft.dec + await buddy.storeStoragesSingleBatch(req, env) - while 0 < subRangeLoopCount and - 0 < env.fetchStorage.len and - not buddy.ctrl.stopped: - subRangeLoopCount.dec - await buddy.storeStoragesSingleBatch(noSubRange = false) + var partialRangeItemsLeft = env.fetchStoragePart.len + while 0 < partialRangeItemsLeft and + buddy.ctrl.running and + env == buddy.data.pivotEnv: + # Pull out the next request list from the queue + let req = buddy.getNextSlotItemPartial(env) + if req.len == 0: + break + partialRangeItemsLeft.dec + await buddy.storeStoragesSingleBatch(req, env) when extraTraceMessages: - trace logTxt "done", peer, nSlotLists=env.nSlotLists, - nStorageQueue=env.fetchStorage.len, fullRangeLoopCount, - subRangeLoopCount + let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len + trace logTxt "done", peer, nSlotLists=env.nSlotLists, nStorageQueue, + fullRangeItemsleft, partialRangeItemsLeft, runState=buddy.ctrl.state # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/ticker.nim b/nimbus/sync/snap/worker/ticker.nim index b8274ad0d..8d1924968 100644 --- a/nimbus/sync/snap/worker/ticker.nim +++ b/nimbus/sync/snap/worker/ticker.nim @@ -53,44 +53,49 @@ const # Private functions: pretty printing # ------------------------------------------------------------------------------ -proc ppMs*(elapsed: times.Duration): string - {.gcsafe, raises: [Defect, ValueError]} = - result = $elapsed.inMilliseconds - let ns = elapsed.inNanoseconds mod 1_000_000 # fraction of a milli second - if ns != 0: - # to rounded deca milli seconds - let dm = (ns + 5_000i64) div 10_000i64 - result &= &".{dm:02}" - result &= "ms" +# proc ppMs*(elapsed: times.Duration): string +# {.gcsafe, raises: [Defect, ValueError]} = +# result = $elapsed.inMilliseconds +# let ns = elapsed.inNanoseconds mod 1_000_000 # fraction of a milli second +# if ns != 0: +# # to rounded deca milli seconds +# let dm = (ns + 5_000i64) div 10_000i64 +# result &= &".{dm:02}" +# result &= "ms" +# +# proc ppSecs*(elapsed: times.Duration): string +# {.gcsafe, raises: [Defect, ValueError]} = +# result = $elapsed.inSeconds +# let ns = elapsed.inNanoseconds mod 1_000_000_000 # fraction of a second +# if ns != 0: +# # round up +# let ds = (ns + 5_000_000i64) div 10_000_000i64 +# result &= &".{ds:02}" +# result &= "s" +# +# proc ppMins*(elapsed: times.Duration): string +# {.gcsafe, raises: [Defect, ValueError]} = +# result = $elapsed.inMinutes +# let ns = elapsed.inNanoseconds mod 60_000_000_000 # fraction of a minute +# if ns != 0: +# # round up +# let dm = (ns + 500_000_000i64) div 1_000_000_000i64 +# result &= &":{dm:02}" +# result &= "m" +# +# proc pp(d: times.Duration): string +# {.gcsafe, raises: [Defect, ValueError]} = +# if 40 < d.inSeconds: +# d.ppMins +# elif 200 < d.inMilliseconds: +# d.ppSecs +# else: +# d.ppMs -proc ppSecs*(elapsed: times.Duration): string - {.gcsafe, raises: [Defect, ValueError]} = - result = $elapsed.inSeconds - let ns = elapsed.inNanoseconds mod 1_000_000_000 # fraction of a second - if ns != 0: - # round up - let ds = (ns + 5_000_000i64) div 10_000_000i64 - result &= &".{ds:02}" - result &= "s" - -proc ppMins*(elapsed: times.Duration): 
string - {.gcsafe, raises: [Defect, ValueError]} = - result = $elapsed.inMinutes - let ns = elapsed.inNanoseconds mod 60_000_000_000 # fraction of a minute - if ns != 0: - # round up - let dm = (ns + 500_000_000i64) div 1_000_000_000i64 - result &= &":{dm:02}" - result &= "m" - -proc pp(d: times.Duration): string - {.gcsafe, raises: [Defect, ValueError]} = - if 40 < d.inSeconds: - d.ppMins - elif 200 < d.inMilliseconds: - d.ppSecs - else: - d.ppMs +proc pc99(val: float): string = + if 0.99 <= val and val < 1.0: "99%" + elif 0.0 < val and val <= 0.01: "1%" + else: val.toPC(0) # ------------------------------------------------------------------------------ # Private functions: ticking log messages @@ -117,9 +122,9 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} = pivot = "n/a" nStoQue = "n/a" let - accCov = data.accountsFill[0].toPC(0) & - "(" & data.accountsFill[1].toPC(0) & ")" & - "/" & data.accountsFill[2].toPC(0) + accCov = data.accountsFill[0].pc99 & + "(" & data.accountsFill[1].pc99 & ")" & + "/" & data.accountsFill[2].pc99 buddies = t.nBuddies # With `int64`, there are more than 29*10^10 years range for seconds diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index 55444b066..6868d4ddd 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -38,17 +38,14 @@ type slots*: SnapTrieRangeBatchRef ## slots to fetch, nil => all slots inherit*: bool ## mark this trie seen already - SnapSlotsSet* = HashSet[SnapSlotsQueueItemRef] - ## Ditto but without order, to be used as veto set - - SnapAccountRanges* = array[2,NodeTagRangeSet] - ## Pair of account hash range lists. The first entry must be processed - ## first. This allows to coordinate peers working on different state roots - ## to avoid ovelapping accounts as long as they fetch from the first entry. + SnapTodoRanges* = array[2,NodeTagRangeSet] + ## Pair of node range lists. The first entry must be processed first. This + ## allows coordinating peers working on different state roots to avoid + ## overlapping accounts as long as they fetch from the first entry. SnapTrieRangeBatch* = object ## `NodeTag` ranges to fetch, healing support - unprocessed*: SnapAccountRanges ## Range of slots not covered, yet + unprocessed*: SnapTodoRanges ## Range of slots not covered, yet checkNodes*: seq[Blob] ## Nodes with prob.
dangling child links missingNodes*: seq[NodeSpecs] ## Dangling links to fetch and merge @@ -73,7 +70,8 @@ type accountsState*: SnapHealingState ## All accounts have been processed # Storage slots download - fetchStorage*: SnapSlotsQueue ## Fetch storage for these accounts + fetchStorageFull*: SnapSlotsQueue ## Fetch storage trie for these accounts + fetchStoragePart*: SnapSlotsQueue ## Partial storage trie to complete storageDone*: bool ## Done with storage, block sync next # Info @@ -121,7 +119,57 @@ proc hash*(a: Hash256): Hash = a.data.hash # ------------------------------------------------------------------------------ -# Public helpers +# Public helpers: SnapTodoRanges # ------------------------------------------------------------------------------ + +proc init*(q: var SnapTodoRanges) = + ## Populate node range sets with maximal range in the first range set + q[0] = NodeTagRangeSet.init() + q[1] = NodeTagRangeSet.init() + discard q[0].merge(low(NodeTag),high(NodeTag)) + + +proc merge*(q: var SnapTodoRanges; iv: NodeTagRange) = + ## Unconditionally merge the node range into the account ranges list + discard q[0].reduce(iv) + discard q[1].merge(iv) + +proc merge*(q: var SnapTodoRanges; minPt, maxPt: NodeTag) = + ## Variant of `merge()` + q.merge NodeTagRange.new(minPt, maxPt) + + +proc reduce*(q: var SnapTodoRanges; iv: NodeTagRange) = + ## Unconditionally remove the node range from the account ranges list + discard q[0].reduce(iv) + discard q[1].reduce(iv) + +proc reduce*(q: var SnapTodoRanges; minPt, maxPt: NodeTag) = + ## Variant of `reduce()` + q.reduce NodeTagRange.new(minPt, maxPt) + + +proc fetch*(q: var SnapTodoRanges; maxLen: UInt256): Result[NodeTagRange,void] = + ## Fetch interval from node ranges with maximal size `maxLen` + + # Swap batch queues if the first one is empty + if q[0].isEmpty: + swap(q[0], q[1]) + + # Fetch from first range list + let rc = q[0].ge() + if rc.isErr: + return err() + + let + val = rc.value + iv = if 0 < val.len and val.len <= maxLen: val # val.len==0 => 2^256 + else: NodeTagRange.new(val.minPt, val.minPt + (maxLen - 1.u256)) + discard q[0].reduce(iv) + ok(iv) + +# ------------------------------------------------------------------------------ +# Public helpers: SlotsQueue # ------------------------------------------------------------------------------ proc merge*(q: var SnapSlotsQueue; kvp: SnapSlotsQueuePair) = diff --git a/nimbus/sync/sync_desc.nim b/nimbus/sync/sync_desc.nim index 7b424dd8c..9bdb48cc7 100644 --- a/nimbus/sync/sync_desc.nim +++ b/nimbus/sync/sync_desc.nim @@ -73,7 +73,7 @@ proc running*(ctrl: BuddyCtrlRef): bool = proc stopped*(ctrl: BuddyCtrlRef): bool = ## Getter, if `true`, if `ctrl.state()` is not `Running` - ctrl.runState in {Stopped, ZombieStop, ZombieRun} + ctrl.runState != Running proc zombie*(ctrl: BuddyCtrlRef): bool = ## Getter, `true` if `ctrl.state()` is `Zombie` (i.e.
not `running()` and diff --git a/tests/replay/undump_storages.nim b/tests/replay/undump_storages.nim index 968bef3f9..11b778daf 100644 --- a/tests/replay/undump_storages.nim +++ b/tests/replay/undump_storages.nim @@ -189,7 +189,7 @@ iterator undumpNextStorages*(gzFile: string): UndumpStorages = if flds.len == 2: data.data.storages[^1].data.add SnapStorage( slotHash: Hash256.fromHex(flds[0]), - slotData: flds[1].toByteSeq) + slotData: flds[1].toByteSeq) nSlots.dec if 0 < nSlots: continue @@ -211,6 +211,11 @@ iterator undumpNextStorages*(gzFile: string): UndumpStorages = nProofs.dec if nProofs <= 0: state = UndumpCommit + # KLUDGE: set base (field was later added) + if 0 < data.data.storages.len: + let topList = data.data.storages[^1] + if 0 < topList.data.len: + data.data.base = topList.data[0].slotHash.to(NodeTag) continue state = UndumpError say &"*** expected proof data, got {line}" diff --git a/tests/test_sync_snap.nim b/tests/test_sync_snap.nim index 9dad4b316..88338b0a4 100644 --- a/tests/test_sync_snap.nim +++ b/tests/test_sync_snap.nim @@ -66,6 +66,7 @@ let # Forces `check()` to print the error (as opposed when using `isOk()`) OkHexDb = Result[void,HexaryDbError].ok() OkStoDb = Result[void,seq[(int,HexaryDbError)]].ok() + OkImport = Result[seq[NodeSpecs],HexaryDbError].ok(@[]) # There was a problem with the Github/CI which results in spurious crashes # when leaving the `runner()` if the persistent BaseChainDB initialisation @@ -300,7 +301,7 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) = else: SnapDbRef.init(newMemoryDB()) dbDesc = SnapDbAccountsRef.init(dbBase, root, peer) for n,w in accountsList: - check dbDesc.importAccounts(w.base, w.data, persistent) == OkHexDb + check dbDesc.importAccounts(w.base, w.data, persistent) == OkImport test &"Merging {accountsList.len} proofs for state root ..{root.pp}": let dbBase = if persistent: SnapDbRef.init(db.cdb[1]) @@ -313,7 +314,9 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) = packed = PackedAccountRange( accounts: accountsList.mapIt(it.data.accounts).sortMerge, proof: accountsList.mapIt(it.data.proof).flatten) - check desc.importAccounts(lowerBound, packed, true) == OkHexDb + # Merging intervals will produce gaps, so the result is expected OK but + # different from `OkImport` + check desc.importAccounts(lowerBound, packed, true).isOk # check desc.merge(lowerBound, accounts) == OkHexDb desc.assignPrettyKeys() # for debugging, make sure that state root ~ "$0" @@ -430,7 +433,7 @@ proc storagesRunner( test &"Merging {accountsList.len} accounts for state root ..{root.pp}": for w in accountsList: let desc = SnapDbAccountsRef.init(dbBase, root, peer) - check desc.importAccounts(w.base, w.data, persistent) == OkHexDb + check desc.importAccounts(w.base, w.data, persistent) == OkImport test &"Merging {storagesList.len} storages lists": let @@ -512,7 +515,7 @@ proc inspectionRunner( rootKey = root.to(NodeKey) desc = SnapDbAccountsRef.init(memBase, root, peer) for w in accList: - check desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb + check desc.importAccounts(w.base, w.data, persistent=false)==OkImport let rc = desc.inspectAccountsTrie(persistent=false) check rc.isOk let @@ -537,7 +540,7 @@ proc inspectionRunner( dbBase = SnapDbRef.init(db.cdb[2+n]) desc = SnapDbAccountsRef.init(dbBase, root, peer) for w in accList: - check desc.importAccounts(w.base, w.data, persistent) == OkHexDb + check desc.importAccounts(w.base, w.data, persistent) == OkImport let rc = 
desc.inspectAccountsTrie(persistent=false) check rc.isOk let @@ -557,7 +560,7 @@ proc inspectionRunner( rootKey = root.to(NodeKey) desc = memDesc.dup(root,Peer()) for w in accList: - check desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb + check desc.importAccounts(w.base, w.data, persistent=false)==OkImport let rc = desc.inspectAccountsTrie(persistent=false) check rc.isOk let @@ -580,7 +583,7 @@ proc inspectionRunner( rootSet = [rootKey].toHashSet desc = perDesc.dup(root,Peer()) for w in accList: - check desc.importAccounts(w.base, w.data, persistent) == OkHexDb + check desc.importAccounts(w.base, w.data, persistent) == OkImport let rc = desc.inspectAccountsTrie(persistent=false) check rc.isOk let @@ -607,7 +610,7 @@ proc inspectionRunner( rootKey = root.to(NodeKey) desc = cscDesc.dup(root,Peer()) for w in accList: - check desc.importAccounts(w.base,w.data,persistent=false) == OkHexDb + check desc.importAccounts(w.base,w.data,persistent=false)==OkImport if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)): cscStep[rootKey][0].inc let @@ -639,7 +642,7 @@ proc inspectionRunner( rootKey = root.to(NodeKey) desc = cscDesc.dup(root,Peer()) for w in accList: - check desc.importAccounts(w.base,w.data,persistent) == OkHexDb + check desc.importAccounts(w.base,w.data,persistent) == OkImport if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)): cscStep[rootKey][0].inc let