From 8c7d91512b343073147960f31df28d7d7a09c9ed Mon Sep 17 00:00:00 2001
From: Jordan Hrycaj
Date: Fri, 14 Oct 2022 17:40:32 +0100
Subject: [PATCH] Prep for full sync after snap mark2 (#1263)

* Rename `LeafRange` => `NodeTagRange`

* Replacing storage slot partition point by interval

why:
  The partition point only allows describing slots `[point,high(Uint256)]`
  for fetching interval slot ranges. This has been generalised to any
  interval.

* Replacing `SnapAccountRanges` by `SnapTrieRangeBatch`

why:
  Generalised healing status for accounts, and later for storage slots.

* Improve accounts healing loop

* Split `snap_db` into accounts and storage modules

why:
  It is cleaner to have separate session descriptors for accounts and
  storage slots (based on a common base descriptor). Also, persistent
  storage handling might change in the future, which requires the storage
  slot implementation to be disentangled from the accounts handling.

* Re-model worker queues for storage slots

why:
  There is a dynamic list of storage sub-tries, each of which has to be
  treated similarly to the accounts database. This applies to slot
  interval downloads as well as to healing.

* Compress some return value report lists for snapdb methods

why:
  No need to report all handling details for work items that are
  filtered out and discarded anyway.

* Remove inner loop frame from healing function

why:
  The healing function runs as a loop body already.
---
 nimbus/sync/snap/range_desc.nim               |  57 +-
 nimbus/sync/snap/worker.nim                   |  52 +-
 nimbus/sync/snap/worker/com/com_error.nim     |   2 +
 .../snap/worker/com/get_account_range.nim     |  16 +-
 .../snap/worker/com/get_storage_ranges.nim    |  68 +-
 nimbus/sync/snap/worker/db/hexary_desc.nim    |  18 +-
 nimbus/sync/snap/worker/db/hexary_import.nim  |  44 +-
 nimbus/sync/snap/worker/db/hexary_inspect.nim |  28 +-
 nimbus/sync/snap/worker/db/snap_db.nim        | 802 ------------------
 .../sync/snap/worker/db/snapdb_accounts.nim   | 483 +++++++++++
 nimbus/sync/snap/worker/db/snapdb_desc.nim    | 252 ++++++
 .../snap/worker/db/snapdb_storage_slots.nim   | 245 ++++++
 nimbus/sync/snap/worker/heal_accounts.nim     | 487 ++++++-----
 nimbus/sync/snap/worker/store_accounts.nim    |  32 +-
 nimbus/sync/snap/worker/store_storages.nim    | 194 +++--
 nimbus/sync/snap/worker_desc.nim              |  80 +-
 tests/test_sync_snap.nim                      |  59 +-
 17 files changed, 1574 insertions(+), 1345 deletions(-)
 delete mode 100644 nimbus/sync/snap/worker/db/snap_db.nim
 create mode 100644 nimbus/sync/snap/worker/db/snapdb_accounts.nim
 create mode 100644 nimbus/sync/snap/worker/db/snapdb_desc.nim
 create mode 100644 nimbus/sync/snap/worker/db/snapdb_storage_slots.nim

diff --git a/nimbus/sync/snap/range_desc.nim b/nimbus/sync/snap/range_desc.nim
index 3df2ec432..71019c90b 100644
--- a/nimbus/sync/snap/range_desc.nim
+++ b/nimbus/sync/snap/range_desc.nim
@@ -31,14 +31,12 @@ type
     ## Hash key without the hash wrapper (as opposed to `NodeTag` which is a
     ## number)
 
-  LeafRange* = ##\
+  NodeTagRange* = Interval[NodeTag,UInt256]
     ## Interval `[minPt,maxPt]` of` NodeTag` elements, can be managed in an
     ## `IntervalSet` data type.
-    Interval[NodeTag,UInt256]
 
-  LeafRangeSet* = ##\
-    ## Managed structure to handle non-adjacent `LeafRange` intervals
-    IntervalSetRef[NodeTag,UInt256]
+  NodeTagRangeSet* = IntervalSetRef[NodeTag,UInt256]
+    ## Managed structure to handle non-adjacent `NodeTagRange` intervals
 
   PackedAccountRange* = object
     ## Re-packed version of `SnapAccountRange`. 
The reason why repacking is @@ -56,15 +54,15 @@ type AccountSlotsHeader* = object ## Storage root header - accHash*: Hash256 ## Owner account, maybe unnecessary - storageRoot*: Hash256 ## Start of storage tree - firstSlot*: Hash256 ## Continuation if non-zero + accHash*: Hash256 ## Owner account, maybe unnecessary + storageRoot*: Hash256 ## Start of storage tree + subRange*: Option[NodeTagRange] ## Sub-range of slot range covered AccountStorageRange* = object ## List of storage descriptors, the last `AccountSlots` storage data might ## be incomplete and tthe `proof` is needed for proving validity. - storages*: seq[AccountSlots] ## List of accounts and storage data - proof*: SnapStorageProof ## Boundary proofs for last entry + storages*: seq[AccountSlots] ## List of accounts and storage data + proof*: SnapStorageProof ## Boundary proofs for last entry AccountSlots* = object ## Account storage descriptor @@ -108,7 +106,7 @@ proc to*(n: SomeUnsignedInt|UInt256; T: type NodeTag): T = # ------------------------------------------------------------------------------ proc init*(key: var NodeKey; data: openArray[byte]): bool = - ## ## Import argument `data` into `key` which must have length either `32`, ot + ## Import argument `data` into `key` which must have length either `32`, or ## `0`. The latter case is equivalent to an all zero byte array of size `32`. if data.len == 32: (addr key.ByteArray32[0]).copyMem(unsafeAddr data[0], data.len) @@ -136,7 +134,7 @@ proc append*(writer: var RlpWriter, nid: NodeTag) = writer.append(nid.to(Hash256)) # ------------------------------------------------------------------------------ -# Public `NodeTag` and `LeafRange` functions +# Public `NodeTag` and `NodeTagRange` functions # ------------------------------------------------------------------------------ proc u256*(lp: NodeTag): UInt256 = lp.UInt256 @@ -161,8 +159,28 @@ proc digestTo*(data: Blob; T: type NodeTag): T = ## Hash the `data` argument keccakHash(data).to(T) +# ------------------------------------------------------------------------------ +# Public functions: `NodeTagRange` helpers +# ------------------------------------------------------------------------------ -proc emptyFactor*(lrs: LeafRangeSet): float = +proc isEmpty*(lrs: NodeTagRangeSet): bool = + ## Returns `true` if the argument set `lrs` of intervals is empty + lrs.total == 0 and lrs.chunks == 0 + +proc isEmpty*(lrs: openArray[NodeTagRangeSet]): bool = + ## Variant of `isEmpty()` where intervals are distributed across several + ## sets. + for ivSet in lrs: + if 0 < ivSet.total or 0 < ivSet.chunks: + return false + +proc isFull*(lrs: NodeTagRangeSet): bool = + ## Returns `true` if the argument set `lrs` contains of the single + ## interval [low(NodeTag),high(NodeTag)]. + lrs.total == 0 and 0 < lrs.chunks + + +proc emptyFactor*(lrs: NodeTagRangeSet): float = ## Relative uncovered total, i.e. `#points-not-covered / 2^256` to be used ## in statistics or triggers. if 0 < lrs.total: @@ -172,7 +190,7 @@ proc emptyFactor*(lrs: LeafRangeSet): float = else: 0.0 # number of points in `lrs` is `2^256 + 1` -proc emptyFactor*(lrs: openArray[LeafRangeSet]): float = +proc emptyFactor*(lrs: openArray[NodeTagRangeSet]): float = ## Variant of `emptyFactor()` where intervals are distributed across several ## sets. This function makes sense only if the interval sets are mutually ## disjunct. 
@@ -186,9 +204,11 @@ proc emptyFactor*(lrs: openArray[LeafRangeSet]): float = discard else: # number of points in `ivSet` is `2^256 + 1` return 0.0 + if accu == 0.to(NodeTag): + return 1.0 ((high(NodeTag) - accu).u256 + 1).to(float) / (2.0^256) -proc fullFactor*(lrs: LeafRangeSet): float = +proc fullFactor*(lrs: NodeTagRangeSet): float = ## Relative covered total, i.e. `#points-covered / 2^256` to be used ## in statistics or triggers if 0 < lrs.total: @@ -198,8 +218,9 @@ proc fullFactor*(lrs: LeafRangeSet): float = else: 1.0 # number of points in `lrs` is `2^256 + 1` - -# Printing & pretty printing +# ------------------------------------------------------------------------------ +# Public functions: printing & pretty printing +# ------------------------------------------------------------------------------ proc `$`*(nt: NodeTag): string = if nt == high(NodeTag): @@ -221,7 +242,7 @@ proc `$`*(a, b: NodeTag): string = ## Prettyfied prototype leafRangePp(a,b) -proc `$`*(iv: LeafRange): string = +proc `$`*(iv: NodeTagRange): string = leafRangePp(iv.minPt, iv.maxPt) # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index d55c6eacb..5a878348a 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -18,7 +18,7 @@ import ".."/[protocol, sync_desc], ./worker/[heal_accounts, store_accounts, store_storages, ticker], ./worker/com/[com_error, get_block_header], - ./worker/db/snap_db, + ./worker/db/snapdb_desc, "."/[range_desc, worker_desc] const @@ -107,38 +107,38 @@ proc pivotStop(buddy: SnapBuddyRef) = # Private functions # ------------------------------------------------------------------------------ -proc init(T: type SnapAccountRanges; ctx: SnapCtxRef): T = - ## Return a pair of account hash range lists with the whole range of - ## smartly spread `[low(NodeTag),high(NodeTag)]` across the mutually - ## disjunct interval sets. - result = [LeafRangeSet.init(),LeafRangeSet.init()] +proc init(batch: var SnapTrieRangeBatch; ctx: SnapCtxRef) = + ## Returns a pair of account hash range lists with the full range of hashes + ## smartly spread across the mutually disjunct interval sets. + for n in 0 ..< batch.unprocessed.len: + batch.unprocessed[n] = NodeTagRangeSet.init() # Initialise accounts range fetch batch, the pair of `fetchAccounts[]` # range sets. if ctx.data.coveredAccounts.total == 0 and ctx.data.coveredAccounts.chunks == 1: - # 100% of accounts covered by range fetch batches for the total - # of pivot environments. Do a random split distributing the range - # `[low(NodeTag),high(NodeTag)]` across the pair of range sats. + # All (i.e. 100%) of accounts hashes are covered by completed range fetch + # processes for all pivot environments. Do a random split distributing the + # full accounts hash range across the pair of range sats. var nodeKey: NodeKey ctx.data.rng[].generate(nodeKey.ByteArray32) let partition = nodeKey.to(NodeTag) - discard result[0].merge(partition, high(NodeTag)) + discard batch.unprocessed[0].merge(partition, high(NodeTag)) if low(NodeTag) < partition: - discard result[1].merge(low(NodeTag), partition - 1.u256) + discard batch.unprocessed[1].merge(low(NodeTag), partition - 1.u256) else: # Not all account hashes are covered, yet. So keep the uncovered # account hashes in the first range set, and the other account hashes # in the second range set. 
# Pre-filled with the first range set with largest possible interval - discard result[0].merge(low(NodeTag),high(NodeTag)) + discard batch.unprocessed[0].merge(low(NodeTag),high(NodeTag)) # Move covered account ranges (aka intervals) to the second set. for iv in ctx.data.coveredAccounts.increasing: - discard result[0].reduce(iv) - discard result[1].merge(iv) + discard batch.unprocessed[0].reduce(iv) + discard batch.unprocessed[1].merge(iv) proc appendPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) = @@ -161,17 +161,16 @@ proc appendPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) = # where the queue is assumed to have increasing block numbers. if minNumber <= header.blockNumber: # Ok, append a new environment - let env = SnapPivotRef( - stateHeader: header, - fetchAccounts: SnapAccountRanges.init(ctx)) + let env = SnapPivotRef(stateHeader: header) + env.fetchAccounts.init(ctx) # Append per-state root environment to LRU queue discard ctx.data.pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax) # Debugging, will go away block: - let ivSet = env.fetchAccounts[0].clone - for iv in env.fetchAccounts[1].increasing: + let ivSet = env.fetchAccounts.unprocessed[0].clone + for iv in env.fetchAccounts.unprocessed[1].increasing: doAssert ivSet.merge(iv) == iv.len doAssert ivSet.chunks == 1 doAssert ivSet.total == 0 @@ -246,7 +245,7 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = aSqSum += aLen * aLen # Fill utilisation mean & variance - let fill = kvp.data.fetchAccounts.emptyFactor + let fill = kvp.data.fetchAccounts.unprocessed.emptyFactor uSum += fill uSqSum += fill * fill @@ -274,7 +273,7 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool = ## Global set up - ctx.data.coveredAccounts = LeafRangeSet.init() + ctx.data.coveredAccounts = NodeTagRangeSet.init() ctx.data.snapDb = if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.getTrieDB) else: SnapDbRef.init(ctx.data.dbBackend) @@ -371,7 +370,7 @@ proc runPool*(buddy: SnapBuddyRef, last: bool) = if not env.serialSync: # Check whether accounts download is complete block checkAccountsComplete: - for ivSet in env.fetchAccounts: + for ivSet in env.fetchAccounts.unprocessed: if ivSet.chunks != 0: break checkAccountsComplete env.accountsDone = true @@ -407,19 +406,22 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = else: # Snapshot sync processing. Note that *serialSync => accountsDone*. - await buddy.storeStorages() # always pre-clean the queue await buddy.storeAccounts() + if buddy.ctrl.stopped: return + await buddy.storeStorages() + if buddy.ctrl.stopped: return # If the current database is not complete yet - if 0 < env.fetchAccounts[0].chunks or - 0 < env.fetchAccounts[1].chunks: + if 0 < env.fetchAccounts.unprocessed[0].chunks or + 0 < env.fetchAccounts.unprocessed[1].chunks: # Healing applies to the latest pivot only. The pivot might have changed # in the background (while netwoking) due to a new peer worker that has # negotiated another, newer pivot. 
if env == ctx.data.pivotTable.lastValue.value: await buddy.healAccountsDb() + if buddy.ctrl.stopped: return # TODO: use/apply storage healer diff --git a/nimbus/sync/snap/worker/com/com_error.nim b/nimbus/sync/snap/worker/com/com_error.nim index 0165f694e..3244943a8 100644 --- a/nimbus/sync/snap/worker/com/com_error.nim +++ b/nimbus/sync/snap/worker/com/com_error.nim @@ -30,6 +30,7 @@ type ComAccountsMaxTooLarge ComAccountsMinTooSmall ComEmptyAccountsArguments + ComEmptyPartialRange ComEmptyRequestArguments ComMissingProof ComNetworkProblem @@ -80,6 +81,7 @@ proc stopAfterSeriousComError*( of ComEmptyAccountsArguments, ComEmptyRequestArguments, + ComEmptyPartialRange, ComInspectDbFailed, ComImportAccountsFailed, ComNoDataForProof, diff --git a/nimbus/sync/snap/worker/com/get_account_range.nim b/nimbus/sync/snap/worker/com/get_account_range.nim index 68c8922c6..256474e4a 100644 --- a/nimbus/sync/snap/worker/com/get_account_range.nim +++ b/nimbus/sync/snap/worker/com/get_account_range.nim @@ -28,7 +28,7 @@ logScope: type GetAccountRange* = object - consumed*: LeafRange ## Real accounts interval covered + consumed*: NodeTagRange ## Real accounts interval covered data*: PackedAccountRange ## Re-packed reply data withStorage*: seq[AccountSlotsHeader] ## Accounts with non-idle storage root @@ -39,7 +39,7 @@ type proc getAccountRangeReq( buddy: SnapBuddyRef; root: Hash256; - iv: LeafRange + iv: NodeTagRange; ): Future[Result[Option[SnapAccountRange],void]] {.async.} = let peer = buddy.peer @@ -59,7 +59,7 @@ proc getAccountRangeReq( proc getAccountRange*( buddy: SnapBuddyRef; stateRoot: Hash256; - iv: LeafRange + iv: NodeTagRange; ): Future[Result[GetAccountRange,ComError]] {.async.} = ## Fetch data using the `snap#` protocol, returns the range covered. let @@ -114,7 +114,7 @@ proc getAccountRange*( # So there is no data, otherwise an account beyond the interval end # `iv.maxPt` would have been returned. - dd.consumed = LeafRange.new(iv.minPt, high(NodeTag)) + dd.consumed = NodeTagRange.new(iv.minPt, high(NodeTag)) trace trSnapRecvReceived & "terminal AccountRange", peer, nAccounts, nProof, accRange=dd.consumed, reqRange=iv, stateRoot return ok(dd) @@ -133,14 +133,14 @@ proc getAccountRange*( # across both. if 0.to(NodeTag) < iv.minPt: trace trSnapRecvProtocolViolation & "proof-less AccountRange", peer, - nAccounts, nProof, accRange=LeafRange.new(iv.minPt, accMaxPt), + nAccounts, nProof, accRange=NodeTagRange.new(iv.minPt, accMaxPt), reqRange=iv, stateRoot return err(ComMissingProof) if accMinPt < iv.minPt: # Not allowed trace trSnapRecvProtocolViolation & "min too small in AccountRange", peer, - nAccounts, nProof, accRange=LeafRange.new(accMinPt, accMaxPt), + nAccounts, nProof, accRange=NodeTagRange.new(accMinPt, accMaxPt), reqRange=iv, stateRoot return err(ComAccountsMinTooSmall) @@ -157,11 +157,11 @@ proc getAccountRange*( if iv.maxPt < dd.data.accounts[^2].accHash.to(NodeTag): # The segcond largest should not excceed the top one requested. 
trace trSnapRecvProtocolViolation & "AccountRange top exceeded", peer, - nAccounts, nProof, accRange=LeafRange.new(iv.minPt, accMaxPt), + nAccounts, nProof, accRange=NodeTagRange.new(iv.minPt, accMaxPt), reqRange=iv, stateRoot return err(ComAccountsMaxTooLarge) - dd.consumed = LeafRange.new(iv.minPt, max(iv.maxPt,accMaxPt)) + dd.consumed = NodeTagRange.new(iv.minPt, max(iv.maxPt,accMaxPt)) trace trSnapRecvReceived & "AccountRange", peer, nAccounts, nProof, accRange=dd.consumed, reqRange=iv, stateRoot diff --git a/nimbus/sync/snap/worker/com/get_storage_ranges.nim b/nimbus/sync/snap/worker/com/get_storage_ranges.nim index 1bc8dc318..67e3d958d 100644 --- a/nimbus/sync/snap/worker/com/get_storage_ranges.nim +++ b/nimbus/sync/snap/worker/com/get_storage_ranges.nim @@ -33,7 +33,7 @@ type # proof*: SnapStorageProof GetStorageRanges* = object - leftOver*: seq[SnapSlotQueueItemRef] + leftOver*: seq[AccountSlotsHeader] data*: AccountStorageRange const @@ -47,7 +47,7 @@ proc getStorageRangesReq( buddy: SnapBuddyRef; root: Hash256; accounts: seq[Hash256], - iv: Option[LeafRange] + iv: Option[NodeTagRange] ): Future[Result[Option[SnapStorageRanges],void]] {.async.} = let @@ -78,18 +78,6 @@ proc getStorageRangesReq( # Public functions # ------------------------------------------------------------------------------ -proc addLeftOver*( - dd: var GetStorageRanges; ## Descriptor - accounts: seq[AccountSlotsHeader]; ## List of items to re-queue - forceNew = false; ## Begin new block regardless - ) = - ## Helper for maintaining the `leftOver` queue - if 0 < accounts.len: - if accounts[0].firstSlot != Hash256() or dd.leftOver.len == 0: - dd.leftOver.add SnapSlotQueueItemRef(q: accounts) - else: - dd.leftOver[^1].q = dd.leftOver[^1].q & accounts - proc getStorageRanges*( buddy: SnapBuddyRef; stateRoot: Hash256; @@ -106,14 +94,9 @@ proc getStorageRanges*( peer = buddy.peer var nAccounts = accounts.len - maybeIv = none(LeafRange) if nAccounts == 0: return err(ComEmptyAccountsArguments) - if accounts[0].firstSlot != Hash256(): - # Set up for range - maybeIv = some(LeafRange.new( - accounts[0].firstSlot.to(NodeTag), high(NodeTag))) if trSnapTracePacketsOk: trace trSnapSendSending & "GetStorageRanges", peer, @@ -121,7 +104,7 @@ proc getStorageRanges*( let snStoRanges = block: let rc = await buddy.getStorageRangesReq( - stateRoot, accounts.mapIt(it.accHash), maybeIv) + stateRoot, accounts.mapIt(it.accHash), accounts[0].subRange) if rc.isErr: return err(ComNetworkProblem) if rc.value.isNone: @@ -153,42 +136,41 @@ proc getStorageRanges*( # Assemble return structure for given peer response var dd = GetStorageRanges(data: AccountStorageRange(proof: snStoRanges.proof)) - # Filter `slots` responses: + # Filter remaining `slots` responses: # * Accounts for empty ones go back to the `leftOver` list. for n in 0 ..< nSlots: # Empty data for a slot indicates missing data if snStoRanges.slots[n].len == 0: - dd.addLeftOver @[accounts[n]] + dd.leftOver.add accounts[n] else: dd.data.storages.add AccountSlots( account: accounts[n], # known to be no fewer accounts than slots - data: snStoRanges.slots[n]) + data: snStoRanges.slots[n]) # Complete the part that was not answered by the peer if nProof == 0: - dd.addLeftOver accounts[nSlots ..< nAccounts] # empty slice is ok + dd.leftOver = dd.leftOver & accounts[nSlots ..< nAccounts] # empty slice ok + + # Ok, we have a proof now. What was it to be proved? 
+ elif snStoRanges.slots[^1].len == 0: + return err(ComNoDataForProof) # Now way to prove an empty node set + else: - if snStoRanges.slots[^1].len == 0: - # `dd.data.storages.len == 0` => `snStoRanges.slots[^1].len == 0` as - # it was confirmed that `0 < nSlots == snStoRanges.slots.len` - return err(ComNoDataForProof) - - # If the storage data for the last account comes with a proof, then it is - # incomplete. So record the missing part on the `dd.leftOver` list. - let top = dd.data.storages[^1].data[^1].slotHash.to(NodeTag) - - # Contrived situation with `top==high()`: any right proof will be useless - # so it is just ignored (i.e. `firstSlot` is zero in first slice.) - if top < high(NodeTag): - dd.addLeftOver @[AccountSlotsHeader( - firstSlot: (top + 1.u256).to(Hash256), + # If the storage data for the last account comes with a proof, then the data + # set is incomplete. So record the missing part on the `dd.leftOver` list. + let + reqTop = if accounts[0].subRange.isNone: high(NodeTag) + else: accounts[0].subRange.unsafeGet.maxPt + respTop = dd.data.storages[^1].data[^1].slotHash.to(NodeTag) + if respTop < reqTop: + dd.leftOver.add AccountSlotsHeader( + subRange: some(NodeTagRange.new(respTop + 1.u256, reqTop)), accHash: accounts[nSlots-1].accHash, - storageRoot: accounts[nSlots-1].storageRoot)] - dd.addLeftOver accounts[nSlots ..< nAccounts] # empty slice is ok + storageRoot: accounts[nSlots-1].storageRoot) + dd.leftOver = dd.leftOver & accounts[nSlots ..< nAccounts] # empty slice ok - let nLeftOver = dd.leftOver.foldl(a + b.q.len, 0) - trace trSnapRecvReceived & "StorageRanges", peer, - nAccounts, nSlots, nProof, nLeftOver, stateRoot + trace trSnapRecvReceived & "StorageRanges", peer, nAccounts, nSlots, nProof, + nLeftOver=dd.leftOver.len, stateRoot return ok(dd) diff --git a/nimbus/sync/snap/worker/db/hexary_desc.nim b/nimbus/sync/snap/worker/db/hexary_desc.nim index 4e816f5ff..0e94fafb7 100644 --- a/nimbus/sync/snap/worker/db/hexary_desc.nim +++ b/nimbus/sync/snap/worker/db/hexary_desc.nim @@ -12,7 +12,8 @@ import std/[algorithm, hashes, sequtils, sets, strutils, tables], eth/[common/eth_types, p2p, trie/nibbles], stint, - ../../range_desc + ../../range_desc, + ./hexary_error {.push raises: [Defect].} @@ -136,7 +137,6 @@ type TrieNodeStat* = object ## Trie inspection report dangling*: seq[Blob] ## Paths from nodes with incomplete refs - leaves*: seq[NodeKey] ## Paths to leave nodes (if any) level*: int ## Maximim nesting depth of dangling nodes stopped*: bool ## Potential loop detected if `true` @@ -150,6 +150,11 @@ type ## Persistent database get() function. For read-only cacses, this function ## can be seen as the persistent alternative to `HexaryTreeDbRef`. + HexaryNodeReport* = object + ## Return code for single node operations + slot*: Option[int] ## May refer to indexed argument slots + kind*: Option[NodeKind] ## Node type (if any) + error*: HexaryDbError ## Error code, or `NothingSerious` const EmptyNodeBlob* = seq[byte].default @@ -286,12 +291,6 @@ proc ppDangling(a: seq[Blob]; maxItems = 30): string = andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." else: "" "{" & q.join(",") & andMore & "}" -proc ppLeaves(a: openArray[NodeKey]; db: HexaryTreeDbRef; maxItems=30): string = - let - q = a.mapIt(it.ppImpl(db))[0 ..< min(maxItems,a.len)] - andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." 
else: "" - "{" & q.join(",") & andMore & "}" - # ------------------------------------------------------------------------------ # Public debugging helpers # ------------------------------------------------------------------------------ @@ -339,8 +338,7 @@ proc pp*(a: TrieNodeStat; db: HexaryTreeDbRef; maxItems = 30): string = if a.stopped: result &= "stopped," result &= $a.dangling.len & "," & - a.dangling.ppDangling(maxItems) & "," & - a.leaves.ppLeaves(db, maxItems) & ")" + a.dangling.ppDangling(maxItems) & ")" # ------------------------------------------------------------------------------ # Public constructor (or similar) diff --git a/nimbus/sync/snap/worker/db/hexary_import.nim b/nimbus/sync/snap/worker/db/hexary_import.nim index 3319c9f18..97078b186 100644 --- a/nimbus/sync/snap/worker/db/hexary_import.nim +++ b/nimbus/sync/snap/worker/db/hexary_import.nim @@ -33,10 +33,12 @@ proc hexaryImport*( recData: Blob; ## Node to add unrefNodes: var HashSet[RepairKey]; ## Keep track of freestanding nodes nodeRefs: var HashSet[RepairKey]; ## Ditto - ): Result[void,HexaryDbError] + ): HexaryNodeReport {.gcsafe, raises: [Defect, RlpError, KeyError].} = ## Decode a single trie item for adding to the table and add it to the ## database. Branch and exrension record links are collected. + if recData.len == 0: + return HexaryNodeReport(error: RlpNonEmptyBlobExpected) let nodeKey = recData.digestTo(NodeKey) repairKey = nodeKey.to(RepairKey) # for repair table @@ -53,29 +55,29 @@ proc hexaryImport*( case top of 0, 1: if not w.isBlob: - return err(RlpBlobExpected) + return HexaryNodeReport(error: RlpBlobExpected) blobs[top] = rlp.read(Blob) of 2 .. 15: var key: NodeKey if not key.init(rlp.read(Blob)): - return err(RlpBranchLinkExpected) + return HexaryNodeReport(error: RlpBranchLinkExpected) # Update ref pool links[top] = key.to(RepairKey) unrefNodes.excl links[top] # is referenced, now (if any) nodeRefs.incl links[top] of 16: if not w.isBlob: - return err(RlpBlobExpected) + return HexaryNodeReport(error: RlpBlobExpected) blob16 = rlp.read(Blob) else: - return err(Rlp2Or17ListEntries) + return HexaryNodeReport(error: Rlp2Or17ListEntries) top.inc # Verify extension data case top of 2: if blobs[0].len == 0: - return err(RlpNonEmptyBlobExpected) + return HexaryNodeReport(error: RlpNonEmptyBlobExpected) let (isLeaf, pathSegment) = hexPrefixDecode blobs[0] if isLeaf: rNode = RNodeRef( @@ -85,7 +87,7 @@ proc hexaryImport*( else: var key: NodeKey if not key.init(blobs[1]): - return err(RlpExtPathEncoding) + return HexaryNodeReport(error: RlpExtPathEncoding) # Update ref pool rNode = RNodeRef( kind: Extension, @@ -97,7 +99,7 @@ proc hexaryImport*( for n in [0,1]: var key: NodeKey if not key.init(blobs[n]): - return err(RlpBranchLinkExpected) + return HexaryNodeReport(error: RlpBranchLinkExpected) # Update ref pool links[n] = key.to(RepairKey) unrefNodes.excl links[n] # is referenced, now (if any) @@ -118,17 +120,19 @@ proc hexaryImport*( unrefNodes.incl repairKey # keep track of stray nodes elif db.tab[repairKey].convertTo(Blob) != recData: - return err(DifferentNodeValueExists) + return HexaryNodeReport(error: DifferentNodeValueExists) - ok() + HexaryNodeReport(kind: some(rNode.kind)) proc hexaryImport*( db: HexaryTreeDbRef; ## Contains node table recData: Blob; ## Node to add - ): Result[void,HexaryDbError] + ): HexaryNodeReport {.gcsafe, raises: [Defect, RlpError, KeyError].} = ## Ditto without referece checks + if recData.len == 0: + return HexaryNodeReport(error: RlpNonEmptyBlobExpected) let nodeKey = 
recData.digestTo(NodeKey) repairKey = nodeKey.to(RepairKey) # for repair table @@ -145,27 +149,27 @@ proc hexaryImport*( case top of 0, 1: if not w.isBlob: - return err(RlpBlobExpected) + return HexaryNodeReport(error: RlpBlobExpected) blobs[top] = rlp.read(Blob) of 2 .. 15: var key: NodeKey if not key.init(rlp.read(Blob)): - return err(RlpBranchLinkExpected) + return HexaryNodeReport(error: RlpBranchLinkExpected) # Update ref pool links[top] = key.to(RepairKey) of 16: if not w.isBlob: - return err(RlpBlobExpected) + return HexaryNodeReport(error: RlpBlobExpected) blob16 = rlp.read(Blob) else: - return err(Rlp2Or17ListEntries) + return HexaryNodeReport(error: Rlp2Or17ListEntries) top.inc # Verify extension data case top of 2: if blobs[0].len == 0: - return err(RlpNonEmptyBlobExpected) + return HexaryNodeReport(error: RlpNonEmptyBlobExpected) let (isLeaf, pathSegment) = hexPrefixDecode blobs[0] if isLeaf: rNode = RNodeRef( @@ -175,7 +179,7 @@ proc hexaryImport*( else: var key: NodeKey if not key.init(blobs[1]): - return err(RlpExtPathEncoding) + return HexaryNodeReport(error: RlpExtPathEncoding) # Update ref pool rNode = RNodeRef( kind: Extension, @@ -185,7 +189,7 @@ proc hexaryImport*( for n in [0,1]: var key: NodeKey if not key.init(blobs[n]): - return err(RlpBranchLinkExpected) + return HexaryNodeReport(error: RlpBranchLinkExpected) # Update ref pool links[n] = key.to(RepairKey) rNode = RNodeRef( @@ -200,9 +204,9 @@ proc hexaryImport*( db.tab[repairKey] = rNode elif db.tab[repairKey].convertTo(Blob) != recData: - return err(DifferentNodeValueExists) + return HexaryNodeReport(error: DifferentNodeValueExists) - ok() + HexaryNodeReport(kind: some(rNode.kind)) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/db/hexary_inspect.nim b/nimbus/sync/snap/worker/db/hexary_inspect.nim index 9ca1edd2c..1fcbf609b 100644 --- a/nimbus/sync/snap/worker/db/hexary_inspect.nim +++ b/nimbus/sync/snap/worker/db/hexary_inspect.nim @@ -210,7 +210,6 @@ proc hexaryInspectTrie*( db: HexaryTreeDbRef; ## Database root: NodeKey; ## State root paths: seq[Blob]; ## Starting paths for search - maxLeafPaths = 0; ## Record leaves with proper 32 bytes path stopAtLevel = 32; ## Instead of loop detector ): TrieNodeStat {.gcsafe, raises: [Defect,KeyError]} = @@ -218,22 +217,19 @@ proc hexaryInspectTrie*( ## the hexary trie which have at least one node key reference missing in ## the trie database. The references for these nodes are collected and ## returned. - ## * At most `maxLeafPaths` leaf node references are collected along the way. ## * Search list `paths` argument entries that do not refer to a hexary node ## are ignored. ## * For any search list `paths` argument entry, this function stops if ## the search depth exceeds `stopAtLevel` levels of linked sub-nodes. - ## * Argument `paths` list entries that do not refer to a valid node are - ## silently ignored. + ## * Argument `paths` list entries and partial paths on the way that do not + ## refer to a valid extension or branch type node are silently ignored. 
## let rootKey = root.to(RepairKey) if not db.tab.hasKey(rootKey): return TrieNodeStat() # Initialise TODO list - var - leafSlots = maxLeafPaths - reVisit = newTable[RepairKey,NibblesSeq]() + var reVisit = newTable[RepairKey,NibblesSeq]() if paths.len == 0: reVisit[rootKey] = EmptyNibbleRange else: @@ -269,12 +265,8 @@ proc hexaryInspectTrie*( child = node.bLink[n] db.processLink(stats=result, inspect=again, parent, trail, child) of Leaf: - if 0 < leafSlots: - let trail = parentTrail & node.lPfx - if trail.len == 64: - result.leaves.add trail.getBytes.convertTo(NodeKey) - leafSlots.dec - # Done with this link + # Ooops, forget node and key + discard # End `for` @@ -287,7 +279,6 @@ proc hexaryInspectTrie*( getFn: HexaryGetFn; ## Database abstraction rootKey: NodeKey; ## State root paths: seq[Blob]; ## Starting paths for search - maxLeafPaths = 0; ## Record leaves with proper 32 bytes path stopAtLevel = 32; ## Instead of loop detector ): TrieNodeStat {.gcsafe, raises: [Defect,RlpError,KeyError]} = @@ -303,9 +294,7 @@ proc hexaryInspectTrie*( return TrieNodeStat() # Initialise TODO list - var - leafSlots = maxLeafPaths - reVisit = newTable[NodeKey,NibblesSeq]() + var reVisit = newTable[NodeKey,NibblesSeq]() if paths.len == 0: reVisit[rootKey] = EmptyNibbleRange else: @@ -343,11 +332,6 @@ proc hexaryInspectTrie*( trail = parentTrail & xPfx child = nodeRlp.listElem(1) getFn.processLink(stats=result, inspect=again, parent, trail, child) - elif 0 < leafSlots: - let trail = parentTrail & xPfx - if trail.len == 64: - result.leaves.add trail.getBytes.convertTo(NodeKey) - leafSlots.dec of 17: for n in 0 ..< 16: let diff --git a/nimbus/sync/snap/worker/db/snap_db.nim b/nimbus/sync/snap/worker/db/snap_db.nim deleted file mode 100644 index 79fc24a2f..000000000 --- a/nimbus/sync/snap/worker/db/snap_db.nim +++ /dev/null @@ -1,802 +0,0 @@ -# nimbus-eth1 -# Copyright (c) 2021 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or distributed -# except according to those terms. 
- -import - std/[algorithm, sequtils, sets, strutils, tables, times], - chronos, - eth/[common/eth_types, p2p, rlp], - eth/trie/[db, nibbles], - stew/byteutils, - stint, - rocksdb, - ../../../../constants, - ../../../../db/[kvstore_rocksdb, select_backend], - "../../.."/[protocol, types], - ../../range_desc, - "."/[bulk_storage, hexary_desc, hexary_error, hexary_import, - hexary_interpolate, hexary_inspect, hexary_paths, rocky_bulk_load] - -{.push raises: [Defect].} - -logScope: - topics = "snap-db" - -export - HexaryDbError, - TrieNodeStat - -const - extraTraceMessages = false or true - -type - SnapDbRef* = ref object - ## Global, re-usable descriptor - db: TrieDatabaseRef ## General database - rocky: RocksStoreRef ## Set if rocksdb is available - - SnapDbSessionRef* = ref object - ## Database session descriptor - keyMap: Table[RepairKey,uint] ## For debugging only (will go away) - base: SnapDbRef ## Back reference to common parameters - peer: Peer ## For log messages - accRoot: NodeKey ## Current accounts root node - accDb: HexaryTreeDbRef ## Accounts database - stoDb: HexaryTreeDbRef ## Storage database - -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc newHexaryTreeDbRef(ps: SnapDbSessionRef): HexaryTreeDbRef = - HexaryTreeDbRef(keyPp: ps.stoDb.keyPp) # for debugging, will go away - -proc to(h: Hash256; T: type NodeKey): T = - h.data.T - -proc convertTo(data: openArray[byte]; T: type Hash256): T = - discard result.data.NodeKey.init(data) # size error => zero - -template noKeyError(info: static[string]; code: untyped) = - try: - code - except KeyError as e: - raiseAssert "Not possible (" & info & "): " & e.msg - -template noRlpExceptionOops(info: static[string]; code: untyped) = - try: - code - except RlpError: - return err(RlpEncoding) - except KeyError as e: - raiseAssert "Not possible (" & info & "): " & e.msg - except Defect as e: - raise e - except Exception as e: - raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg - -# ------------------------------------------------------------------------------ -# Private debugging helpers -# ------------------------------------------------------------------------------ - -template noPpError(info: static[string]; code: untyped) = - try: - code - except ValueError as e: - raiseAssert "Inconveivable (" & info & "): " & e.msg - except KeyError as e: - raiseAssert "Not possible (" & info & "): " & e.msg - except Defect as e: - raise e - except Exception as e: - raiseAssert "Ooops (" & info & ") " & $e.name & ": " & e.msg - -proc toKey(a: RepairKey; ps: SnapDbSessionRef): uint = - if not a.isZero: - noPpError("pp(RepairKey)"): - if not ps.keyMap.hasKey(a): - ps.keyMap[a] = ps.keyMap.len.uint + 1 - result = ps.keyMap[a] - -proc toKey(a: NodeKey; ps: SnapDbSessionRef): uint = - a.to(RepairKey).toKey(ps) - -proc toKey(a: NodeTag; ps: SnapDbSessionRef): uint = - a.to(NodeKey).toKey(ps) - - -proc pp(a: NodeKey; ps: SnapDbSessionRef): string = - if a.isZero: "ø" else:"$" & $a.toKey(ps) - -proc pp(a: RepairKey; ps: SnapDbSessionRef): string = - if a.isZero: "ø" elif a.isNodeKey: "$" & $a.toKey(ps) else: "@" & $a.toKey(ps) - -proc pp(a: NodeTag; ps: SnapDbSessionRef): string = - a.to(NodeKey).pp(ps) - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -proc 
mergeProofs( - peer: Peer, ## For log messages - db: HexaryTreeDbRef; ## Database table - root: NodeKey; ## Root for checking nodes - proof: seq[Blob]; ## Node records - freeStandingOk = false; ## Remove freestanding nodes - ): Result[void,HexaryDbError] - {.gcsafe, raises: [Defect, RlpError, KeyError].} = - ## Import proof records (as received with snap message) into a hexary trie - ## of the repair table. These hexary trie records can be extended to a full - ## trie at a later stage and used for validating account data. - var - nodes: HashSet[RepairKey] - refs = @[root.to(RepairKey)].toHashSet - - for n,rlpRec in proof: - let rc = db.hexaryImport(rlpRec, nodes, refs) - if rc.isErr: - let error = rc.error - trace "mergeProofs()", peer, item=n, proofs=proof.len, error - return err(error) - - # Remove free standing nodes (if any) - if 0 < nodes.len: - let rest = nodes - refs - if 0 < rest.len: - if freeStandingOk: - trace "mergeProofs() detected unrelated nodes", peer, nodes=nodes.len - discard - else: - # Delete unreferenced nodes - for nodeKey in nodes: - db.tab.del(nodeKey) - trace "mergeProofs() ignoring unrelated nodes", peer, nodes=nodes.len - - ok() - - -proc persistentAccounts( - db: HexaryTreeDbRef; ## Current table - pv: SnapDbRef; ## Persistent database - ): Result[void,HexaryDbError] - {.gcsafe, raises: [Defect,OSError,KeyError].} = - ## Store accounts trie table on databse - if pv.rocky.isNil: - let rc = db.bulkStorageAccounts(pv.db) - if rc.isErr: return rc - else: - let rc = db.bulkStorageAccountsRocky(pv.rocky) - if rc.isErr: return rc - ok() - -proc persistentStorages( - db: HexaryTreeDbRef; ## Current table - pv: SnapDbRef; ## Persistent database - ): Result[void,HexaryDbError] - {.gcsafe, raises: [Defect,OSError,KeyError].} = - ## Store accounts trie table on databse - if pv.rocky.isNil: - let rc = db.bulkStorageStorages(pv.db) - if rc.isErr: return rc - else: - let rc = db.bulkStorageStoragesRocky(pv.rocky) - if rc.isErr: return rc - ok() - - -proc collectAccounts( - peer: Peer, ## for log messages - base: NodeTag; - acc: seq[PackedAccount]; - ): Result[seq[RLeafSpecs],HexaryDbError] - {.gcsafe, raises: [Defect, RlpError].} = - ## Repack account records into a `seq[RLeafSpecs]` queue. The argument data - ## `acc` are as received with the snap message `AccountRange`). - ## - ## The returned list contains leaf node information for populating a repair - ## table. The accounts, together with some hexary trie records for proofs - ## can be used for validating the argument account data. - var rcAcc: seq[RLeafSpecs] - - if acc.len != 0: - let pathTag0 = acc[0].accHash.to(NodeTag) - - # Verify lower bound - if pathTag0 < base: - let error = HexaryDbError.AccountSmallerThanBase - trace "collectAccounts()", peer, base, accounts=acc.len, error - return err(error) - - # Add base for the records (no payload). 
Note that the assumption - # holds: `rcAcc[^1].tag <= base` - if base < pathTag0: - rcAcc.add RLeafSpecs(pathTag: base) - - # Check for the case that accounts are appended - elif 0 < rcAcc.len and pathTag0 <= rcAcc[^1].pathTag: - let error = HexaryDbError.AccountsNotSrictlyIncreasing - trace "collectAccounts()", peer, base, accounts=acc.len, error - return err(error) - - # Add first account - rcAcc.add RLeafSpecs(pathTag: pathTag0, payload: acc[0].accBlob) - - # Veify & add other accounts - for n in 1 ..< acc.len: - let nodeTag = acc[n].accHash.to(NodeTag) - - if nodeTag <= rcAcc[^1].pathTag: - let error = AccountsNotSrictlyIncreasing - trace "collectAccounts()", peer, item=n, base, accounts=acc.len, error - return err(error) - - rcAcc.add RLeafSpecs(pathTag: nodeTag, payload: acc[n].accBlob) - - ok(rcAcc) - - -proc collectStorageSlots( - peer: Peer; - slots: seq[SnapStorage]; - ): Result[seq[RLeafSpecs],HexaryDbError] - {.gcsafe, raises: [Defect, RlpError].} = - ## Similar to `collectAccounts()` - var rcSlots: seq[RLeafSpecs] - - if slots.len != 0: - # Add initial account - rcSlots.add RLeafSpecs( - pathTag: slots[0].slotHash.to(NodeTag), - payload: slots[0].slotData) - - # Veify & add other accounts - for n in 1 ..< slots.len: - let nodeTag = slots[n].slotHash.to(NodeTag) - - if nodeTag <= rcSlots[^1].pathTag: - let error = SlotsNotSrictlyIncreasing - trace "collectStorageSlots()", peer, item=n, slots=slots.len, error - return err(error) - - rcSlots.add RLeafSpecs(pathTag: nodeTag, payload: slots[n].slotData) - - ok(rcSlots) - - -proc importStorageSlots*( - ps: SnapDbSessionRef; ## Re-usable session descriptor - data: AccountSlots; ## Account storage descriptor - proof: SnapStorageProof; ## Account storage proof - ): Result[void,HexaryDbError] - {.gcsafe, raises: [Defect, RlpError,KeyError].} = - ## Preocess storage slots for a particular storage root - let - stoRoot = data.account.storageRoot.to(NodeKey) - var - slots: seq[RLeafSpecs] - db = ps.newHexaryTreeDbRef() - - if 0 < proof.len: - let rc = ps.peer.mergeProofs(db, stoRoot, proof) - if rc.isErr: - return err(rc.error) - block: - let rc = ps.peer.collectStorageSlots(data.data) - if rc.isErr: - return err(rc.error) - slots = rc.value - block: - let rc = db.hexaryInterpolate(stoRoot, slots, bootstrap = (proof.len == 0)) - if rc.isErr: - return err(rc.error) - - # Commit to main descriptor - for k,v in db.tab.pairs: - if not k.isNodeKey: - return err(UnresolvedRepairNode) - ps.stoDb.tab[k] = v - - ok() - -# ------------------------------------------------------------------------------ -# Public constructor -# ------------------------------------------------------------------------------ - -proc init*( - T: type SnapDbRef; - db: TrieDatabaseRef - ): T = - ## Main object constructor - T(db: db) - -proc init*( - T: type SnapDbRef; - db: ChainDb - ): T = - ## Variant of `init()` allowing bulk import on rocksdb backend - result = T(db: db.trieDB, rocky: db.rocksStoreRef) - if not result.rocky.bulkStorageClearRockyCacheFile(): - result.rocky = nil - -proc init*( - T: type SnapDbSessionRef; - pv: SnapDbRef; - root: Hash256; - peer: Peer = nil - ): T = - ## Start a new session, do some actions an then discard the session - ## descriptor (probably after commiting data.) - let desc = SnapDbSessionRef( - base: pv, - peer: peer, - accRoot: root.to(NodeKey), - accDb: HexaryTreeDbRef(), - stoDb: HexaryTreeDbRef()) - - # Debugging, might go away one time ... 
- desc.accDb.keyPp = proc(key: RepairKey): string = key.pp(desc) - desc.stoDb.keyPp = desc.accDb.keyPp - - return desc - -proc dup*( - ps: SnapDbSessionRef; - root: Hash256; - peer: Peer; - ): SnapDbSessionRef = - ## Resume a session with different `root` key and `peer`. This new session - ## will access the same memory database as the `ps` argument session. - SnapDbSessionRef( - base: ps.base, - peer: peer, - accRoot: root.to(NodeKey), - accDb: ps.accDb, - stoDb: ps.stoDb) - -proc dup*( - ps: SnapDbSessionRef; - root: Hash256; - ): SnapDbSessionRef = - ## Variant of `dup()` without the `peer` argument. - ps.dup(root, ps.peer) - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc dbBackendRocksDb*(pv: SnapDbRef): bool = - ## Returns `true` if rocksdb features are available - not pv.rocky.isNil - -proc dbBackendRocksDb*(ps: SnapDbSessionRef): bool = - ## Returns `true` if rocksdb features are available - not ps.base.rocky.isNil - -proc importAccounts*( - ps: SnapDbSessionRef; ## Re-usable session descriptor - base: NodeTag; ## before or at first account entry in `data` - data: PackedAccountRange; ## re-packed `snap/1 ` reply data - persistent = false; ## store data on disk - ): Result[void,HexaryDbError] = - ## Validate and import accounts (using proofs as received with the snap - ## message `AccountRange`). This function accumulates data in a memory table - ## which can be written to disk with the argument `persistent` set `true`. The - ## memory table is held in the descriptor argument`ps`. - ## - ## Note that the `peer` argument is for log messages, only. - var accounts: seq[RLeafSpecs] - try: - if 0 < data.proof.len: - let rc = ps.peer.mergeProofs(ps.accDb, ps.accRoot, data.proof) - if rc.isErr: - return err(rc.error) - block: - let rc = ps.peer.collectAccounts(base, data.accounts) - if rc.isErr: - return err(rc.error) - accounts = rc.value - block: - let rc = ps.accDb.hexaryInterpolate( - ps.accRoot, accounts, bootstrap = (data.proof.len == 0)) - if rc.isErr: - return err(rc.error) - - if persistent and 0 < ps.accDb.tab.len: - let rc = ps.accDb.persistentAccounts(ps.base) - if rc.isErr: - return err(rc.error) - - except RlpError: - return err(RlpEncoding) - except KeyError as e: - raiseAssert "Not possible @ importAccounts: " & e.msg - except OSError as e: - trace "Import Accounts exception", peer=ps.peer, name=($e.name), msg=e.msg - return err(OSErrorException) - - when extraTraceMessages: - trace "Accounts and proofs ok", peer=ps.peer, - root=ps.accRoot.ByteArray32.toHex, - proof=data.proof.len, base, accounts=data.accounts.len - ok() - -proc importAccounts*( - pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer, ## for log messages - root: Hash256; ## state root - base: NodeTag; ## before or at first account entry in `data` - data: PackedAccountRange; ## re-packed `snap/1 ` reply data - ): Result[void,HexaryDbError] = - ## Variant of `importAccounts()` - SnapDbSessionRef.init( - pv, root, peer).importAccounts(base, data, persistent=true) - - - -proc importStorages*( - ps: SnapDbSessionRef; ## Re-usable session descriptor - data: AccountStorageRange; ## Account storage reply from `snap/1` protocol - persistent = false; ## store data on disk - ): Result[void,seq[(int,HexaryDbError)]] = - ## Validate and import storage slots (using proofs as received with the snap - ## message `StorageRanges`). 
This function accumulates data in a memory table - ## which can be written to disk with the argument `persistent` set `true`. The - ## memory table is held in the descriptor argument`ps`. - ## - ## Note that the `peer` argument is for log messages, only. - ## - ## On error, the function returns a non-empty list of slot IDs and error - ## codes for the entries that could not be processed. If the slot ID is -1, - ## the error returned is not related to a slot. If any, this -1 entry is - ## always the last in the list. - let - nItems = data.storages.len - sTop = nItems - 1 - if 0 <= sTop: - var - errors: seq[(int,HexaryDbError)] - slotID = -1 # so excepions see the current solt ID - try: - for n in 0 ..< sTop: - # These ones never come with proof data - slotID = n - let rc = ps.importStorageSlots(data.storages[slotID], @[]) - if rc.isErr: - let error = rc.error - trace "Storage slots item fails", peer=ps.peer, slotID, nItems, - slots=data.storages[slotID].data.len, proofs=0, error - errors.add (slotID,error) - - # Final one might come with proof data - block: - slotID = sTop - let rc = ps.importStorageSlots(data.storages[slotID], data.proof) - if rc.isErr: - let error = rc.error - trace "Storage slots last item fails", peer=ps.peer, nItems, - slots=data.storages[sTop].data.len, proofs=data.proof.len, error - errors.add (slotID,error) - - # Store to disk - if persistent and 0 < ps.stoDb.tab.len: - slotID = -1 - let rc = ps.stoDb.persistentStorages(ps.base) - if rc.isErr: - errors.add (slotID,rc.error) - - except RlpError: - errors.add (slotID,RlpEncoding) - except KeyError as e: - raiseAssert "Not possible @ importAccounts: " & e.msg - except OSError as e: - trace "Import Accounts exception", peer=ps.peer, name=($e.name), msg=e.msg - errors.add (slotID,RlpEncoding) - - if 0 < errors.len: - # So non-empty error list is guaranteed - return err(errors) - - when extraTraceMessages: - trace "Storage slots imported", peer=ps.peer, - slots=data.storages.len, proofs=data.proof.len - ok() - -proc importStorages*( - pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer, ## For log messages, only - data: AccountStorageRange; ## Account storage reply from `snap/1` protocol - ): Result[void,seq[(int,HexaryDbError)]] = - ## Variant of `importStorages()` - SnapDbSessionRef.init( - pv, Hash256(), peer).importStorages(data, persistent=true) - - - -proc importRawNodes*( - ps: SnapDbSessionRef; ## Re-usable session descriptor - nodes: openArray[Blob]; ## Node records - persistent = false; ## store data on disk - ): Result[void,seq[(int,HexaryDbError)]] = - ## ... 
- var - errors: seq[(int,HexaryDbError)] - nodeID = -1 - let - db = ps.newHexaryTreeDbRef() - try: - # Import nodes - for n,rec in nodes: - nodeID = n - let rc = db.hexaryImport(rec) - if rc.isErr: - let error = rc.error - trace "importRawNodes()", peer=ps.peer, item=n, nodes=nodes.len, error - errors.add (nodeID,error) - - # Store to disk - if persistent and 0 < db.tab.len: - nodeID = -1 - let rc = db.persistentAccounts(ps.base) - if rc.isErr: - errors.add (nodeID,rc.error) - - except RlpError: - errors.add (nodeID,RlpEncoding) - except KeyError as e: - raiseAssert "Not possible @ importAccounts: " & e.msg - except OSError as e: - trace "Import Accounts exception", peer=ps.peer, name=($e.name), msg=e.msg - errors.add (nodeID,RlpEncoding) - - if 0 < errors.len: - return err(errors) - - trace "Raw nodes imported", peer=ps.peer, nodes=nodes.len - ok() - -proc importRawNodes*( - pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer, ## For log messages, only - nodes: openArray[Blob]; ## Node records - ): Result[void,seq[(int,HexaryDbError)]] = - ## Variant of `importRawNodes()` for persistent storage. - SnapDbSessionRef.init( - pv, Hash256(), peer).importRawNodes(nodes, persistent=true) - - -proc inspectAccountsTrie*( - ps: SnapDbSessionRef; ## Re-usable session descriptor - pathList = seq[Blob].default; ## Starting nodes for search - maxLeafPaths = 0; ## Record leaves with proper 32 bytes path - persistent = false; ## Read data from disk - ignoreError = false; ## Always return partial results if available - ): Result[TrieNodeStat, HexaryDbError] = - ## Starting with the argument list `pathSet`, find all the non-leaf nodes in - ## the hexary trie which have at least one node key reference missing in - ## the trie database. Argument `pathSet` list entries that do not refer to a - ## valid node are silently ignored. - ## - let peer = ps.peer - var stats: TrieNodeStat - noRlpExceptionOops("inspectAccountsTrie()"): - if persistent: - let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) - stats = getFn.hexaryInspectTrie(ps.accRoot, pathList, maxLeafPaths) - else: - stats = ps.accDb.hexaryInspectTrie(ps.accRoot, pathList, maxLeafPaths) - - block checkForError: - let error = block: - if stats.stopped: - TrieLoopAlert - elif stats.level == 0: - TrieIsEmpty - else: - break checkForError - trace "Inspect account trie failed", peer, nPathList=pathList.len, - nDangling=stats.dangling.len, leaves=stats.leaves.len, - maxleaves=maxLeafPaths, stoppedAt=stats.level, error - return err(error) - - when extraTraceMessages: - trace "Inspect account trie ok", peer, nPathList=pathList.len, - nDangling=stats.dangling.len, leaves=stats.leaves.len, - maxleaves=maxLeafPaths, level=stats.level - return ok(stats) - -proc inspectAccountsTrie*( - pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer; ## For log messages, only - root: Hash256; ## state root - pathList = seq[Blob].default; ## Starting paths for search - maxLeafPaths = 0; ## Record leaves with proper 32 bytes path - ignoreError = false; ## Always return partial results when avail. - ): Result[TrieNodeStat, HexaryDbError] = - ## Variant of `inspectAccountsTrie()` for persistent storage. 
- SnapDbSessionRef.init( - pv, root, peer).inspectAccountsTrie( - pathList, maxLeafPaths, persistent=true, ignoreError) - - -proc getAccountNodeKey*( - ps: SnapDbSessionRef; ## Re-usable session descriptor - path: Blob; ## Partial node path - persistent = false; ## Read data from disk - ): Result[NodeKey,HexaryDbError] = - ## For a partial node path argument `path`, return the raw node key. - var rc: Result[NodeKey,void] - noRlpExceptionOops("inspectAccountsPath()"): - if persistent: - let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) - rc = getFn.hexaryInspectPath(ps.accRoot, path) - else: - rc = ps.accDb.hexaryInspectPath(ps.accRoot, path) - if rc.isOk: - return ok(rc.value) - err(NodeNotFound) - -proc getAccountNodeKey*( - pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer; ## For log messages, only - root: Hash256; ## state root - path: Blob; ## Partial node path - ): Result[NodeKey,HexaryDbError] = - ## Variant of `inspectAccountsPath()` for persistent storage. - SnapDbSessionRef.init( - pv, root, peer).getAccountNodeKey(path, persistent=true) - - -proc getAccountData*( - ps: SnapDbSessionRef; ## Re-usable session descriptor - path: NodeKey; ## Account to visit - persistent = false; ## Read data from disk - ): Result[Account,HexaryDbError] = - ## Fetch account data. - ## - ## Caveat: There is no unit test yet for the non-persistent version - let peer = ps.peer - var acc: Account - - noRlpExceptionOops("getAccountData()"): - var leaf: Blob - if persistent: - let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) - leaf = path.hexaryPath(ps.accRoot, getFn).leafData - else: - leaf = path.hexaryPath(ps.accRoot.to(RepairKey),ps.accDb).leafData - - if leaf.len == 0: - return err(AccountNotFound) - acc = rlp.decode(leaf,Account) - - return ok(acc) - -proc getAccountData*( - pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer, ## For log messages, only - root: Hash256; ## state root - path: NodeKey; ## Account to visit - ): Result[Account,HexaryDbError] = - ## Variant of `getAccount()` for persistent storage. - SnapDbSessionRef.init(pv, root, peer).getAccountData(path, persistent=true) - -# ------------------------------------------------------------------------------ -# Public functions: additional helpers -# ------------------------------------------------------------------------------ - -proc sortMerge*(base: openArray[NodeTag]): NodeTag = - ## Helper for merging several `(NodeTag,seq[PackedAccount])` data sets - ## so that there are no overlap which would be rejected by `merge()`. - ## - ## This function selects a `NodeTag` from a list. - result = high(NodeTag) - for w in base: - if w < result: - result = w - -proc sortMerge*(acc: openArray[seq[PackedAccount]]): seq[PackedAccount] = - ## Helper for merging several `(NodeTag,seq[PackedAccount])` data sets - ## so that there are no overlap which would be rejected by `merge()`. - ## - ## This function flattens and sorts the argument account lists. 
- noKeyError("sortMergeAccounts"): - var accounts: Table[NodeTag,PackedAccount] - for accList in acc: - for item in accList: - accounts[item.accHash.to(NodeTag)] = item - result = toSeq(accounts.keys).sorted(cmp).mapIt(accounts[it]) - -proc getChainDbAccount*( - ps: SnapDbSessionRef; - accHash: Hash256 - ): Result[Account,HexaryDbError] = - ## Fetch account via `BaseChainDB` - ps.getAccountData(accHash.to(NodeKey),persistent=true) - -proc nextChainDbKey*( - ps: SnapDbSessionRef; - accHash: Hash256 - ): Result[Hash256,HexaryDbError] = - ## Fetch the account path on the `BaseChainDB`, the one next to the - ## argument account. - noRlpExceptionOops("getChainDbAccount()"): - let - getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) - path = accHash.to(NodeKey) - .hexaryPath(ps.accRoot, getFn) - .next(getFn) - .getNibbles - if 64 == path.len: - return ok(path.getBytes.convertTo(Hash256)) - - err(AccountNotFound) - -proc prevChainDbKey*( - ps: SnapDbSessionRef; - accHash: Hash256 - ): Result[Hash256,HexaryDbError] = - ## Fetch the account path on the `BaseChainDB`, the one before to the - ## argument account. - noRlpExceptionOops("getChainDbAccount()"): - let - getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) - path = accHash.to(NodeKey) - .hexaryPath(ps.accRoot, getFn) - .prev(getFn) - .getNibbles - if 64 == path.len: - return ok(path.getBytes.convertTo(Hash256)) - - err(AccountNotFound) - -# ------------------------------------------------------------------------------ -# Debugging (and playing with the hexary database) -# ------------------------------------------------------------------------------ - -proc assignPrettyKeys*(ps: SnapDbSessionRef) = - ## Prepare foe pretty pringing/debugging. Run early enough this function - ## sets the root key to `"$"`, for instance. - noPpError("validate(1)"): - # Make keys assigned in pretty order for printing - var keysList = toSeq(ps.accDb.tab.keys) - let rootKey = ps.accRoot.to(RepairKey) - discard rootKey.toKey(ps) - if ps.accDb.tab.hasKey(rootKey): - keysList = @[rootKey] & keysList - for key in keysList: - let node = ps.accDb.tab[key] - discard key.toKey(ps) - case node.kind: - of Branch: (for w in node.bLink: discard w.toKey(ps)) - of Extension: discard node.eLink.toKey(ps) - of Leaf: discard - -proc dumpPath*(ps: SnapDbSessionRef; key: NodeTag): seq[string] = - ## Pretty print helper compiling the path into the repair tree for the - ## argument `key`. - noPpError("dumpPath"): - let rPath= key.to(NodeKey).hexaryPath(ps.accRoot.to(RepairKey), ps.accDb) - result = rPath.path.mapIt(it.pp(ps.accDb)) & @["(" & rPath.tail.pp & ")"] - -proc dumpAccDB*(ps: SnapDbSessionRef; indent = 4): string = - ## Dump the entries from the a generic accounts trie. 
- ps.accDb.pp(ps.accRoot,indent) - -proc getAcc*(ps: SnapDbSessionRef): HexaryTreeDbRef = - ## Low level access to accounts DB - ps.accDb - -proc hexaryPpFn*(ps: SnapDbSessionRef): HexaryPpFn = - ## Key mapping function used in `HexaryTreeDB` - ps.accDb.keyPp - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/snapdb_accounts.nim b/nimbus/sync/snap/worker/db/snapdb_accounts.nim new file mode 100644 index 000000000..b718461b2 --- /dev/null +++ b/nimbus/sync/snap/worker/db/snapdb_accounts.nim @@ -0,0 +1,483 @@ +# nimbus-eth1 +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +import + std/[algorithm, sequtils, strutils, tables], + chronicles, + eth/[common/eth_types, p2p, rlp, trie/nibbles], + stew/byteutils, + ../../range_desc, + "."/[bulk_storage, hexary_desc, hexary_error, hexary_import, + hexary_interpolate, hexary_inspect, hexary_paths, snapdb_desc] + +{.push raises: [Defect].} + +logScope: + topics = "snap-db" + +type + SnapDbAccountsRef* = ref object of SnapDbBaseRef + getFn: HexaryGetFn ## Persistent database `get()` closure + +const + extraTraceMessages = false or true + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc to(h: Hash256; T: type NodeKey): T = + h.data.T + +proc convertTo(data: openArray[byte]; T: type Hash256): T = + discard result.data.NodeKey.init(data) # size error => zero + + +template noKeyError(info: static[string]; code: untyped) = + try: + code + except KeyError as e: + raiseAssert "Not possible (" & info & "): " & e.msg + +template noRlpExceptionOops(info: static[string]; code: untyped) = + try: + code + except RlpError: + return err(RlpEncoding) + except KeyError as e: + raiseAssert "Not possible (" & info & "): " & e.msg + except Defect as e: + raise e + except Exception as e: + raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc persistentAccounts( + db: HexaryTreeDbRef; ## Current table + ps: SnapDbAccountsRef; ## For persistent database + ): Result[void,HexaryDbError] + {.gcsafe, raises: [Defect,OSError,KeyError].} = + ## Store accounts trie table on databse + if ps.rockDb.isNil: + let rc = db.bulkStorageAccounts(ps.kvDb) + if rc.isErr: return rc + else: + let rc = db.bulkStorageAccountsRocky(ps.rockDb) + if rc.isErr: return rc + ok() + + +proc collectAccounts( + peer: Peer, ## for log messages + base: NodeTag; + acc: seq[PackedAccount]; + ): Result[seq[RLeafSpecs],HexaryDbError] + {.gcsafe, raises: [Defect, RlpError].} = + ## Repack account records into a `seq[RLeafSpecs]` queue. The argument data + ## `acc` are as received with the snap message `AccountRange`). + ## + ## The returned list contains leaf node information for populating a repair + ## table. 
The accounts, together with some hexary trie records for proofs + ## can be used for validating the argument account data. + var rcAcc: seq[RLeafSpecs] + + if acc.len != 0: + let pathTag0 = acc[0].accHash.to(NodeTag) + + # Verify lower bound + if pathTag0 < base: + let error = HexaryDbError.AccountSmallerThanBase + trace "collectAccounts()", peer, base, accounts=acc.len, error + return err(error) + + # Add base for the records (no payload). Note that the assumption + # holds: `rcAcc[^1].tag <= base` + if base < pathTag0: + rcAcc.add RLeafSpecs(pathTag: base) + + # Check for the case that accounts are appended + elif 0 < rcAcc.len and pathTag0 <= rcAcc[^1].pathTag: + let error = HexaryDbError.AccountsNotSrictlyIncreasing + trace "collectAccounts()", peer, base, accounts=acc.len, error + return err(error) + + # Add first account + rcAcc.add RLeafSpecs(pathTag: pathTag0, payload: acc[0].accBlob) + + # Veify & add other accounts + for n in 1 ..< acc.len: + let nodeTag = acc[n].accHash.to(NodeTag) + + if nodeTag <= rcAcc[^1].pathTag: + let error = AccountsNotSrictlyIncreasing + trace "collectAccounts()", peer, item=n, base, accounts=acc.len, error + return err(error) + + rcAcc.add RLeafSpecs(pathTag: nodeTag, payload: acc[n].accBlob) + + ok(rcAcc) + +# ------------------------------------------------------------------------------ +# Public constructor +# ------------------------------------------------------------------------------ + +proc init*( + T: type SnapDbAccountsRef; + pv: SnapDbRef; + root: Hash256; + peer: Peer = nil + ): T = + ## Constructor, starts a new accounts session. + let db = pv.kvDb + new result + result.init(pv, root.to(NodeKey), peer) + result.getFn = proc(key: Blob): Blob = db.get(key) + +proc dup*( + ps: SnapDbAccountsRef; + root: Hash256; + peer: Peer; + ): SnapDbAccountsRef = + ## Resume an accounts session with different `root` key and `peer`. + new result + result[].shallowCopy(ps[]) + result.root = root.to(NodeKey) + result.peer = peer + +proc dup*( + ps: SnapDbAccountsRef; + root: Hash256; + ): SnapDbAccountsRef = + ## Variant of `dup()` without the `peer` argument. + new result + result[].shallowCopy(ps[]) + result.root = root.to(NodeKey) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc importAccounts*( + ps: SnapDbAccountsRef; ## Re-usable session descriptor + base: NodeTag; ## before or at first account entry in `data` + data: PackedAccountRange; ## re-packed `snap/1 ` reply data + persistent = false; ## store data on disk + ): Result[void,HexaryDbError] = + ## Validate and import accounts (using proofs as received with the snap + ## message `AccountRange`). This function accumulates data in a memory table + ## which can be written to disk with the argument `persistent` set `true`. The + ## memory table is held in the descriptor argument`ps`. + ## + ## Note that the `peer` argument is for log messages, only. 
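As a usage sketch (editorial aside, not part of the patch): callers normally go through the persistent variant declared further below; `stateRoot`, `iv` and `reply` here are placeholder names for the pivot state root, the requested account interval and a `snap/1` *AccountRange* reply already repacked as `PackedAccountRange`.

  # Hypothetical call site, assuming `pv: SnapDbRef` and `peer: Peer`
  let rc = pv.importAccounts(peer, stateRoot, iv.minPt, reply)
  if rc.isErr:
    # Proof or ordering check failed: drop the reply and re-queue the interval
    discard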
+  var accounts: seq[RLeafSpecs]
+  try:
+    if 0 < data.proof.len:
+      let rc = ps.mergeProofs(ps.root, data.proof)
+      if rc.isErr:
+        return err(rc.error)
+    block:
+      let rc = ps.peer.collectAccounts(base, data.accounts)
+      if rc.isErr:
+        return err(rc.error)
+      accounts = rc.value
+    block:
+      let rc = ps.hexaDb.hexaryInterpolate(
+        ps.root, accounts, bootstrap = (data.proof.len == 0))
+      if rc.isErr:
+        return err(rc.error)
+
+    if persistent and 0 < ps.hexaDb.tab.len:
+      let rc = ps.hexaDb.persistentAccounts(ps)
+      if rc.isErr:
+        return err(rc.error)
+
+  except RlpError:
+    return err(RlpEncoding)
+  except KeyError as e:
+    raiseAssert "Not possible @ importAccounts: " & e.msg
+  except OSError as e:
+    trace "Import Accounts exception", peer=ps.peer, name=($e.name), msg=e.msg
+    return err(OSErrorException)
+
+  when extraTraceMessages:
+    trace "Accounts and proofs ok", peer=ps.peer,
+      root=ps.root.ByteArray32.toHex,
+      proof=data.proof.len, base, accounts=data.accounts.len
+  ok()
+
+proc importAccounts*(
+    pv: SnapDbRef;            ## Base descriptor on `BaseChainDB`
+    peer: Peer,               ## for log messages
+    root: Hash256;            ## state root
+    base: NodeTag;            ## before or at first account entry in `data`
+    data: PackedAccountRange; ## re-packed `snap/1` reply data
+      ): Result[void,HexaryDbError] =
+  ## Variant of `importAccounts()`
+  SnapDbAccountsRef.init(
+    pv, root, peer).importAccounts(base, data, persistent=true)
+
+
+proc importRawAccountNodes*(
+    ps: SnapDbAccountsRef;    ## Re-usable session descriptor
+    nodes: openArray[Blob];   ## Node records
+    reportNodes = {Leaf};     ## Additional node types to report
+    persistent = false;       ## store data on disk
+      ): seq[HexaryNodeReport] =
+  ## Store data nodes given as argument `nodes` on the persistent database.
+  ##
+  ## If there was an error when processing a particular argument `nodes` item,
+  ## it will be reported with the return value providing argument slot/index,
+  ## node type, and error code.
+  ##
+  ## If there was an error storing persistent data, the last report item will
+  ## have an error code, only.
+  ##
+  ## Additional node items might be reported if the node type is in the
+  ## argument set `reportNodes`. These reported items will have no error
+  ## code set (i.e. `NothingSerious`.)
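The report format above is what the healing code consumes later on; as a small editorial sketch (not part of the patch), with `report` being the returned sequence and `nodes` the submitted node records:

  for w in report:
    if w.slot.isNone:
      discard                                  # trailing entry: disk write failed
    elif w.error != NothingSerious:
      discard                                  # re-queue nodes[w.slot.unsafeGet]
    elif w.kind.isSome and w.kind.unsafeGet == Leaf:
      discard                                  # leaf node: account data can be decoded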
+ ## + let + peer = ps.peer + db = HexaryTreeDbRef.init(ps) + nItems = nodes.len + var + nErrors = 0 + slot: Option[int] + try: + # Import nodes + for n,rec in nodes: + if 0 < rec.len: # otherwise ignore empty placeholder + slot = some(n) + var rep = db.hexaryImport(rec) + if rep.error != NothingSerious: + rep.slot = slot + result.add rep + nErrors.inc + trace "Error importing account nodes", peer, inx=n, nItems, + error=rep.error, nErrors + elif rep.kind.isSome and rep.kind.unsafeGet in reportNodes: + rep.slot = slot + result.add rep + + # Store to disk + if persistent and 0 < db.tab.len: + slot = none(int) + let rc = db.persistentAccounts(ps) + if rc.isErr: + result.add HexaryNodeReport(slot: slot, error: rc.error) + + except RlpError: + result.add HexaryNodeReport(slot: slot, error: RlpEncoding) + nErrors.inc + trace "Error importing account nodes", peer, slot, nItems, + error=RlpEncoding, nErrors + except KeyError as e: + raiseAssert "Not possible @ importRawAccountNodes: " & e.msg + except OSError as e: + result.add HexaryNodeReport(slot: slot, error: OSErrorException) + nErrors.inc + trace "Import account nodes exception", peer, slot, nItems, + name=($e.name), msg=e.msg, nErrors + + when extraTraceMessages: + if nErrors == 0: + trace "Raw account nodes imported", peer, slot, nItems, report=result.len + +proc importRawAccountNodes*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer, ## For log messages, only + nodes: openArray[Blob]; ## Node records + reportNodes = {Leaf}; ## Additional node types to report + ): seq[HexaryNodeReport] = + ## Variant of `importRawNodes()` for persistent storage. + SnapDbAccountsRef.init( + pv, Hash256(), peer).importRawAccountNodes( + nodes, reportNodes, persistent=true) + + +proc inspectAccountsTrie*( + ps: SnapDbAccountsRef; ## Re-usable session descriptor + pathList = seq[Blob].default; ## Starting nodes for search + persistent = false; ## Read data from disk + ignoreError = false; ## Always return partial results if available + ): Result[TrieNodeStat, HexaryDbError] = + ## Starting with the argument list `pathSet`, find all the non-leaf nodes in + ## the hexary trie which have at least one node key reference missing in + ## the trie database. Argument `pathSet` list entries that do not refer to a + ## valid node are silently ignored. + ## + let peer = ps.peer + var stats: TrieNodeStat + noRlpExceptionOops("inspectAccountsTrie()"): + if persistent: + stats = ps.getFn.hexaryInspectTrie(ps.root, pathList) + else: + stats = ps.hexaDb.hexaryInspectTrie(ps.root, pathList) + + block checkForError: + let error = block: + if stats.stopped: + TrieLoopAlert + elif stats.level == 0: + TrieIsEmpty + else: + break checkForError + trace "Inspect account trie failed", peer, nPathList=pathList.len, + nDangling=stats.dangling.len, stoppedAt=stats.level, error + return err(error) + + when extraTraceMessages: + trace "Inspect account trie ok", peer, nPathList=pathList.len, + nDangling=stats.dangling.len, level=stats.level + return ok(stats) + +proc inspectAccountsTrie*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer; ## For log messages, only + root: Hash256; ## state root + pathList = seq[Blob].default; ## Starting paths for search + ignoreError = false; ## Always return partial results when avail. + ): Result[TrieNodeStat, HexaryDbError] = + ## Variant of `inspectAccountsTrie()` for persistent storage. 
+ SnapDbAccountsRef.init( + pv, root, peer).inspectAccountsTrie(pathList, persistent=true, ignoreError) + + +proc getAccountNodeKey*( + ps: SnapDbAccountsRef; ## Re-usable session descriptor + path: Blob; ## Partial node path + persistent = false; ## Read data from disk + ): Result[NodeKey,HexaryDbError] = + ## For a partial node path argument `path`, return the raw node key. + var rc: Result[NodeKey,void] + noRlpExceptionOops("inspectAccountsPath()"): + if persistent: + rc = ps.getFn.hexaryInspectPath(ps.root, path) + else: + rc = ps.hexaDb.hexaryInspectPath(ps.root, path) + if rc.isOk: + return ok(rc.value) + err(NodeNotFound) + +proc getAccountNodeKey*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer; ## For log messages, only + root: Hash256; ## state root + path: Blob; ## Partial node path + ): Result[NodeKey,HexaryDbError] = + ## Variant of `inspectAccountsPath()` for persistent storage. + SnapDbAccountsRef.init( + pv, root, peer).getAccountNodeKey(path, persistent=true) + + +proc getAccountData*( + ps: SnapDbAccountsRef; ## Re-usable session descriptor + path: NodeKey; ## Account to visit + persistent = false; ## Read data from disk + ): Result[Account,HexaryDbError] = + ## Fetch account data. + ## + ## Caveat: There is no unit test yet for the non-persistent version + let peer = ps.peer + var acc: Account + + noRlpExceptionOops("getAccountData()"): + var leaf: Blob + if persistent: + leaf = path.hexaryPath(ps.root, ps.getFn).leafData + else: + leaf = path.hexaryPath(ps.root.to(RepairKey),ps.hexaDb).leafData + + if leaf.len == 0: + return err(AccountNotFound) + acc = rlp.decode(leaf,Account) + + return ok(acc) + +proc getAccountData*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer, ## For log messages, only + root: Hash256; ## state root + path: NodeKey; ## Account to visit + ): Result[Account,HexaryDbError] = + ## Variant of `getAccount()` for persistent storage. + SnapDbAccountsRef.init(pv, root, peer).getAccountData(path, persistent=true) + +# ------------------------------------------------------------------------------ +# Public functions: additional helpers +# ------------------------------------------------------------------------------ + +proc sortMerge*(base: openArray[NodeTag]): NodeTag = + ## Helper for merging several `(NodeTag,seq[PackedAccount])` data sets + ## so that there are no overlap which would be rejected by `merge()`. + ## + ## This function selects a `NodeTag` from a list. + result = high(NodeTag) + for w in base: + if w < result: + result = w + +proc sortMerge*(acc: openArray[seq[PackedAccount]]): seq[PackedAccount] = + ## Helper for merging several `(NodeTag,seq[PackedAccount])` data sets + ## so that there are no overlap which would be rejected by `merge()`. + ## + ## This function flattens and sorts the argument account lists. + noKeyError("sortMergeAccounts"): + var accounts: Table[NodeTag,PackedAccount] + for accList in acc: + for item in accList: + accounts[item.accHash.to(NodeTag)] = item + result = toSeq(accounts.keys).sorted(cmp).mapIt(accounts[it]) + +proc getChainDbAccount*( + ps: SnapDbAccountsRef; + accHash: Hash256 + ): Result[Account,HexaryDbError] = + ## Fetch account via `BaseChainDB` + ps.getAccountData(accHash.to(NodeKey),persistent=true) + +proc nextChainDbKey*( + ps: SnapDbAccountsRef; + accHash: Hash256 + ): Result[Hash256,HexaryDbError] = + ## Fetch the account path on the `BaseChainDB`, the one next to the + ## argument account. 
+ noRlpExceptionOops("getChainDbAccount()"): + let path = accHash.to(NodeKey) + .hexaryPath(ps.root, ps.getFn) + .next(ps.getFn) + .getNibbles + if 64 == path.len: + return ok(path.getBytes.convertTo(Hash256)) + + err(AccountNotFound) + +proc prevChainDbKey*( + ps: SnapDbAccountsRef; + accHash: Hash256 + ): Result[Hash256,HexaryDbError] = + ## Fetch the account path on the `BaseChainDB`, the one before to the + ## argument account. + noRlpExceptionOops("getChainDbAccount()"): + let path = accHash.to(NodeKey) + .hexaryPath(ps.root, ps.getFn) + .prev(ps.getFn) + .getNibbles + if 64 == path.len: + return ok(path.getBytes.convertTo(Hash256)) + + err(AccountNotFound) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/snapdb_desc.nim b/nimbus/sync/snap/worker/db/snapdb_desc.nim new file mode 100644 index 000000000..9e5a1f7b3 --- /dev/null +++ b/nimbus/sync/snap/worker/db/snapdb_desc.nim @@ -0,0 +1,252 @@ +# nimbus-eth1 +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +import + std/[sequtils, tables], + chronicles, + eth/[common/eth_types, p2p], + ../../../../db/select_backend, + ../../range_desc, + "."/[bulk_storage, hexary_desc, hexary_error, hexary_import, hexary_paths, + rocky_bulk_load] + +{.push raises: [Defect].} + +logScope: + topics = "snap-db" + +type + SnapDbRef* = ref object + ## Global, re-usable descriptor + keyMap: Table[RepairKey,uint] ## For debugging only (will go away) + db: TrieDatabaseRef ## General database + rocky: RocksStoreRef ## Set if rocksdb is available + + SnapDbBaseRef* = ref object of RootRef + ## Session descriptor + xDb: HexaryTreeDbRef ## Hexary database + base: SnapDbRef ## Back reference to common parameters + peer*: Peer ## For log messages + root*: NodeKey ## Session DB root node key + +# ------------------------------------------------------------------------------ +# Private debugging helpers +# ------------------------------------------------------------------------------ + +template noPpError(info: static[string]; code: untyped) = + try: + code + except ValueError as e: + raiseAssert "Inconveivable (" & info & "): " & e.msg + except KeyError as e: + raiseAssert "Not possible (" & info & "): " & e.msg + except Defect as e: + raise e + except Exception as e: + raiseAssert "Ooops (" & info & ") " & $e.name & ": " & e.msg + +proc toKey(a: RepairKey; pv: SnapDbRef): uint = + if not a.isZero: + noPpError("pp(RepairKey)"): + if not pv.keyMap.hasKey(a): + pv.keyMap[a] = pv.keyMap.len.uint + 1 + result = pv.keyMap[a] + +proc toKey(a: RepairKey; ps: SnapDbBaseRef): uint = + a.toKey(ps.base) + +proc toKey(a: NodeKey; ps: SnapDbBaseRef): uint = + a.to(RepairKey).toKey(ps) + +proc toKey(a: NodeTag; ps: SnapDbBaseRef): uint = + a.to(NodeKey).toKey(ps) + +# ------------------------------------------------------------------------------ +# Debugging, pretty printing +# ------------------------------------------------------------------------------ + +proc pp*(a: NodeKey; ps: SnapDbBaseRef): string = + if a.isZero: "ø" else:"$" & $a.toKey(ps) + +proc pp*(a: RepairKey; 
ps: SnapDbBaseRef): string = + if a.isZero: "ø" elif a.isNodeKey: "$" & $a.toKey(ps) else: "@" & $a.toKey(ps) + +proc pp*(a: NodeTag; ps: SnapDbBaseRef): string = + a.to(NodeKey).pp(ps) + +# ------------------------------------------------------------------------------ +# Public constructor +# ------------------------------------------------------------------------------ + +proc init*( + T: type SnapDbRef; + db: TrieDatabaseRef + ): T = + ## Main object constructor + T(db: db) + +proc init*( + T: type SnapDbRef; + db: ChainDb + ): T = + ## Variant of `init()` allowing bulk import on rocksdb backend + result = T(db: db.trieDB, rocky: db.rocksStoreRef) + if not result.rocky.bulkStorageClearRockyCacheFile(): + result.rocky = nil + +proc init*( + T: type HexaryTreeDbRef; + pv: SnapDbRef; + ): T = + ## Constructor for inner hexary trie database + let xDb = HexaryTreeDbRef() + xDb.keyPp = proc(key: RepairKey): string = key.pp(xDb) # will go away + return xDb + +proc init*( + T: type HexaryTreeDbRef; + ps: SnapDbBaseRef; + ): T = + ## Constructor variant + HexaryTreeDbRef.init(ps.base) + +# --------------- + +proc init*( + ps: SnapDbBaseRef; + pv: SnapDbRef; + root: NodeKey; + peer: Peer = nil) = + ## Session base constructor + ps.base = pv + ps.peer = peer + ps.root = root + ps.xDb = HexaryTreeDbRef.init(pv) + +proc init*( + T: type SnapDbBaseRef; + ps: SnapDbBaseRef; + root: NodeKey; + peer: Peer = nil): T = + ## Variant of session base constructor + new result + result.init(ps.base, root, peer) + +# ------------------------------------------------------------------------------ +# Public getters +# ------------------------------------------------------------------------------ + +proc hexaDb*(ps: SnapDbBaseRef): HexaryTreeDbRef = + ## Getter, low level access to underlying session DB + ps.xDb + +proc rockDb*(ps: SnapDbBaseRef): RocksStoreRef = + ## Getter, low level access to underlying persistent rock DB interface + ps.base.rocky + +proc kvDb*(ps: SnapDbBaseRef): TrieDatabaseRef = + ## Getter, low level access to underlying persistent key-value DB + ps.base.db + +proc kvDb*(pv: SnapDbRef): TrieDatabaseRef = + ## Getter, low level access to underlying persistent key-value DB + pv.db + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc dbBackendRocksDb*(pv: SnapDbRef): bool = + ## Returns `true` if rocksdb features are available + not pv.rocky.isNil + +proc dbBackendRocksDb*(ps: SnapDbBaseRef): bool = + ## Returns `true` if rocksdb features are available + not ps.base.rocky.isNil + +proc mergeProofs*( + ps: SnapDbBaseRef; ## Session database + root: NodeKey; ## Root for checking nodes + proof: seq[Blob]; ## Node records + freeStandingOk = false; ## Remove freestanding nodes + ): Result[void,HexaryDbError] + {.gcsafe, raises: [Defect,RlpError,KeyError].} = + ## Import proof records (as received with snap message) into a hexary trie + ## of the repair table. These hexary trie records can be extended to a full + ## trie at a later stage and used for validating account data. 
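For orientation (editorial aside): `mergeProofs()` only seeds the repair trie, and the importers in `snapdb_accounts.nim`/`snapdb_storage_slots.nim` then complete the boundary-proved range with `hexaryInterpolate()`. Roughly, assuming a session descriptor `ps`, boundary `proof` blobs and sorted leaf specs `leafs`:

  if 0 < proof.len:
    discard ps.mergeProofs(ps.root, proof)            # seed repair trie with proof nodes
  discard ps.hexaDb.hexaryInterpolate(
    ps.root, leafs, bootstrap = (proof.len == 0))     # fill in the leaf range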
+ let + db = ps.hexaDb + peer = ps.peer + var + nodes: HashSet[RepairKey] + refs = @[root.to(RepairKey)].toHashSet + + for n,rlpRec in proof: + let report = db.hexaryImport(rlpRec, nodes, refs) + if report.error != NothingSerious: + let error = report.error + trace "mergeProofs()", peer, item=n, proofs=proof.len, error + return err(error) + + # Remove free standing nodes (if any) + if 0 < nodes.len: + let rest = nodes - refs + if 0 < rest.len: + if freeStandingOk: + trace "mergeProofs() detected unrelated nodes", peer, nodes=nodes.len + discard + else: + # Delete unreferenced nodes + for nodeKey in nodes: + db.tab.del(nodeKey) + trace "mergeProofs() ignoring unrelated nodes", peer, nodes=nodes.len + + ok() + +# ------------------------------------------------------------------------------ +# Debugging (and playing with the hexary database) +# ------------------------------------------------------------------------------ + +proc assignPrettyKeys*(ps: SnapDbBaseRef) = + ## Prepare for pretty pringing/debugging. Run early enough this function + ## sets the root key to `"$"`, for instance. + noPpError("validate(1)"): + # Make keys assigned in pretty order for printing + var keysList = toSeq(ps.hexaDb.tab.keys) + let rootKey = ps.root.to(RepairKey) + discard rootKey.toKey(ps) + if ps.hexaDb.tab.hasKey(rootKey): + keysList = @[rootKey] & keysList + for key in keysList: + let node = ps.hexaDb.tab[key] + discard key.toKey(ps) + case node.kind: + of Branch: (for w in node.bLink: discard w.toKey(ps)) + of Extension: discard node.eLink.toKey(ps) + of Leaf: discard + +proc dumpPath*(ps: SnapDbBaseRef; key: NodeTag): seq[string] = + ## Pretty print helper compiling the path into the repair tree for the + ## argument `key`. + noPpError("dumpPath"): + let rPath= key.to(NodeKey).hexaryPath(ps.root.to(RepairKey), ps.hexaDb) + result = rPath.path.mapIt(it.pp(ps.hexaDb)) & @["(" & rPath.tail.pp & ")"] + +proc dumpHexaDB*(ps: SnapDbBaseRef; indent = 4): string = + ## Dump the entries from the a generic accounts trie. + ps.hexaDb.pp(ps.root,indent) + +proc hexaryPpFn*(ps: SnapDbBaseRef): HexaryPpFn = + ## Key mapping function used in `HexaryTreeDB` + ps.hexaDb.keyPp + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim b/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim new file mode 100644 index 000000000..31daf0431 --- /dev/null +++ b/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim @@ -0,0 +1,245 @@ +# nimbus-eth1 +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
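Editorial aside on the descriptor layering introduced by this split: `SnapDbRef` holds the shared key-value and rocksdb backends, while `SnapDbAccountsRef` and the `SnapDbStorageSlotsRef` defined in the module below are per-session views created on top of it. A minimal wiring sketch, where `chainDb`, `stateRoot`, `peer`, `accHash` and `storageRoot` are placeholder names:

  let
    pv = SnapDbRef.init(chainDb)                         # shared base descriptor
    accDb = SnapDbAccountsRef.init(pv, stateRoot, peer)  # accounts session
    slotDb = SnapDbStorageSlotsRef.init(
      pv, account = accHash, root = storageRoot, peer = peer)  # per-account slots session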
+
+import
+  std/[tables],
+  chronicles,
+  eth/[common/eth_types, p2p],
+  ../../../protocol,
+  ../../range_desc,
+  "."/[bulk_storage, hexary_desc, hexary_error, hexary_interpolate, snapdb_desc]
+
+{.push raises: [Defect].}
+
+logScope:
+  topics = "snap-db"
+
+const
+  extraTraceMessages = false or true
+
+type
+  SnapDbStorageSlotsRef* = ref object of SnapDbBaseRef
+    accHash*: Hash256              ## Accounts address hash (curr.unused)
+
+# ------------------------------------------------------------------------------
+# Private helpers
+# ------------------------------------------------------------------------------
+
+proc to(h: Hash256; T: type NodeKey): T =
+  h.data.T
+
+proc convertTo(data: openArray[byte]; T: type Hash256): T =
+  discard result.data.NodeKey.init(data) # size error => zero
+
+template noKeyError(info: static[string]; code: untyped) =
+  try:
+    code
+  except KeyError as e:
+    raiseAssert "Not possible (" & info & "): " & e.msg
+
+template noRlpExceptionOops(info: static[string]; code: untyped) =
+  try:
+    code
+  except RlpError:
+    return err(RlpEncoding)
+  except KeyError as e:
+    raiseAssert "Not possible (" & info & "): " & e.msg
+  except Defect as e:
+    raise e
+  except Exception as e:
+    raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
+
+# ------------------------------------------------------------------------------
+# Private functions
+# ------------------------------------------------------------------------------
+
+proc persistentStorages(
+    db: HexaryTreeDbRef;          ## Current table
+    ps: SnapDbStorageSlotsRef;    ## For persistent database
+      ): Result[void,HexaryDbError]
+      {.gcsafe, raises: [Defect,OSError,KeyError].} =
+  ## Store storage slots trie table on database
+  if ps.rockDb.isNil:
+    let rc = db.bulkStorageStorages(ps.kvDb)
+    if rc.isErr: return rc
+  else:
+    let rc = db.bulkStorageStoragesRocky(ps.rockDb)
+    if rc.isErr: return rc
+  ok()
+
+
+proc collectStorageSlots(
+    peer: Peer;
+    slots: seq[SnapStorage];
+      ): Result[seq[RLeafSpecs],HexaryDbError]
+      {.gcsafe, raises: [Defect, RlpError].} =
+  ## Similar to `collectAccounts()`
+  var rcSlots: seq[RLeafSpecs]
+
+  if slots.len != 0:
+    # Add initial storage slot
+    rcSlots.add RLeafSpecs(
+      pathTag: slots[0].slotHash.to(NodeTag),
+      payload: slots[0].slotData)
+
+    # Verify & add other storage slots
+    for n in 1 ..< slots.len:
+      let nodeTag = slots[n].slotHash.to(NodeTag)
+
+      if nodeTag <= rcSlots[^1].pathTag:
+        let error = SlotsNotSrictlyIncreasing
+        trace "collectStorageSlots()", peer, item=n, slots=slots.len, error
+        return err(error)
+
+      rcSlots.add RLeafSpecs(pathTag: nodeTag, payload: slots[n].slotData)
+
+  ok(rcSlots)
+
+
+proc importStorageSlots(
+    ps: SnapDbStorageSlotsRef;  ## Re-usable session descriptor
+    data: AccountSlots;         ## Account storage descriptor
+    proof: SnapStorageProof;    ## Account storage proof
+      ): Result[void,HexaryDbError]
+      {.gcsafe, raises: [Defect,RlpError,KeyError].} =
+  ## Process storage slots for a particular storage root
+  let
+    root = data.account.storageRoot.to(NodeKey)
+    tmpDb = SnapDbBaseRef.init(ps, ps.root, ps.peer)
+  var
+    slots: seq[RLeafSpecs]
+  if 0 < proof.len:
+    let rc = tmpDb.mergeProofs(root, proof)
+    if rc.isErr:
+      return err(rc.error)
+  block:
+    let rc = ps.peer.collectStorageSlots(data.data)
+    if rc.isErr:
+      return err(rc.error)
+    slots = rc.value
+  block:
+    let rc = tmpDb.hexaDb.hexaryInterpolate(
+      root, slots, bootstrap = (proof.len == 0))
+    if rc.isErr:
+      return err(rc.error)
+
+  # Commit to main descriptor
+  for k,v in tmpDb.hexaDb.tab.pairs:
+    if not k.isNodeKey:
+      return
err(UnresolvedRepairNode) + ps.hexaDb.tab[k] = v + + ok() + +# ------------------------------------------------------------------------------ +# Public constructor +# ------------------------------------------------------------------------------ + +proc init*( + T: type SnapDbStorageSlotsRef; + pv: SnapDbRef; + account = Hash256(); + root = Hash256(); + peer: Peer = nil + ): T = + ## Constructor, starts a new accounts session. + new result + result.init(pv, root.to(NodeKey), peer) + result.accHash = account + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc importStorages*( + ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor + data: AccountStorageRange; ## Account storage reply from `snap/1` protocol + persistent = false; ## store data on disk + ): seq[HexaryNodeReport] = + ## Validate and import storage slots (using proofs as received with the snap + ## message `StorageRanges`). This function accumulates data in a memory table + ## which can be written to disk with the argument `persistent` set `true`. The + ## memory table is held in the descriptor argument`ps`. + ## + ## If there were an error when processing a particular argument `data` item, + ## it will be reported with the return value providing argument slot/index + ## end error code. + ## + ## If there was an error soring persistent data, the last report item will + ## have an error code, only. + ## + ## TODO: + ## Reconsider how to handle the persistant storage trie, see + ## github.com/status-im/nim-eth/issues/9#issuecomment-814573755 + ## + let + peer = ps.peer + nItems = data.storages.len + sTop = nItems - 1 + var + slot: Option[int] + if 0 <= sTop: + try: + for n in 0 ..< sTop: + # These ones never come with proof data + slot = some(n) + let rc = ps.importStorageSlots(data.storages[n], @[]) + if rc.isErr: + result.add HexaryNodeReport(slot: slot, error: rc.error) + trace "Storage slots item fails", peer, inx=n, nItems, + slots=data.storages[n].data.len, proofs=0, + error=rc.error, nErrors=result.len + + # Final one might come with proof data + block: + slot = some(sTop) + let rc = ps.importStorageSlots(data.storages[sTop], data.proof) + if rc.isErr: + result.add HexaryNodeReport(slot: slot, error: rc.error) + trace "Storage slots last item fails", peer, inx=sTop, nItems, + slots=data.storages[sTop].data.len, proofs=data.proof.len, + error=rc.error, nErrors=result.len + + # Store to disk + if persistent and 0 < ps.hexaDb.tab.len: + slot = none(int) + let rc = ps.hexaDb.persistentStorages(ps) + if rc.isErr: + result.add HexaryNodeReport(slot: slot, error: rc.error) + + except RlpError: + result.add HexaryNodeReport(slot: slot, error: RlpEncoding) + trace "Storage slot node error", peer, slot, nItems, + slots=data.storages[sTop].data.len, proofs=data.proof.len, + error=RlpEncoding, nErrors=result.len + except KeyError as e: + raiseAssert "Not possible @ importStorages: " & e.msg + except OSError as e: + result.add HexaryNodeReport(slot: slot, error: OSErrorException) + trace "Import storage slots exception", peer, slot, nItems, + name=($e.name), msg=e.msg, nErrors=result.len + + when extraTraceMessages: + if result.len == 0: + trace "Storage slots imported", peer, nItems, + slots=data.storages.len, proofs=data.proof.len + +proc importStorages*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer, ## For log messages, only + data: AccountStorageRange; ## Account 
storage reply from `snap/1` protocol + ): seq[HexaryNodeReport] = + ## Variant of `importStorages()` + SnapDbStorageSlotsRef.init( + pv, peer=peer).importStorages(data, persistent=true) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/heal_accounts.nim b/nimbus/sync/snap/worker/heal_accounts.nim index 59acd96c5..a4fc9a54a 100644 --- a/nimbus/sync/snap/worker/heal_accounts.nim +++ b/nimbus/sync/snap/worker/heal_accounts.nim @@ -16,127 +16,138 @@ ## :: ## START with {state-root} ## | -## | +------------------------------------------------+ -## | | | -## v v | -## --> {missing-account-nodes} | -## | | | -## v v | -## {leaf-nodes} | -## | | | -## v v | -## | -## | | | -## v v | -## {storage-roots} {check-account-nodes} ---------+ +## | +--------------------------------+ +## | | | +## v v | +## | +## | | +## | +--------------------------+ | +## | | +--------------------+ | | +## | | | | | | +## v v v | | | +## {missing-nodes} | | | +## | | | | +## v | | | +## ---+ | | +## | | | +## v | | +## -----+ | +## | | | +## v v | +## {leaf-nodes} {check-nodes} -------+ ## | -## v -## +## v \ +## | +## | | similar actions for single leaf +## v \ nodes that otherwise would be +## {storage-roots} / done for account hash ranges in +## | | the function storeAccounts() +## v | +## / ## ## Legend: -## * `<..>` some action, process, etc. -## * `{..}` some data set, list, or queue etc. +## * `<..>`: some action, process, etc. +## * `{missing-nodes}`: list implemented as `env.fetchAccounts.missingNodes` +## * `(state-root}`: implicit argument for `getAccountNodeKey()` when +## the argument list is empty +## * `{leaf-nodes}`: list is optimised out +## * `{check-nodes}`: list implemented as `env.fetchAccounts.checkNodes` +## * `{storage-roots}`: list implemented as `env.fetchStorage` ## ## Discussion of flow chart ## ------------------------ -## * Input nodes for `` are checked for dangling child -## node links which in turn are collected as output. +## * Input nodes for `` are checked for dangling child node +## links which in turn are collected as output. ## -## * Nodes of the `{missing-account-nodes}` list are fetched from the network -## and merged into the accounts trie database. Successfully processed nodes -## are collected in the `{check-account-nodes}` list which is fed back into -## the `` process. +## * Nodes of the `{missing-nodes}` list are fetched from the network and +## merged into the persistent accounts trie database. +## + Successfully merged non-leaf nodes are collected in the `{check-nodes}` +## list which is fed back into the `` process. +## + Successfully merged leaf nodes are processed as single entry accounts +## node ranges. ## ## * If there is a problem with a node travelling from the source list -## `{missing-account-nodes}` towards the target list `{check-account-nodes}`, -## this problem node will simply held back in the source list. +## `{missing-nodes}` towards either target list `{leaf-nodes}` or +## `{check-nodes}`, this problem node will fed back to the `{missing-nodes}` +## source list. 
## -## In order to avoid unnecessary stale entries, the `{missing-account-nodes}` -## list is regularly checked for whether nodes are still missing or some -## other process has done the magic work of merging some of then into the +## * In order to avoid double processing, the `{missing-nodes}` list is +## regularly checked for whether nodes are still missing or some other +## process has done the magic work of merging some of then into the ## trie database. ## ## Competing with other trie algorithms ## ------------------------------------ -## * Healing runs (semi-)parallel to processing `GetAccountRange` network -## messages from the `snap/1` protocol. This is more network bandwidth -## efficient in comparison with the healing algorithm. Here, leaf nodes are -## transferred wholesale while with the healing algorithm, only the top node -## of a sub-trie can be transferred at once (but for multiple sub-tries). +## * Healing runs (semi-)parallel to processing *GetAccountRange* network +## messages from the `snap/1` protocol (see `storeAccounts()`). Considering +## network bandwidth, the *GetAccountRange* message processing is way more +## efficient in comparison with the healing algorithm as there are no +## intermediate trie nodes involved. ## ## * The healing algorithm visits all nodes of a complete trie unless it is ## stopped in between. ## ## * If a trie node is missing, it can be fetched directly by the healing ## algorithm or one can wait for another process to do the job. Waiting for -## other processes to do the job also applies to problem nodes as indicated -## in the last bullet item of the previous chapter. +## other processes to do the job also applies to problem nodes (and vice +## versa.) ## -## * Network bandwidth can be saved if nodes are fetched by a more efficient -## process (if that is available.) This suggests that fetching missing trie -## nodes by the healing algorithm should kick in very late when the trie -## database is nearly complete. -## -## * Healing applies to a trie database associated with the currently latest -## *state root*, which may change occasionally. It suggests to start the -## healing algorithm very late altogether (not fetching nodes, only) because -## most trie databases will never be completed by healing. +## * Network bandwidth can be saved if nodes are fetched by the more efficient +## *GetAccountRange* message processing (if that is available.) This suggests +## that fetching missing trie nodes by the healing algorithm should kick in +## very late when the trie database is nearly complete. ## +## * Healing applies to a hexary trie database associated with the currently +## latest *state root*, where tha latter may change occasionally. This +## suggests to start the healing algorithm very late at a time when most of +## the accounts have been updated by any *state root*, already. There is a +## good chance that the healing algorithm detects and activates account data +## from previous *state roots* that have not changed. 
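The flow chart and discussion above map onto the procedures defined further down in this file. As a condensed editorial sketch (not part of the patch) of what one healing pass amounts to:

  # One healing pass, mirroring healAccountsDb() below
  buddy.updateMissingNodesList()              # re-check {missing-nodes}, refill {check-nodes}
  discard buddy.appendMoreDanglingNodesToMissingNodesList()   # trie inspection
  let nodesData = await buddy.getMissingNodesFromNetwork()    # fetch a batch of nodes
  let report = ctx.data.snapDb.importRawAccountNodes(peer, nodesData)
  # leaf nodes   -> registerAccountLeaf()  (account + {storage-roots})
  # other nodes  -> {check-nodes} for the next round
  # failed nodes -> back onto {missing-nodes}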
import std/sequtils, chronicles, chronos, - eth/[common/eth_types, p2p, trie/trie_defs], + eth/[common/eth_types, p2p, trie/nibbles, trie/trie_defs, rlp], stew/[interval_set, keyed_queue], ../../../utils/prettify, ../../sync_desc, ".."/[range_desc, worker_desc], - ./com/get_trie_nodes, - ./db/snap_db + ./com/[com_error, get_trie_nodes], + ./db/[hexary_desc, hexary_error, snapdb_accounts] {.push raises: [Defect].} logScope: - topics = "snap-fetch" + topics = "snap-heal" const extraTraceMessages = false or true ## Enabled additional logging noise # ------------------------------------------------------------------------------ -# Helpers +# Private logging helpers # ------------------------------------------------------------------------------ -proc coverageInfo(buddy: SnapBuddyRef): string = - ## Logging helper ... +proc healingCtx(buddy: SnapBuddyRef): string = let ctx = buddy.ctx env = buddy.data.pivotEnv - env.fetchAccounts.emptyFactor.toPC(0) & - "/" & - ctx.data.coveredAccounts.fullFactor.toPC(0) - -proc getCoveringLeafRangeSet(buddy: SnapBuddyRef; pt: NodeTag): LeafRangeSet = - ## Helper ... - let env = buddy.data.pivotEnv - for ivSet in env.fetchAccounts: - if 0 < ivSet.covered(pt,pt): - return ivSet - -proc commitLeafAccount(buddy: SnapBuddyRef; ivSet: LeafRangeSet; pt: NodeTag) = - ## Helper ... - discard ivSet.reduce(pt,pt) - discard buddy.ctx.data.coveredAccounts.merge(pt,pt) + "[" & + "nAccounts=" & $env.nAccounts & "," & + ("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" & + ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," & + "nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," & + "nMissingNodes=" & $env.fetchAccounts.missingNodes.len & "]" # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ proc updateMissingNodesList(buddy: SnapBuddyRef) = - ## Check whether previously missing nodes from the `missingAccountNodes` list - ## have been magically added to the database since it was checked last time. - ## These nodes will me moved to `checkAccountNodes` for further processing. + ## Check whether previously missing nodes from the `missingNodes` list + ## have been magically added to the database since it was checked last + ## time. These nodes will me moved to `checkNodes` for further processing. let ctx = buddy.ctx peer = buddy.peer @@ -145,97 +156,155 @@ proc updateMissingNodesList(buddy: SnapBuddyRef) = var nodes: seq[Blob] - for accKey in env.missingAccountNodes: + when extraTraceMessages: + trace "Start accounts healing", peer, ctx=buddy.healingCtx() + + for accKey in env.fetchAccounts.missingNodes: let rc = ctx.data.snapDb.getAccountNodeKey(peer, stateRoot, accKey) if rc.isOk: # Check nodes for dangling links - env.checkAccountNodes.add acckey + env.fetchAccounts.checkNodes.add accKey else: # Node is still missing nodes.add acckey - env.missingAccountNodes = nodes + env.fetchAccounts.missingNodes = nodes -proc mergeIsolatedAccounts( - buddy: SnapBuddyRef; - paths: openArray[NodeKey]; - ): seq[AccountSlotsHeader] = - ## Process leaves found with nodes inspection, returns a list of - ## storage slots for these nodes. 
- let - ctx = buddy.ctx - peer = buddy.peer - env = buddy.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - - # Remove reported leaf paths from the accounts interval - for accKey in paths: - let - pt = accKey.to(NodeTag) - ivSet = buddy.getCoveringLeafRangeSet(pt) - if not ivSet.isNil: - let - rc = ctx.data.snapDb.getAccountData(peer, stateRoot, accKey) - accountHash = Hash256(data: accKey.ByteArray32) - if rc.isOk: - let storageRoot = rc.value.storageRoot - when extraTraceMessages: - let stRootStr = if storageRoot != emptyRlpHash: $storageRoot - else: "emptyRlpHash" - trace "Registered isolated persistent account", peer, accountHash, - storageRoot=stRootStr - if storageRoot != emptyRlpHash: - result.add AccountSlotsHeader( - accHash: accountHash, - storageRoot: storageRoot) - buddy.commitLeafAccount(ivSet, pt) - env.nAccounts.inc - continue - - when extraTraceMessages: - let error = rc.error - trace "Get persistent account problem", peer, accountHash, error - - -proc fetchDanglingNodesList( - buddy: SnapBuddyRef - ): Result[TrieNodeStat,HexaryDbError] = +proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool = ## Starting with a given set of potentially dangling account nodes - ## `checkAccountNodes`, this set is filtered and processed. The outcome - ## is fed back to the vey same list `checkAccountNodes` + ## `checkNodes`, this set is filtered and processed. The outcome is + ## fed back to the vey same list `checkNodes` let ctx = buddy.ctx peer = buddy.peer env = buddy.data.pivotEnv stateRoot = env.stateHeader.stateRoot - maxLeaves = if env.checkAccountNodes.len == 0: 0 - else: maxHealingLeafPaths - rc = ctx.data.snapDb.inspectAccountsTrie( - peer, stateRoot, env.checkAccountNodes, maxLeaves) + peer, stateRoot, env.fetchAccounts.checkNodes) if rc.isErr: + when extraTraceMessages: + error "Accounts healing failed => stop", peer, + ctx=buddy.healingCtx(), error=rc.error # Attempt to switch peers, there is not much else we can do here buddy.ctrl.zombie = true - return err(rc.error) + return # Global/env batch list to be replaced by by `rc.value.leaves` return value - env.checkAccountNodes.setLen(0) + env.fetchAccounts.checkNodes.setLen(0) + env.fetchAccounts.missingNodes = + env.fetchAccounts.missingNodes & rc.value.dangling - # Store accounts leaves on the storage batch list. - let withStorage = buddy.mergeIsolatedAccounts(rc.value.leaves) - if 0 < withStorage.len: - discard env.fetchStorage.append SnapSlotQueueItemRef(q: withStorage) + true + + +proc getMissingNodesFromNetwork( + buddy: SnapBuddyRef; + ): Future[seq[Blob]] + {.async.} = + ## Extract from `missingNodes` the next batch of nodes that need + ## to be merged it into the database + let + ctx = buddy.ctx + peer = buddy.peer + env = buddy.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + + nMissingNodes = env.fetchAccounts.missingNodes.len + inxLeft = max(0, nMissingNodes - maxTrieNodeFetch) + + # There is no point in processing too many nodes at the same time. So leave + # the rest on the `missingNodes` queue to be handled later. + let fetchNodes = env.fetchAccounts.missingNodes[inxLeft ..< nMissingNodes] + env.fetchAccounts.missingNodes.setLen(inxLeft) + + # Fetch nodes from the network. Note that the remainder of the `missingNodes` + # list might be used by another process that runs semi-parallel. 
+ let rc = await buddy.getTrieNodes(stateRoot, fetchNodes.mapIt(@[it])) + if rc.isOk: + # Register unfetched missing nodes for the next pass + env.fetchAccounts.missingNodes = + env.fetchAccounts.missingNodes & rc.value.leftOver.mapIt(it[0]) + return rc.value.nodes + + # Restore missing nodes list now so that a task switch in the error checker + # allows other processes to access the full `missingNodes` list. + env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & fetchNodes + + let error = rc.error + if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): + discard when extraTraceMessages: - trace "Accounts healing storage nodes", peer, - nAccounts=env.nAccounts, - covered=buddy.coverageInfo(), - nWithStorage=withStorage.len, - nDangling=rc.value.dangling + trace "Error fetching account nodes for healing => stop", peer, + ctx=buddy.healingCtx(), error + else: + discard + when extraTraceMessages: + trace "Error fetching account nodes for healing", peer, + ctx=buddy.healingCtx(), error - return ok(rc.value) + return @[] + + +proc kvAccountLeaf( + buddy: SnapBuddyRef; + partialPath: Blob; + node: Blob; + ): (bool,NodeKey,Account) + {.gcsafe, raises: [Defect,RlpError]} = + ## Read leaf node from persistent database (if any) + let + peer = buddy.peer + env = buddy.data.pivotEnv + + nodeRlp = rlpFromBytes node + (_,prefix) = hexPrefixDecode partialPath + (_,segment) = hexPrefixDecode nodeRlp.listElem(0).toBytes + nibbles = prefix & segment + if nibbles.len == 64: + let data = nodeRlp.listElem(1).toBytes + return (true, nibbles.getBytes.convertTo(NodeKey), rlp.decode(data,Account)) + + when extraTraceMessages: + trace "Isolated node path for healing => ignored", peer, + ctx=buddy.healingCtx() + + +proc registerAccountLeaf( + buddy: SnapBuddyRef; + accKey: NodeKey; + acc: Account) = + ## Process single account node as would be done with an interval by + ## the `storeAccounts()` functoon + let + peer = buddy.peer + env = buddy.data.pivotEnv + pt = accKey.to(NodeTag) + + # Find range set (from list) containing `pt` + var ivSet: NodeTagRangeSet + block foundCoveringRange: + for w in env.fetchAccounts.unprocessed: + if 0 < w.covered(pt,pt): + ivSet = w + break foundCoveringRange + return # already processed, forget this account leaf + + # Register this isolated leaf node that was added + env.nAccounts.inc + discard ivSet.reduce(pt,pt) + discard buddy.ctx.data.coveredAccounts.merge(pt,pt) + + # Update storage slots batch + if acc.storageRoot != emptyRlpHash: + env.fetchStorage.merge AccountSlotsHeader( + accHash: Hash256(data: accKey.ByteArray32), + storageRoot: acc.storageRoot) + + when extraTraceMessages: + trace "Isolated node for healing", peer, ctx=buddy.healingCtx(), accKey=pt # ------------------------------------------------------------------------------ # Public functions @@ -247,7 +316,6 @@ proc healAccountsDb*(buddy: SnapBuddyRef) {.async.} = ctx = buddy.ctx peer = buddy.peer env = buddy.data.pivotEnv - stateRoot = env.stateHeader.stateRoot # Only start healing if there is some completion level, already. 
# @@ -261,119 +329,68 @@ proc healAccountsDb*(buddy: SnapBuddyRef) {.async.} = if env.nAccounts == 0 or ctx.data.coveredAccounts.fullFactor < healAccountsTrigger: when extraTraceMessages: - trace "Accounts healing postponed", peer, - nAccounts=env.nAccounts, - covered=buddy.coverageInfo(), - nCheckAccountNodes=env.checkAccountNodes.len, - nMissingAccountNodes=env.missingAccountNodes.len + trace "Accounts healing postponed", peer, ctx=buddy.healingCtx() return - when extraTraceMessages: - trace "Start accounts healing", peer, - nAccounts=env.nAccounts, - covered=buddy.coverageInfo(), - nCheckAccountNodes=env.checkAccountNodes.len, - nMissingAccountNodes=env.missingAccountNodes.len - # Update for changes since last visit buddy.updateMissingNodesList() - # If `checkAccountNodes` is empty, healing is at the very start or - # was postponed in which case `missingAccountNodes` is non-empty. - var - nodesMissing: seq[Blob] # Nodes to process by this instance - nLeaves = 0 # For logging - if 0 < env.checkAccountNodes.len or env.missingAccountNodes.len == 0: - let rc = buddy.fetchDanglingNodesList() - if rc.isErr: - error "Accounts healing failed => stop", peer, - nAccounts=env.nAccounts, - covered=buddy.coverageInfo(), - nCheckAccountNodes=env.checkAccountNodes.len, - nMissingAccountNodes=env.missingAccountNodes.len, - error=rc.error + # If `checkNodes` is empty, healing is at the very start or was + # postponed in which case `missingNodes` is non-empty. + if env.fetchAccounts.checkNodes.len != 0 or + env.fetchAccounts.missingNodes.len == 0: + if not buddy.appendMoreDanglingNodesToMissingNodesList(): return - nodesMissing = rc.value.dangling - nLeaves = rc.value.leaves.len - # Check whether the trie is complete. - if nodesMissing.len == 0 and env.missingAccountNodes.len == 0: - when extraTraceMessages: - trace "Accounts healing complete", peer, - nAccounts=env.nAccounts, - covered=buddy.coverageInfo(), - nCheckAccountNodes=0, - nMissingAccountNodes=0, - nNodesMissing=0, - nLeaves + if env.fetchAccounts.missingNodes.len == 0: + trace "Accounts healing complete", peer, ctx=buddy.healingCtx() return # nothing to do - # Ok, clear global `env.missingAccountNodes` list and process `nodesMissing`. - nodesMissing = nodesMissing & env.missingAccountNodes - env.missingAccountNodes.setlen(0) + # Get next batch of nodes that need to be merged it into the database + let nodesData = await buddy.getMissingNodesFromNetwork() + if nodesData.len == 0: + return - # Fetch nodes, merge it into database and feed back results - while 0 < nodesMissing.len: - var fetchNodes: seq[Blob] - # There is no point in processing too many nodes at the same time. So - # leave the rest on the `nodesMissing` queue for a moment. 
- if maxTrieNodeFetch < nodesMissing.len: - let inxLeft = nodesMissing.len - maxTrieNodeFetch - fetchNodes = nodesMissing[inxLeft ..< nodesMissing.len] - nodesMissing.setLen(inxLeft) - else: - fetchNodes = nodesMissing - nodesMissing.setLen(0) - - when extraTraceMessages: - trace "Accounts healing loop", peer, - nAccounts=env.nAccounts, - covered=buddy.coverageInfo(), - nCheckAccountNodes=env.checkAccountNodes.len, - nMissingAccountNodes=env.missingAccountNodes.len, - nNodesMissing=nodesMissing.len, - nLeaves - - # Fetch nodes from the network - let dd = block: - let rc = await buddy.getTrieNodes(stateRoot, fetchNodes.mapIt(@[it])) - if rc.isErr: - env.missingAccountNodes = env.missingAccountNodes & fetchNodes - when extraTraceMessages: - trace "Error fetching account nodes for healing", peer, - nAccounts=env.nAccounts, - covered=buddy.coverageInfo(), - nCheckAccountNodes=env.checkAccountNodes.len, - nMissingAccountNodes=env.missingAccountNodes.len, - nNodesMissing=nodesMissing.len, - nLeaves, - error=rc.error - # Just run the next lap - continue - rc.value - - # Store to disk and register left overs for the next pass - block: - let rc = ctx.data.snapDb.importRawNodes(peer, dd.nodes) - if rc.isOk: - env.checkAccountNodes = env.checkAccountNodes & dd.leftOver.mapIt(it[0]) - elif 0 < rc.error.len and rc.error[^1][0] < 0: - # negative index => storage error - env.missingAccountNodes = env.missingAccountNodes & fetchNodes - else: - env.missingAccountNodes = env.missingAccountNodes & - dd.leftOver.mapIt(it[0]) & rc.error.mapIt(dd.nodes[it[0]]) - - # End while + # Store nodes to disk + let report = ctx.data.snapDb.importRawAccountNodes(peer, nodesData) + if 0 < report.len and report[^1].slot.isNone: + # Storage error, just run the next lap (not much else that can be done) + error "Accounts healing, error updating persistent database", peer, + ctx=buddy.healingCtx(), nNodes=nodesData.len, error=report[^1].error + env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & nodesData + return when extraTraceMessages: - trace "Done accounts healing", peer, - nAccounts=env.nAccounts, - covered=buddy.coverageInfo(), - nCheckAccountNodes=env.checkAccountNodes.len, - nMissingAccountNodes=env.missingAccountNodes.len, - nLeaves + trace "Accounts healing, nodes merged into database", peer, + ctx=buddy.healingCtx(), nNodes=nodesData.len + + # Filter out error and leaf nodes + for w in report: + if w.slot.isSome: # non-indexed entries appear typically at the end, though + let + inx = w.slot.unsafeGet + nodePath = nodesData[inx] + + if w.error != NothingSerious or w.kind.isNone: + # error, try downloading again + env.fetchAccounts.missingNodes.add nodePath + + elif w.kind.unsafeGet != Leaf: + # re-check this node + env.fetchAccounts.checkNodes.add nodePath + + else: + # Node has been stored, double check + let (isLeaf, key, acc) = buddy.kvAccountLeaf(nodePath, nodesData[inx]) + if isLeaf: + # Update `uprocessed` registry, collect storage roots (if any) + buddy.registerAccountLeaf(key, acc) + else: + env.fetchAccounts.checkNodes.add nodePath + + when extraTraceMessages: + trace "Accounts healing job done", peer, ctx=buddy.healingCtx() # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/store_accounts.nim b/nimbus/sync/snap/worker/store_accounts.nim index 64ae92202..be292b051 100644 --- a/nimbus/sync/snap/worker/store_accounts.nim +++ b/nimbus/sync/snap/worker/store_accounts.nim @@ -1,4 +1,3 @@ - # Nimbus # Copyright (c) 2021 Status 
Research & Development GmbH # Licensed under either of @@ -38,7 +37,7 @@ import ../../sync_desc, ".."/[range_desc, worker_desc], ./com/[com_error, get_account_range], - ./db/snap_db + ./db/snapdb_accounts {.push raises: [Defect].} @@ -55,26 +54,26 @@ const proc withMaxLen( buddy: SnapBuddyRef; - iv: LeafRange; + iv: NodeTagRange; maxlen: UInt256; - ): LeafRange = + ): NodeTagRange = ## Reduce accounts interval to maximal size if 0 < iv.len and iv.len <= maxLen: iv else: - LeafRange.new(iv.minPt, iv.minPt + (maxLen - 1.u256)) + NodeTagRange.new(iv.minPt, iv.minPt + (maxLen - 1.u256)) # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc getUnprocessed(buddy: SnapBuddyRef): Result[LeafRange,void] = +proc getUnprocessed(buddy: SnapBuddyRef): Result[NodeTagRange,void] = ## Fetch an interval from one of the account range lists. let env = buddy.data.pivotEnv accountRangeMax = high(UInt256) div buddy.ctx.buddiesMax.u256 - for ivSet in env.fetchAccounts: + for ivSet in env.fetchAccounts.unprocessed: let rc = ivSet.ge() if rc.isOk: let iv = buddy.withMaxLen(rc.value, accountRangeMax) @@ -83,15 +82,15 @@ proc getUnprocessed(buddy: SnapBuddyRef): Result[LeafRange,void] = err() -proc putUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) = +proc putUnprocessed(buddy: SnapBuddyRef; iv: NodeTagRange) = ## Shortcut - discard buddy.data.pivotEnv.fetchAccounts[1].merge(iv) + discard buddy.data.pivotEnv.fetchAccounts.unprocessed[1].merge(iv) -proc delUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) = +proc delUnprocessed(buddy: SnapBuddyRef; iv: NodeTagRange) = ## Shortcut - discard buddy.data.pivotEnv.fetchAccounts[1].reduce(iv) + discard buddy.data.pivotEnv.fetchAccounts.unprocessed[1].reduce(iv) -proc markGloballyProcessed(buddy: SnapBuddyRef; iv: LeafRange) = +proc markGloballyProcessed(buddy: SnapBuddyRef; iv: NodeTagRange) = ## Shortcut discard buddy.ctx.data.coveredAccounts.merge(iv) @@ -111,7 +110,8 @@ proc storeAccounts*(buddy: SnapBuddyRef) {.async.} = let iv = block: let rc = buddy.getUnprocessed() if rc.isErr: - trace "Currently no unprocessed accounts", peer, stateRoot + when extraTraceMessages: + trace "Currently no unprocessed accounts", peer, stateRoot return rc.value @@ -175,11 +175,11 @@ proc storeAccounts*(buddy: SnapBuddyRef) {.async.} = # End registerConsumed # Store accounts on the storage TODO list. 
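Editorial aside on the `NodeTagRange` bookkeeping used by `getUnprocessed()`/`putUnprocessed()` above: an interval is taken out of the `unprocessed` set, capped in length, and merged back if the fetch fails. A minimal sketch with the same `stew/interval_set` calls, assuming `unprocessed: NodeTagRangeSet` and a cap `maxLen: UInt256`:

  let rc = unprocessed.ge()                     # smallest unprocessed interval, if any
  if rc.isOk:
    var iv = rc.value
    if maxLen < iv.len:                         # cap as withMaxLen() does
      iv = NodeTagRange.new(iv.minPt, iv.minPt + (maxLen - 1.u256))
    discard unprocessed.reduce(iv)              # book the interval out
    # fetch the account range for `iv`; on error, merge(iv) puts it back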
- discard env.fetchStorage.append SnapSlotQueueItemRef(q: dd.withStorage) + env.fetchStorage.merge dd.withStorage when extraTraceMessages: - let withStorage = dd.withStorage.len - trace "Done fetching accounts", peer, stateRoot, nAccounts, withStorage, iv + trace "Done fetching accounts", peer, stateRoot, nAccounts, + withStorage=dd.withStorage.len, iv # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/store_storages.nim b/nimbus/sync/snap/worker/store_storages.nim index 0ee607168..94cd44183 100644 --- a/nimbus/sync/snap/worker/store_storages.nim +++ b/nimbus/sync/snap/worker/store_storages.nim @@ -31,12 +31,12 @@ import chronicles, chronos, eth/[common/eth_types, p2p], - stew/keyed_queue, + stew/[interval_set, keyed_queue], stint, ../../sync_desc, ".."/[range_desc, worker_desc], ./com/[com_error, get_storage_ranges], - ./db/snap_db + ./db/[hexary_error, snapdb_storage_slots] {.push raises: [Defect].} @@ -45,70 +45,49 @@ logScope: const extraTraceMessages = false or true - ## Enabled additional logging noise # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc getNextSlotItem(buddy: SnapBuddyRef): Result[SnapSlotQueueItemRef,void] = - let env = buddy.data.pivotEnv - for w in env.fetchStorage.nextKeys: - # Make sure that this item was not fetched and rejected earlier - if w notin buddy.data.vetoSlots: - env.fetchStorage.del(w) - return ok(w) - err() - -proc fetchAndImportStorageSlots( - buddy: SnapBuddyRef; - reqSpecs: seq[AccountSlotsHeader]; - ): Future[Result[seq[SnapSlotQueueItemRef],ComError]] - {.async.} = - ## Fetch storage slots data from the network, store it on disk and - ## return data to process in the next cycle. +proc getNextSlotItems(buddy: SnapBuddyRef): seq[AccountSlotsHeader] = let - ctx = buddy.ctx - peer = buddy.peer env = buddy.data.pivotEnv - stateRoot = env.stateHeader.stateRoot - - # Get storage slots - var stoRange = block: - let rc = await buddy.getStorageRanges(stateRoot, reqSpecs) - if rc.isErr: - return err(rc.error) - rc.value - - if 0 < stoRange.data.storages.len: - # Verify/process data and save to disk - block: - let rc = ctx.data.snapDb.importStorages(peer, stoRange.data) - + (reqKey, reqData) = block: + let rc = env.fetchStorage.shift if rc.isErr: - # Push back parts of the error item - var once = false - for w in rc.error: - if 0 <= w[0]: - # Reset any partial requests by not copying the `firstSlot` field. - # So all the storage slots are re-fetched completely for this - # account. - stoRange.addLeftOver( - @[AccountSlotsHeader( - accHash: stoRange.data.storages[w[0]].account.accHash, - storageRoot: stoRange.data.storages[w[0]].account.storageRoot)], - forceNew = not once) - once = true - # Do not ask for the same entries again on this `peer` - if once: - buddy.data.vetoSlots.incl stoRange.leftOver[^1] + return + (rc.value.key, rc.value.data) - if rc.error[^1][0] < 0: - discard - # TODO: disk storage failed or something else happend, so what? 
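
# Editor's sketch (not part of the patch): a simplified stand-in for the
# `fetchStorage.merge()` discipline used by the `env.fetchStorage.merge
# dd.withStorage` hand-over above and defined later in `worker_desc.nim`.
# A plain `seq` replaces the stew `KeyedQueue`; the point is only the
# ordering rule: full-trie requests go to the back, requests carrying a
# sub-range go to the front so they are retried first.  Names illustrative.

import std/[options, sequtils]

type
  SlotRange = tuple[lo, hi: uint64]        # stand-in for NodeTagRange
  SlotRequest = object
    storageRoot: string                    # stand-in for the Hash256 key
    accHash: string
    subRange: Option[SlotRange]            # partial range, if any

proc merge(q: var seq[SlotRequest]; req: SlotRequest) =
  ## Queue a storage-slots request, keeping at most one entry per root.
  if q.anyIt(it.storageRoot == req.storageRoot):
    return                                 # only add non-existing entries
  if req.subRange.isNone:
    q.add req                              # full range => append right
  else:
    q.insert(req, 0)                       # partial range => left-most

when isMainModule:
  var q: seq[SlotRequest]
  q.merge SlotRequest(storageRoot: "root-A", accHash: "acc-1")
  q.merge SlotRequest(storageRoot: "root-B", accHash: "acc-2",
                      subRange: some((lo: 7u64, hi: 42u64)))
  doAssert q[0].storageRoot == "root-B"    # partial request is served first
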
+ # Assemble first request + result.add AccountSlotsHeader( + accHash: reqData.accHash, + storageRoot: Hash256(data: reqKey)) - # Return the remaining part to be processed later - return ok(stoRange.leftOver) + # Check whether it comes with a sub-range + if not reqData.slots.isNil: + # Extract some interval and return single item request queue + for ivSet in reqData.slots.unprocessed: + let rc = ivSet.ge() + if rc.isOk: + + # Extraxt interval => done + result[0].subRange = some rc.value + discard ivSet.reduce rc.value + + # Puch back on batch queue unless it becomes empty + if not reqData.slots.unprocessed.isEmpty: + discard env.fetchStorage.unshift(reqKey, reqData) + return + + # Append more full requests to returned list + while result.len < maxStoragesFetch: + let rc = env.fetchStorage.shift + if rc.isErr: + return + result.add AccountSlotsHeader( + accHash: rc.value.data.accHash, + storageRoot: Hash256(data: rc.value.key)) # ------------------------------------------------------------------------------ # Public functions @@ -121,56 +100,73 @@ proc storeStorages*(buddy: SnapBuddyRef) {.async.} = peer = buddy.peer env = buddy.data.pivotEnv stateRoot = env.stateHeader.stateRoot - var - once = true # for logging # Fetch storage data and save it on disk. Storage requests are managed by # a request queue for handling partioal replies and re-fetch issues. For # all practical puroses, this request queue should mostly be empty. - while true: - # Pull out the next request item from the queue - let req = block: - let rc = buddy.getNextSlotItem() - if rc.isErr: - return # currently nothing to do - rc.value - when extraTraceMessages: - if once: - once = false - let nAccounts = 1 + env.fetchStorage.len - trace "Start fetching storage slotss", peer, - nAccounts, nVetoSlots=buddy.data.vetoSlots.len - - block: - # Fetch and store account storage slots. On success, the `rc` value will - # contain a list of left-over items to be re-processed. - let rc = await buddy.fetchAndImportStorageSlots(req.q) - if rc.isErr: - # Save accounts/storage list to be processed later, then stop - discard env.fetchStorage.append req - let error = rc.error - if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): - trace "Error fetching storage slots => stop", peer, error - discard - return - - # Reset error counts for detecting repeated timeouts - buddy.data.errors.nTimeouts = 0 - - for qLo in rc.value: - # Handle queue left-overs for processing in the next cycle - if qLo.q[0].firstSlot == Hash256() and 0 < env.fetchStorage.len: - # Appending to last queue item is preferred over adding new item - let item = env.fetchStorage.first.value - item.q = item.q & qLo.q - else: - # Put back as-is. 
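
# Editor's sketch (not part of the patch): the consumer side of the queue
# above, mirroring the `getNextSlotItems()` batching rule -- a request with
# a sub-range is returned on its own, otherwise up to `maxFetch` full-trie
# requests are bundled into one network message.  (The patch additionally
# pushes any remaining sub-ranges back onto the queue; that detail is left
# out here.)  Plain Nim types stand in for the snap/1 descriptors.

import std/options

type
  SlotRange = tuple[lo, hi: uint64]
  SlotRequest = object
    storageRoot: string
    subRange: Option[SlotRange]

proc nextSlotItems(q: var seq[SlotRequest]; maxFetch: int): seq[SlotRequest] =
  ## Assemble the next request batch from the front of the queue.
  if q.len == 0 or maxFetch <= 0:
    return
  result.add q[0]
  q = q[1 .. ^1]
  if result[0].subRange.isSome:
    return                                 # partial request travels alone
  while q.len > 0 and result.len < maxFetch:
    result.add q[0]                        # append more full-trie requests
    q = q[1 .. ^1]

when isMainModule:
  var q = @[
    SlotRequest(storageRoot: "root-B", subRange: some((lo: 7u64, hi: 42u64))),
    SlotRequest(storageRoot: "root-A"),
    SlotRequest(storageRoot: "root-C")]
  doAssert q.nextSlotItems(maxFetch = 8).len == 1   # the partial one
  doAssert q.nextSlotItems(maxFetch = 8).len == 2   # remaining full requests
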
- discard env.fetchStorage.append qLo - # End while + # Pull out the next request list from the queue + let req = buddy.getNextSlotItems() + if req.len == 0: + return # currently nothing to do when extraTraceMessages: - trace "Done fetching storage slots", peer, nAccounts=env.fetchStorage.len + trace "Start fetching storage slots", peer, + nSlots=env.fetchStorage.len, + nReq=req.len + + # Get storages slots data from the network + var stoRange = block: + let rc = await buddy.getStorageRanges(stateRoot, req) + if rc.isErr: + let error = rc.error + if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): + trace "Error fetching storage slots => stop", peer, + nSlots=env.fetchStorage.len, + nReq=req.len, + error + discard + env.fetchStorage.merge req + return + rc.value + + # Reset error counts for detecting repeated timeouts + buddy.data.errors.nTimeouts = 0 + + if 0 < stoRange.data.storages.len: + # Verify/process storages data and save it to disk + let report = ctx.data.snapDb.importStorages(peer, stoRange.data) + if 0 < report.len: + + if report[^1].slot.isNone: + # Failed to store on database, not much that can be done here + trace "Error writing storage slots to database", peer, + nSlots=env.fetchStorage.len, + nReq=req.len, + error=report[^1].error + env.fetchStorage.merge req + return + + # Push back error entries to be processed later + for w in report: + if w.slot.isSome: + let n = w.slot.unsafeGet + # if w.error in {RootNodeMismatch, RightBoundaryProofFailed}: + # ??? + trace "Error processing storage slots", peer, + nSlots=env.fetchStorage.len, + nReq=req.len, + nReqInx=n, + error=report[n].error + # Reset any partial requests to requesting the full interval. So + # all the storage slots are re-fetched completely for this account. + env.fetchStorage.merge AccountSlotsHeader( + accHash: stoRange.data.storages[n].account.accHash, + storageRoot: stoRange.data.storages[n].account.storageRoot) + + when extraTraceMessages: + trace "Done fetching storage slots", peer, + nSlots=env.fetchStorage.len # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index dd702de9a..7bcafd8b7 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -11,10 +11,10 @@ import std/[hashes, sequtils, strutils], eth/[common/eth_types, p2p], - stew/[byteutils, keyed_queue], + stew/[byteutils, interval_set, keyed_queue], "../.."/[constants, db/select_backend], ".."/[sync_desc, types], - ./worker/[com/com_error, db/snap_db, ticker], + ./worker/[com/com_error, db/snapdb_desc, ticker], ./range_desc {.push raises: [Defect].} @@ -58,6 +58,9 @@ const ## all account ranges retrieved for all pivot state roots (see ## `coveredAccounts` in `CtxData`.) + maxStoragesFetch* = 128 + ## Maximal number of storage tries to fetch with a signe message. + maxTrieNodeFetch* = 1024 ## Informal maximal number of trie nodes to fetch at once. This is nor ## an official limit but found on several implementations (e.g. geth.) @@ -81,36 +84,49 @@ const ## Internal size of LRU cache (for debugging) type - WorkerSeenBlocks = KeyedQueue[array[32,byte],BlockNumber] + WorkerSeenBlocks = KeyedQueue[ByteArray32,BlockNumber] ## Temporary for pretty debugging, `BlockHash` keyed lru cache - SnapSlotQueueItemRef* = ref object - ## Accounts storage request data. - q*: seq[AccountSlotsHeader] - - SnapSlotsQueue* = KeyedQueueNV[SnapSlotQueueItemRef] - ## Handles list of storage data for re-fetch. 
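
# Editor's sketch (not part of the patch): how the `importStorages()` report
# is interpreted in `storeStorages()` above.  A trailing report entry with
# no slot index signals a database-level failure, so the whole batch is
# re-queued; an indexed entry marks one account whose storage slots must be
# re-fetched in full.  `Report`/`ErrorKind` are simplified stand-ins for
# `HexaryNodeReport`/`HexaryDbError`; all names are illustrative.

import std/options

type
  ErrorKind = enum
    NothingSerious, RootNodeMismatch, SomeDbFailure
  Report = object
    slot: Option[int]        # index into the request list, if applicable
    error: ErrorKind

proc requeueIndexes(report: openArray[Report]): seq[int] =
  ## Request indexes to merge back into the fetch queue; `-1` means
  ## "re-queue the whole batch".
  if report.len > 0 and report[^1].slot.isNone:
    return @[-1]                           # persistent db failure
  for w in report:
    if w.slot.isSome and w.error != NothingSerious:
      result.add w.slot.get                # re-fetch this account in full

when isMainModule:
  let rep = @[
    Report(slot: some 2, error: RootNodeMismatch),
    Report(slot: none(int), error: SomeDbFailure)]
  doAssert rep.requeueIndexes() == @[-1]
  let rep2 = @[Report(slot: some 0, error: RootNodeMismatch)]
  doAssert rep2.requeueIndexes() == @[0]
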
+ SnapSlotsQueue* = KeyedQueue[ByteArray32,SnapSlotQueueItemRef] + ## Handles list of storage slots data for fetch indexed by storage root. ## - ## This construct is the is a nested queue rather than a flat one because - ## only the first element of a `seq[AccountSlotsHeader]` queue can have an - ## effective sub-range specification (later ones will be ignored.) + ## Typically, storage data requests cover the full storage slots trie. If + ## there is only a partial list of slots to fetch, the queue entry is + ## stored left-most for easy access. + + SnapSlotQueueItemRef* = ref object + ## Storage slots request data. This entry is similar to `AccountSlotsHeader` + ## where the optional `subRange` interval has been replaced by an interval + ## range + healing support. + accHash*: Hash256 ## Owner account, maybe unnecessary + slots*: SnapTrieRangeBatchRef ## slots to fetch, nil => all slots SnapSlotsSet* = HashSet[SnapSlotQueueItemRef] ## Ditto but without order, to be used as veto set - SnapAccountRanges* = array[2,LeafRangeSet] + SnapAccountRanges* = array[2,NodeTagRangeSet] ## Pair of account hash range lists. The first entry must be processed ## first. This allows to coordinate peers working on different state roots ## to avoid ovelapping accounts as long as they fetch from the first entry. + SnapTrieRangeBatch* = object + ## `NodeTag` ranges to fetch, healing support + unprocessed*: SnapAccountRanges ## Range of slots not covered, yet + checkNodes*: seq[Blob] ## Nodes with prob. dangling child links + missingNodes*: seq[Blob] ## Dangling links to fetch and merge + + SnapTrieRangeBatchRef* = ref SnapTrieRangeBatch + ## Referenced object, so it can be made optional for the storage + ## batch list + + SnapPivotRef* = ref object ## Per-state root cache for particular snap data environment stateHeader*: BlockHeader ## Pivot state, containg state root # Accounts download - fetchAccounts*: SnapAccountRanges ## Sets of accounts ranges to fetch - checkAccountNodes*: seq[Blob] ## Nodes with prob. dangling child links - missingAccountNodes*: seq[Blob] ## Dangling links to fetch and merge + fetchAccounts*: SnapTrieRangeBatch ## Set of accounts ranges to fetch + # vetoSlots*: SnapSlotsSet ## Do not ask for these slots, again accountsDone*: bool ## All accounts have been processed # Storage slots download @@ -130,7 +146,6 @@ type errors*: ComErrorStatsRef ## For error handling pivotFinder*: RootRef ## Opaque object reference for sub-module pivotEnv*: SnapPivotRef ## Environment containing state root - vetoSlots*: SnapSlotsSet ## Do not ask for these slots, again CtxData* = object ## Globally shared data extension @@ -140,7 +155,7 @@ type pivotTable*: SnapPivotTable ## Per state root environment pivotFinderCtx*: RootRef ## Opaque object reference for sub-module snapDb*: SnapDbRef ## Accounts snapshot DB - coveredAccounts*: LeafRangeSet ## Derived from all available accounts + coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts # Info ticker*: TickerRef ## Ticker, logger @@ -167,6 +182,35 @@ proc hash*(a: Hash256): Hash = ## Table/KeyedQueue mixin a.data.hash +# ------------------------------------------------------------------------------ +# Public helpers +# ------------------------------------------------------------------------------ + +proc merge*(q: var SnapSlotsQueue; fetchReq: AccountSlotsHeader) = + ## Append/prepend a slot header record into the batch queue. 
+ let reqKey = fetchReq.storageRoot.data + + if not q.hasKey(reqKey): + let reqData = SnapSlotQueueItemRef(accHash: fetchReq.accHash) + + # Only add non-existing entries + if fetchReq.subRange.isNone: + # Append full range to the right of the list + discard q.append(reqKey, reqData) + + else: + # Partial range, add healing support and interval + reqData.slots = SnapTrieRangeBatchRef() + for n in 0 ..< reqData.slots.unprocessed.len: + reqData.slots.unprocessed[n] = NodeTagRangeSet.init() + discard reqData.slots.unprocessed[0].merge(fetchReq.subRange.unsafeGet) + discard q.unshift(reqKey, reqData) + +proc merge*(q: var SnapSlotsQueue; reqList: openArray[AccountSlotsHeader]) = + ## Variant fof `merge()` for a list argument + for w in reqList: + q.merge w + # ------------------------------------------------------------------------------ # Public functions, debugging helpers (will go away eventually) # ------------------------------------------------------------------------------ diff --git a/tests/test_sync_snap.nim b/tests/test_sync_snap.nim index 4616a276e..fd9f58799 100644 --- a/tests/test_sync_snap.nim +++ b/tests/test_sync_snap.nim @@ -25,8 +25,9 @@ import ../nimbus/p2p/chain, ../nimbus/sync/types, ../nimbus/sync/snap/range_desc, - ../nimbus/sync/snap/worker/db/[hexary_desc, hexary_inspect, - rocky_bulk_load, snap_db,], + ../nimbus/sync/snap/worker/db/[ + hexary_desc, hexary_error, hexary_inspect, rocky_bulk_load, + snapdb_accounts, snapdb_desc, snapdb_storage_slots], ../nimbus/utils/prettify, ./replay/[pp, undump_blocks, undump_accounts, undump_storages], ./test_sync_snap/[bulk_test_xx, snap_test_xx, test_types] @@ -86,6 +87,12 @@ var proc isOk(rc: ValidationResult): bool = rc == ValidationResult.OK +proc toStoDbRc(r: seq[HexaryNodeReport]): Result[void,seq[(int,HexaryDbError)]]= + ## Kludge: map error report to (older version) return code + if r.len != 0: + return err(r.mapIt((it.slot.get(otherwise = -1),it.error))) + ok() + proc findFilePath(file: string; baseDir, repoDir: openArray[string]): Result[string,void] = for dir in baseDir: @@ -115,11 +122,8 @@ proc pp(rc: Result[Account,HexaryDbError]): string = proc pp(rc: Result[Hash256,HexaryDbError]): string = if rc.isErr: $rc.error else: $rc.value.to(NodeTag) -proc pp( - rc: Result[TrieNodeStat,HexaryDbError]; - db: SnapDbSessionRef - ): string = - if rc.isErr: $rc.error else: rc.value.pp(db.getAcc) +proc pp(rc: Result[TrieNodeStat,HexaryDbError]; db: SnapDbBaseRef): string = + if rc.isErr: $rc.error else: rc.value.pp(db.hexaDb) proc ppKvPc(w: openArray[(string,int)]): string = w.mapIt(&"{it[0]}={it[1]}%").join(", ") @@ -284,21 +288,21 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) = suite &"SyncSnap: {fileInfo} accounts and proofs for {info}": var - desc: SnapDbSessionRef + desc: SnapDbAccountsRef accKeys: seq[Hash256] test &"Snap-proofing {accountsList.len} items for state root ..{root.pp}": let dbBase = if persistent: SnapDbRef.init(db.cdb[0]) else: SnapDbRef.init(newMemoryDB()) - dbDesc = SnapDbSessionRef.init(dbBase, root, peer) + dbDesc = SnapDbAccountsRef.init(dbBase, root, peer) for n,w in accountsList: check dbDesc.importAccounts(w.base, w.data, persistent) == OkHexDb test &"Merging {accountsList.len} proofs for state root ..{root.pp}": let dbBase = if persistent: SnapDbRef.init(db.cdb[1]) else: SnapDbRef.init(newMemoryDB()) - desc = SnapDbSessionRef.init(dbBase, root, peer) + desc = SnapDbAccountsRef.init(dbBase, root, peer) # Load/accumulate data from several samples (needs some particular sort) 
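
# Editor's sketch (not part of the patch): the shape of the `toStoDbRc`
# kludge above -- flattening a per-item import report into "all good" or a
# list of (request index, error) pairs.  `std/options` stands in here for
# the stew `Result` type used by the tests, and `Item`/`ErrKind` are
# illustrative stand-ins for `HexaryNodeReport`/`HexaryDbError`.

import std/[options, sequtils]

type
  ErrKind = enum
    RootNodeMismatch, RightBoundaryProofFailed
  Item = object
    slot: Option[int]
    error: ErrKind

proc toErrorList(report: seq[Item]): Option[seq[(int, ErrKind)]] =
  ## `none` means the import went through; otherwise the offending entries
  ## are listed, with `-1` standing for "not attributable to a single slot".
  if report.len != 0:
    return some report.mapIt((it.slot.get(otherwise = -1), it.error))

when isMainModule:
  var noErrors: seq[Item]
  doAssert toErrorList(noErrors).isNone
  let bad = @[Item(slot: some 3, error: RootNodeMismatch)]
  doAssert toErrorList(bad).get == @[(3, RootNodeMismatch)]
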
let @@ -419,17 +423,15 @@ proc storagesRunner( let dbBase = if persistent: SnapDbRef.init(db.cdb[0]) else: SnapDbRef.init(newMemoryDB()) - var - desc = SnapDbSessionRef.init(dbBase, root, peer) test &"Merging {accountsList.len} accounts for state root ..{root.pp}": for w in accountsList: - let desc = SnapDbSessionRef.init(dbBase, root, peer) + let desc = SnapDbAccountsRef.init(dbBase, root, peer) check desc.importAccounts(w.base, w.data, persistent) == OkHexDb test &"Merging {storagesList.len} storages lists": let - dbDesc = SnapDbSessionRef.init(dbBase, root, peer) + dbDesc = SnapDbStorageSlotsRef.init(dbBase, peer=peer) ignore = knownFailures.toTable for n,w in storagesList: let @@ -438,8 +440,7 @@ proc storagesRunner( Result[void,seq[(int,HexaryDbError)]].err(ignore[testId]) else: OkStoDb - check dbDesc.importStorages(w.data, persistent) == expRc - + check dbDesc.importStorages(w.data, persistent).toStoDbRc == expRc proc inspectionRunner( noisy = true; @@ -467,17 +468,17 @@ proc inspectionRunner( suite &"SyncSnap: inspect {fileInfo} lists for {info} for healing": let memBase = SnapDbRef.init(newMemoryDB()) - memDesc = SnapDbSessionRef.init(memBase, Hash256(), peer) + memDesc = SnapDbAccountsRef.init(memBase, Hash256(), peer) var singleStats: seq[(int,TrieNodeStat)] accuStats: seq[(int,TrieNodeStat)] perBase,altBase: SnapDbRef - perDesc,altDesc: SnapDbSessionRef + perDesc,altDesc: SnapDbAccountsRef if persistent: perBase = SnapDbRef.init(db.cdb[0]) - perDesc = SnapDbSessionRef.init(perBase, Hash256(), peer) + perDesc = SnapDbAccountsRef.init(perBase, Hash256(), peer) altBase = SnapDbRef.init(db.cdb[1]) - altDesc = SnapDbSessionRef.init(altBase, Hash256(), peer) + altDesc = SnapDbAccountsRef.init(altBase, Hash256(), peer) test &"Fingerprinting {inspectList.len} single accounts lists " & "for in-memory-db": @@ -486,17 +487,17 @@ proc inspectionRunner( let root = accList[0].root rootKey = root.to(NodeKey) - desc = SnapDbSessionRef.init(memBase, root, peer) + desc = SnapDbAccountsRef.init(memBase, root, peer) for w in accList: check desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb let rc = desc.inspectAccountsTrie(persistent=false) check rc.isOk let dangling = rc.value.dangling - keys = desc.getAcc.hexaryInspectToKeys( + keys = desc.hexaDb.hexaryInspectToKeys( rootKey, dangling.toHashSet.toSeq) check dangling.len == keys.len - singleStats.add (desc.getAcc.tab.len,rc.value) + singleStats.add (desc.hexaDb.tab.len,rc.value) test &"Fingerprinting {inspectList.len} single accounts lists " & "for persistent db": @@ -511,14 +512,14 @@ proc inspectionRunner( root = accList[0].root rootKey = root.to(NodeKey) dbBase = SnapDbRef.init(db.cdb[2+n]) - desc = SnapDbSessionRef.init(dbBase, root, peer) + desc = SnapDbAccountsRef.init(dbBase, root, peer) for w in accList: check desc.importAccounts(w.base, w.data, persistent) == OkHexDb let rc = desc.inspectAccountsTrie(persistent=false) check rc.isOk let dangling = rc.value.dangling - keys = desc.getAcc.hexaryInspectToKeys( + keys = desc.hexaDb.hexaryInspectToKeys( rootKey, dangling.toHashSet.toSeq) check dangling.len == keys.len # Must be the same as the in-memory fingerprint @@ -538,10 +539,10 @@ proc inspectionRunner( check rc.isOk let dangling = rc.value.dangling - keys = desc.getAcc.hexaryInspectToKeys( + keys = desc.hexaDb.hexaryInspectToKeys( rootKey, dangling.toHashSet.toSeq) check dangling.len == keys.len - accuStats.add (desc.getAcc.tab.len,rc.value) + accuStats.add (desc.hexaDb.tab.len,rc.value) test &"Fingerprinting 
{inspectList.len} accumulated accounts lists " & "for persistent db": @@ -561,7 +562,7 @@ proc inspectionRunner( check rc.isOk let dangling = rc.value.dangling - keys = desc.getAcc.hexaryInspectToKeys( + keys = desc.hexaDb.hexaryInspectToKeys( rootKey, dangling.toHashSet.toSeq) check dangling.len == keys.len check accuStats[n][1] == rc.value @@ -573,7 +574,7 @@ proc inspectionRunner( else: let cscBase = SnapDbRef.init(newMemoryDB()) - cscDesc = SnapDbSessionRef.init(cscBase, Hash256(), peer) + cscDesc = SnapDbAccountsRef.init(cscBase, Hash256(), peer) var cscStep: Table[NodeKey,(int,seq[Blob])] for n,accList in inspectList: