Snap sync interval complement method to speed up trie perusal (#1328)
* Add quick hexary trie inspector, called `dismantle()` why: + Full hexary trie perusal is slow if running down leaf nodes + For known range of leaf nodes, work out the UInt126-complement of partial sub-trie paths (for existing nodes). The result should cover no (or only a few) sub-tries with leaf nodes. * Extract common healing methods => `sub_tries_helper.nim` details: Also apply quick hexary trie inspection tool `dismantle()` Replace `inspectAccountsTrie()` wrapper by `hexaryInspectTrie()` * Re-arrange task dispatching in main peer worker * Refactor accounts and storage slots downloaders * Rename `HexaryDbError` => `HexaryError`
This commit is contained in:
parent
bc3f164b97
commit
44a57496d9
|
@ -83,7 +83,7 @@ const
|
|||
## nodes to allow for a pseudo -task switch.
|
||||
|
||||
|
||||
healAccountsTrigger* = 0.99
|
||||
healAccountsCoverageTrigger* = 0.999
|
||||
## Apply accounts healing if the global snap download coverage factor
|
||||
## exceeds this setting. The global coverage factor is derived by merging
|
||||
## all account ranges retrieved for all pivot state roots (see
|
||||
|
@ -95,6 +95,23 @@ const
|
|||
## over the network. More requests might be a disadvantage if peers only
|
||||
## serve a maximum number requests (rather than data.)
|
||||
|
||||
healAccountsPivotTriggerMinFactor* = 0.17
|
||||
## Additional condition to meed before starting healing. The current
|
||||
## pivot must have at least this much processed as recorded in the
|
||||
## `processed` ranges set. This is the minimim value (see below.)
|
||||
|
||||
healAccountsPivotTriggerWeight* = 0.01
|
||||
healAccountsPivotTriggerNMax* = 10
|
||||
## Enable healing not before the `processed` ranges set fill factor has
|
||||
## at least the following value.
|
||||
## ::
|
||||
## MinFactor + max(0, NMax - pivotTable.len) * Weight
|
||||
##
|
||||
## (the `healAccountsPivotTrigger` prefix of the constant names is ommited.)
|
||||
##
|
||||
## This effects in favouring late healing when more pivots have been
|
||||
## downloaded.
|
||||
|
||||
healAccountsBatchFetchMax* = 10 * 1024
|
||||
## Keep on gloing in healing task up until this many nodes have been
|
||||
## fetched from the network or some error contition terminates the task.
|
||||
|
@ -142,7 +159,7 @@ const
|
|||
## Set 0 to disable.
|
||||
|
||||
static:
|
||||
doAssert healAccountsTrigger < 1.0 # larger values make no sense
|
||||
doAssert healAccountsCoverageTrigger < 1.0 # larger values make no sense
|
||||
doAssert snapStorageSlotsQuPrioThresh < snapAccountsSaveStorageSlotsMax
|
||||
doAssert snapStorageSlotsFetchMax < healAccountsBatchFetchMax
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
|
||||
import
|
||||
std/[math, sequtils, strutils, hashes],
|
||||
eth/[common, trie/nibbles],
|
||||
eth/common,
|
||||
stew/[byteutils, interval_set],
|
||||
stint,
|
||||
../../constants,
|
||||
|
@ -82,27 +82,6 @@ type
|
|||
account*: AccountSlotsHeader
|
||||
data*: seq[SnapStorage]
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc padPartialPath(partialPath: NibblesSeq; dblNibble: byte): NodeKey =
|
||||
## Extend (or cut) `partialPath` nibbles sequence and generate `NodeKey`
|
||||
# Pad with zeroes
|
||||
var padded: NibblesSeq
|
||||
|
||||
let padLen = 64 - partialPath.len
|
||||
if 0 <= padLen:
|
||||
padded = partialPath & dblNibble.repeat(padlen div 2).initNibbleRange
|
||||
if (padLen and 1) == 1:
|
||||
padded = padded & @[dblNibble].initNibbleRange.slice(1)
|
||||
else:
|
||||
let nope = seq[byte].default.initNibbleRange
|
||||
padded = partialPath.slice(0,63) & nope # nope forces re-alignment
|
||||
|
||||
let bytes = padded.getBytes
|
||||
(addr result.ByteArray32[0]).copyMem(unsafeAddr bytes[0], bytes.len)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -139,12 +118,6 @@ proc to*(n: SomeUnsignedInt|UInt256; T: type NodeTag): T =
|
|||
## Syntactic sugar
|
||||
n.u256.T
|
||||
|
||||
proc min*(partialPath: Blob; T: type NodeKey): T =
|
||||
(hexPrefixDecode partialPath)[1].padPartialPath(0)
|
||||
|
||||
proc max*(partialPath: Blob; T: type NodeKey): T =
|
||||
(hexPrefixDecode partialPath)[1].padPartialPath(0xff)
|
||||
|
||||
proc digestTo*(data: Blob; T: type NodeKey): T =
|
||||
keccakHash(data).data.T
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ import
|
|||
".."/[protocol, sync_desc],
|
||||
./worker/[pivot_helper, ticker],
|
||||
./worker/com/com_error,
|
||||
./worker/db/[hexary_desc, snapdb_check, snapdb_desc, snapdb_pivot],
|
||||
./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot],
|
||||
"."/[constants, range_desc, worker_desc]
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
@ -221,10 +221,6 @@ proc runDaemon*(ctx: SnapCtxRef) {.async.} =
|
|||
ctx.data.ticker.stopRecovery()
|
||||
return
|
||||
|
||||
# Update logging
|
||||
if not ctx.data.ticker.isNil:
|
||||
ctx.data.ticker.stopRecovery()
|
||||
|
||||
|
||||
proc runSingle*(buddy: SnapBuddyRef) {.async.} =
|
||||
## Enabled while
|
||||
|
@ -249,38 +245,6 @@ proc runPool*(buddy: SnapBuddyRef, last: bool): bool =
|
|||
ctx.poolMode = false
|
||||
result = true
|
||||
|
||||
block:
|
||||
let rc = ctx.data.pivotTable.lastValue
|
||||
if rc.isOk:
|
||||
|
||||
# Check whether last pivot accounts and storage are complete.
|
||||
let
|
||||
env = rc.value
|
||||
peer = buddy.peer
|
||||
pivot = "#" & $env.stateHeader.blockNumber # for logging
|
||||
|
||||
if not env.storageDone:
|
||||
|
||||
# Check whether accounts download is complete
|
||||
if env.fetchAccounts.unprocessed.isEmpty():
|
||||
|
||||
# FIXME: This check might not be needed. It will visit *every* node
|
||||
# in the hexary trie for checking the account leaves.
|
||||
#
|
||||
# Note: This is insane on main net
|
||||
if buddy.checkAccountsTrieIsComplete(env):
|
||||
env.accountsState = HealerDone
|
||||
|
||||
# Check whether storage slots are complete
|
||||
if env.fetchStorageFull.len == 0 and
|
||||
env.fetchStoragePart.len == 0:
|
||||
env.storageDone = true
|
||||
|
||||
when extraTraceMessages:
|
||||
trace "Checked for pivot DB completeness", peer, pivot,
|
||||
nAccounts=env.nAccounts, accountsState=env.accountsState,
|
||||
nSlotLists=env.nSlotLists, storageDone=env.storageDone
|
||||
|
||||
|
||||
proc runMulti*(buddy: SnapBuddyRef) {.async.} =
|
||||
## Enabled while
|
||||
|
@ -317,7 +281,10 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
|
|||
ctx.data.pivotTable.beforeTopMostlyClean()
|
||||
|
||||
# This one is the syncing work horse which downloads the database
|
||||
let syncActionContinue = await env.execSnapSyncAction(buddy)
|
||||
await env.execSnapSyncAction(buddy)
|
||||
|
||||
if env.obsolete:
|
||||
return # pivot has changed
|
||||
|
||||
# Save state so sync can be partially resumed at next start up
|
||||
let
|
||||
|
@ -337,29 +304,8 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
|
|||
nAccounts=env.nAccounts, nSlotLists=env.nSlotLists,
|
||||
processed, nStoQu, blobSize=rc.value
|
||||
|
||||
if not syncActionContinue:
|
||||
return
|
||||
|
||||
# Check whether there are more accounts to fetch.
|
||||
#
|
||||
# Note that some other process might have temporarily borrowed from the
|
||||
# `fetchAccounts.unprocessed` list. Whether we are done can only be decided
|
||||
# if only a single buddy is active. S be it.
|
||||
if env.fetchAccounts.unprocessed.isEmpty():
|
||||
|
||||
# Debugging log: analyse pivot against database
|
||||
warn "Analysing accounts database -- might be slow", peer, pivot
|
||||
discard buddy.checkAccountsListOk(env)
|
||||
|
||||
# Check whether pivot download is complete.
|
||||
if env.fetchStorageFull.len == 0 and
|
||||
env.fetchStoragePart.len == 0:
|
||||
trace "Running pool mode for verifying completeness", peer, pivot
|
||||
buddy.ctx.poolMode = true
|
||||
|
||||
# Debugging log: analyse pivot against database
|
||||
warn "Analysing storage slots database -- might be slow", peer, pivot
|
||||
discard buddy.checkStorageSlotsTrieIsComplete(env)
|
||||
if buddy.ctrl.stopped:
|
||||
return # peer worker has gone
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
|
|
@ -165,7 +165,7 @@ type
|
|||
slot*: Option[int] ## May refer to indexed argument slots
|
||||
kind*: Option[NodeKind] ## Node type (if any)
|
||||
dangling*: seq[NodeSpecs] ## Missing inner sub-tries
|
||||
error*: HexaryDbError ## Error code, or `NothingSerious`
|
||||
error*: HexaryError ## Error code, or `NothingSerious`
|
||||
|
||||
const
|
||||
EmptyNodeBlob* = seq[byte].default
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
# except according to those terms.
|
||||
|
||||
type
|
||||
HexaryDbError* = enum
|
||||
HexaryError* = enum
|
||||
NothingSerious = 0
|
||||
|
||||
AccountNotFound
|
||||
|
@ -22,6 +22,7 @@ type
|
|||
SlotsNotSrictlyIncreasing
|
||||
TrieLoopAlert
|
||||
TrieIsEmpty
|
||||
TrieIsLockedForPerusal
|
||||
TooManyProcessedChunks
|
||||
TooManySlotAccounts
|
||||
|
||||
|
|
|
@ -139,7 +139,7 @@ proc processLink(
|
|||
inspect: var seq[(NodeKey,NibblesSeq)];
|
||||
trail: NibblesSeq;
|
||||
child: Rlp;
|
||||
) {.gcsafe, raises: [Defect,RlpError,KeyError]} =
|
||||
) {.gcsafe, raises: [Defect,RlpError]} =
|
||||
## Ditto
|
||||
if not child.isEmpty:
|
||||
let childBlob = child.toBytes
|
||||
|
@ -161,6 +161,27 @@ proc processLink(
|
|||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc to*(resumeCtx: TrieNodeStatCtxRef; T: type seq[NodeSpecs]): T =
|
||||
## Convert resumption context to nodes that can be used otherwise. This
|
||||
## function might be useful for error recovery.
|
||||
##
|
||||
## Note: In a non-persistant case, temporary `RepairKey` type node specs
|
||||
## that cannot be converted to `NodeKey` type nodes are silently dropped.
|
||||
## This should be no problem as a hexary trie with `RepairKey` type node
|
||||
## refs must be repaired or discarded anyway.
|
||||
if resumeCtx.persistent:
|
||||
for (key,trail) in resumeCtx.hddCtx:
|
||||
result.add NodeSpecs(
|
||||
partialPath: trail.hexPrefixEncode(isLeaf = false),
|
||||
nodeKey: key)
|
||||
else:
|
||||
for (key,trail) in resumeCtx.memCtx:
|
||||
if key.isNodeKey:
|
||||
result.add NodeSpecs(
|
||||
partialPath: trail.hexPrefixEncode(isLeaf = false),
|
||||
nodeKey: key.convertTo(NodeKey))
|
||||
|
||||
|
||||
proc hexaryInspectPath*(
|
||||
db: HexaryTreeDbRef; ## Database
|
||||
root: NodeKey; ## State root
|
||||
|
@ -206,7 +227,7 @@ proc hexaryInspectToKeys*(
|
|||
proc hexaryInspectTrie*(
|
||||
db: HexaryTreeDbRef; ## Database
|
||||
root: NodeKey; ## State root
|
||||
paths: seq[Blob]; ## Starting paths for search
|
||||
paths: seq[Blob] = @[]; ## Starting paths for search
|
||||
resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection
|
||||
suspendAfter = high(uint64); ## To be resumed
|
||||
stopAtLevel = 64; ## Instead of loop detector
|
||||
|
@ -309,12 +330,12 @@ proc hexaryInspectTrie*(
|
|||
proc hexaryInspectTrie*(
|
||||
getFn: HexaryGetFn; ## Database abstraction
|
||||
rootKey: NodeKey; ## State root
|
||||
paths: seq[Blob]; ## Starting paths for search
|
||||
paths: seq[Blob] = @[]; ## Starting paths for search
|
||||
resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection
|
||||
suspendAfter = high(uint64); ## To be resumed
|
||||
stopAtLevel = 64; ## Instead of loop detector
|
||||
): TrieNodeStat
|
||||
{.gcsafe, raises: [Defect,RlpError,KeyError]} =
|
||||
{.gcsafe, raises: [Defect,RlpError]} =
|
||||
## Variant of `hexaryInspectTrie()` for persistent database.
|
||||
when extraTraceMessages:
|
||||
let nPaths = paths.len
|
||||
|
|
|
@ -42,7 +42,7 @@ proc pp(w: seq[RPathXStep]; db: HexaryTreeDbRef; indent = 4): string =
|
|||
let pfx = "\n" & " ".repeat(indent)
|
||||
w.mapIt(it.pp(db)).join(pfx)
|
||||
|
||||
proc pp(rc: Result[TrieNodeStat, HexaryDbError]; db: HexaryTreeDbRef): string =
|
||||
proc pp(rc: Result[TrieNodeStat, HexaryError]; db: HexaryTreeDbRef): string =
|
||||
if rc.isErr: $rc.error else: rc.value.pp(db)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -524,7 +524,7 @@ proc hexaryInterpolate*(
|
|||
rootKey: NodeKey; ## Root node hash
|
||||
dbItems: var seq[RLeafSpecs]; ## List of path and leaf items
|
||||
bootstrap = false; ## Can create root node on-the-fly
|
||||
): Result[void,HexaryDbError]
|
||||
): Result[void,HexaryError]
|
||||
{.gcsafe, raises: [Defect,KeyError]} =
|
||||
## From the argument list `dbItems`, leaf nodes will be added to the hexary
|
||||
## trie while interpolating the path for the leaf nodes by adding missing
|
||||
|
|
|
@ -11,8 +11,9 @@
|
|||
## Find node paths in hexary tries.
|
||||
|
||||
import
|
||||
std/[tables],
|
||||
std/[algorithm, sequtils, tables],
|
||||
eth/[common, trie/nibbles],
|
||||
stew/[byteutils, interval_set],
|
||||
../../range_desc,
|
||||
./hexary_desc
|
||||
|
||||
|
@ -29,6 +30,16 @@ proc pp(w: Blob; db: HexaryTreeDbRef): string =
|
|||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc `==`(a, b: XNodeObj): bool =
|
||||
if a.kind == b.kind:
|
||||
case a.kind:
|
||||
of Leaf:
|
||||
return a.lPfx == b.lPfx and a.lData == b.lData
|
||||
of Extension:
|
||||
return a.ePfx == b.ePfx and a.eLink == b.eLink
|
||||
of Branch:
|
||||
return a.bLink == b.bLink
|
||||
|
||||
proc getNibblesImpl(path: XPath|RPath; start = 0): NibblesSeq =
|
||||
## Re-build the key path
|
||||
for n in start ..< path.path.len:
|
||||
|
@ -42,6 +53,19 @@ proc getNibblesImpl(path: XPath|RPath; start = 0): NibblesSeq =
|
|||
result = result & it.node.lPfx
|
||||
result = result & path.tail
|
||||
|
||||
proc getNibblesImpl(path: XPath|RPath; start, maxLen: int): NibblesSeq =
|
||||
## Variant of `getNibblesImpl()` for partial rebuild
|
||||
for n in start ..< min(path.path.len, maxLen):
|
||||
let it = path.path[n]
|
||||
case it.node.kind:
|
||||
of Branch:
|
||||
result = result & @[it.nibble.byte].initNibbleRange.slice(1)
|
||||
of Extension:
|
||||
result = result & it.node.ePfx
|
||||
of Leaf:
|
||||
result = result & it.node.lPfx
|
||||
|
||||
|
||||
proc toBranchNode(
|
||||
rlp: Rlp
|
||||
): XNodeObj
|
||||
|
@ -88,6 +112,24 @@ proc `<`(a, b: NibblesSeq): bool =
|
|||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc padPartialPath(pfx: NibblesSeq; dblNibble: byte): NodeKey =
|
||||
## Extend (or cut) `partialPath` nibbles sequence and generate `NodeKey`
|
||||
# Pad with zeroes
|
||||
var padded: NibblesSeq
|
||||
|
||||
let padLen = 64 - pfx.len
|
||||
if 0 <= padLen:
|
||||
padded = pfx & dblNibble.repeat(padlen div 2).initNibbleRange
|
||||
if (padLen and 1) == 1:
|
||||
padded = padded & @[dblNibble].initNibbleRange.slice(1)
|
||||
else:
|
||||
let nope = seq[byte].default.initNibbleRange
|
||||
padded = pfx.slice(0,63) & nope # nope forces re-alignment
|
||||
|
||||
let bytes = padded.getBytes
|
||||
(addr result.ByteArray32[0]).copyMem(unsafeAddr bytes[0], bytes.len)
|
||||
|
||||
|
||||
proc pathExtend(
|
||||
path: RPath;
|
||||
key: RepairKey;
|
||||
|
@ -405,6 +447,82 @@ proc pathMost(
|
|||
# End while
|
||||
# Notreached
|
||||
|
||||
|
||||
proc dismantleLeft(envPt, ivPt: RPath|XPath): Result[seq[Blob],void] =
|
||||
## Helper for `dismantle()` for handling left side of envelope
|
||||
#
|
||||
# partialPath
|
||||
# / \
|
||||
# / \
|
||||
# / \
|
||||
# / \
|
||||
# envPt.. -- envelope of partial path
|
||||
# |
|
||||
# ivPt.. -- `iv`, not fully covering left of `env`
|
||||
#
|
||||
var collect: seq[Blob]
|
||||
block leftCurbEnvelope:
|
||||
for n in 0 ..< min(envPt.path.len, ivPt.path.len):
|
||||
if envPt.path[n] != ivPt.path[n]:
|
||||
#
|
||||
# At this point, the `node` entries of either `path[n]` step are
|
||||
# the same. This is so because the predecessor steps were the same
|
||||
# or were the `rootKey` in case n == 0.
|
||||
#
|
||||
# But then (`node` entries being equal) the only way for the
|
||||
# `path[n]` steps to differ is in the entry selector `nibble` for
|
||||
# a branch node.
|
||||
#
|
||||
for m in n ..< ivPt.path.len:
|
||||
let
|
||||
pfx = ivPt.getNibblesImpl(0,m) # common path segment
|
||||
top = ivPt.path[m].nibble # need nibbles smaller than top
|
||||
#
|
||||
# Incidentally for a non-`Branch` node, the value `top` becomes
|
||||
# `-1` and the `for`- loop will be ignored (which is correct)
|
||||
for nibble in 0 ..< top:
|
||||
collect.add hexPrefixEncode(
|
||||
pfx & @[nibble.byte].initNibbleRange.slice(1), isLeaf=false)
|
||||
break leftCurbEnvelope
|
||||
#
|
||||
# Fringe case, e.g. when `partialPath` is an empty prefix (aka `@[0]`)
|
||||
# and the database has a single leaf node `(a,some-value)` where the
|
||||
# `rootKey` is the hash of this node. In that case, `pMin == 0` and
|
||||
# `pMax == high(NodeTag)` and `iv == [a,a]`.
|
||||
#
|
||||
return err()
|
||||
|
||||
ok(collect)
|
||||
|
||||
proc dismantleRight(envPt, ivPt: RPath|XPath): Result[seq[Blob],void] =
|
||||
## Helper for `dismantle()` for handling right side of envelope
|
||||
#
|
||||
# partialPath
|
||||
# / \
|
||||
# / \
|
||||
# / \
|
||||
# / \
|
||||
# .. envPt -- envelope of partial path
|
||||
# |
|
||||
# .. ivPt -- `iv`, not fully covering right of `env`
|
||||
#
|
||||
var collect: seq[Blob]
|
||||
block rightCurbEnvelope:
|
||||
for n in 0 ..< min(envPt.path.len, ivPt.path.len):
|
||||
if envPt.path[n] != ivPt.path[n]:
|
||||
for m in n ..< ivPt.path.len:
|
||||
let
|
||||
pfx = ivPt.getNibblesImpl(0,m) # common path segment
|
||||
base = ivPt.path[m].nibble # need nibbles greater/equal
|
||||
if 0 <= base:
|
||||
for nibble in base+1 .. 15:
|
||||
collect.add hexPrefixEncode(
|
||||
pfx & @[nibble.byte].initNibbleRange.slice(1), isLeaf=false)
|
||||
break rightCurbEnvelope
|
||||
return err()
|
||||
|
||||
ok(collect)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -437,6 +555,45 @@ proc leafData*(path: RPath): Blob =
|
|||
of Extension:
|
||||
discard
|
||||
|
||||
proc pathEnvelope*(partialPath: Blob): NodeTagRange =
|
||||
## Convert partial path to range of all keys starting with this
|
||||
## partial path
|
||||
let pfx = (hexPrefixDecode partialPath)[1]
|
||||
NodeTagRange.new(
|
||||
pfx.padPartialPath(0).to(NodeTag),
|
||||
pfx.padPartialPath(255).to(NodeTag))
|
||||
|
||||
proc pathSortUniq*(
|
||||
partialPaths: openArray[Blob];
|
||||
): seq[Blob]
|
||||
{.gcsafe, raises: [Defect,KeyError]} =
|
||||
## Sort and simplify a list of partial paths by removoing nested entries.
|
||||
|
||||
var tab: Table[NodeTag,(Blob,bool)]
|
||||
for w in partialPaths:
|
||||
let iv = w.pathEnvelope
|
||||
tab[iv.minPt] = (w,true) # begin entry
|
||||
tab[iv.maxPt] = (@[],false) # end entry
|
||||
|
||||
# When sorted, nested entries look like
|
||||
#
|
||||
# 123000000.. (w0, true)
|
||||
# 123400000.. (w1, true)
|
||||
# 1234fffff.. (, false)
|
||||
# 123ffffff.. (, false)
|
||||
# ...
|
||||
# 777000000.. (w2, true)
|
||||
#
|
||||
var level = 0
|
||||
for key in toSeq(tab.keys).sorted(cmp):
|
||||
let (w,begin) = tab[key]
|
||||
if begin:
|
||||
if level == 0:
|
||||
result.add w
|
||||
level.inc
|
||||
else:
|
||||
level.dec
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -658,6 +815,97 @@ proc prev*(
|
|||
if minDepth <= newPath.depth and 0 < newPath.leafData.len:
|
||||
return newPath
|
||||
|
||||
|
||||
proc dismantle*(
|
||||
partialPath: Blob; ## Patrial path for existing node
|
||||
rootKey: NodeKey; ## State root
|
||||
iv: NodeTagRange; ## Proofed range of leaf paths
|
||||
db: HexaryTreeDbRef; ## Database
|
||||
): seq[Blob]
|
||||
{.gcsafe, raises: [Defect,RlpError,KeyError]} =
|
||||
## Returns the list of partial paths which envelopes span the range of
|
||||
## node paths one obtains by subtracting the argument range `iv` from the
|
||||
## envelope of the argumenr `partialPath`.
|
||||
##
|
||||
## The following boundary conditions apply in order to get a useful result
|
||||
## in a partially completed hexary trie database.
|
||||
##
|
||||
## * The argument `partialPath` refers to an existing node.
|
||||
##
|
||||
## * The argument `iv` contains a range of paths (e.g. account hash keys)
|
||||
## with the property that if there is no (leaf-) node for that path, then
|
||||
## no such node exists when the database is completed.
|
||||
##
|
||||
## This condition is sort of rephrasing the boundary proof condition that
|
||||
## applies when downloading a range of accounts or storage slots from the
|
||||
## network via `snap/1` protocol. In fact the condition here is stricter
|
||||
## as it excludes sub-trie *holes* (see comment on `importAccounts()`.)
|
||||
##
|
||||
# Chechk for the trivial case when the `partialPath` envelope and `iv` do
|
||||
# not overlap.
|
||||
let env = partialPath.pathEnvelope
|
||||
if iv.maxPt < env.minPt or env.maxPt < iv.minPt:
|
||||
return @[partialPath]
|
||||
|
||||
# So ranges do overlap. The case that the `partialPath` envelope is fully
|
||||
# contained in `iv` results in `@[]` which is implicitely handled by
|
||||
# non-matching any of the cases, below.
|
||||
if env.minPt < iv.minPt:
|
||||
let
|
||||
envPt = env.minPt.to(NodeKey).hexaryPath(rootKey.to(RepairKey), db)
|
||||
ivPt = iv.minPt.to(NodeKey).hexaryPath(rootKey.to(RepairKey), db)
|
||||
when false: # or true:
|
||||
echo ">>> ",
|
||||
"\n ", envPt.pp(db),
|
||||
"\n -----",
|
||||
"\n ", ivPt.pp(db)
|
||||
let rc = envPt.dismantleLeft ivPt
|
||||
if rc.isErr:
|
||||
return @[partialPath]
|
||||
result &= rc.value
|
||||
|
||||
if iv.maxPt < env.maxPt:
|
||||
let
|
||||
envPt = env.maxPt.to(NodeKey).hexaryPath(rootKey.to(RepairKey), db)
|
||||
ivPt = iv.maxPt.to(NodeKey).hexaryPath(rootKey.to(RepairKey), db)
|
||||
when false: # or true:
|
||||
echo ">>> ",
|
||||
"\n ", envPt.pp(db),
|
||||
"\n -----",
|
||||
"\n ", ivPt.pp(db)
|
||||
let rc = envPt.dismantleRight ivPt
|
||||
if rc.isErr:
|
||||
return @[partialPath]
|
||||
result &= rc.value
|
||||
|
||||
proc dismantle*(
|
||||
partialPath: Blob; ## Patrial path for existing node
|
||||
rootKey: NodeKey; ## State root
|
||||
iv: NodeTagRange; ## Proofed range of leaf paths
|
||||
getFn: HexaryGetFn; ## Database abstraction
|
||||
): seq[Blob]
|
||||
{.gcsafe, raises: [Defect,RlpError]} =
|
||||
## Variant of `dismantle()` for persistent database.
|
||||
let env = partialPath.pathEnvelope
|
||||
if iv.maxPt < env.minPt or env.maxPt < iv.minPt:
|
||||
return @[partialPath]
|
||||
|
||||
if env.minPt < iv.minPt:
|
||||
let rc = dismantleLeft(
|
||||
env.minPt.to(NodeKey).hexaryPath(rootKey, getFn),
|
||||
iv.minPt.to(NodeKey).hexaryPath(rootKey, getFn))
|
||||
if rc.isErr:
|
||||
return @[partialPath]
|
||||
result &= rc.value
|
||||
|
||||
if iv.maxPt < env.maxPt:
|
||||
let rc = dismantleRight(
|
||||
env.maxPt.to(NodeKey).hexaryPath(rootKey, getFn),
|
||||
iv.maxPt.to(NodeKey).hexaryPath(rootKey, getFn))
|
||||
if rc.isErr:
|
||||
return @[partialPath]
|
||||
result &= rc.value
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
|
|
@ -18,7 +18,8 @@ import
|
|||
../../../../utils/prettify,
|
||||
../../../sync_desc,
|
||||
"../.."/[range_desc, worker_desc],
|
||||
"."/[hexary_desc, hexary_error, snapdb_accounts, snapdb_storage_slots]
|
||||
"."/[hexary_desc, hexary_error, hexary_inspect,
|
||||
snapdb_accounts, snapdb_storage_slots]
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
|
@ -45,7 +46,7 @@ proc accountsCtx(
|
|||
"{" &
|
||||
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
|
||||
"nAccounts=" & $env.nAccounts & "," &
|
||||
("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" &
|
||||
("covered=" & env.fetchAccounts.processed.fullFactor.toPC(0) & "/" &
|
||||
ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," &
|
||||
"nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," &
|
||||
"nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}"
|
||||
|
@ -74,7 +75,7 @@ proc storageSlotsCtx(
|
|||
"inherit=" & (if data.inherit: "t" else: "f") & ","
|
||||
if not slots.isNil:
|
||||
result &= "" &
|
||||
"covered=" & slots.unprocessed.emptyFactor.toPC(0) &
|
||||
"covered=" & slots.processed.fullFactor.toPC(0) &
|
||||
"nCheckNodes=" & $slots.checkNodes.len & "," &
|
||||
"nSickSubTries=" & $slots.sickSubTries.len
|
||||
result &= "}"
|
||||
|
@ -88,7 +89,7 @@ proc checkStorageSlotsTrie(
|
|||
accKey: NodeKey;
|
||||
storageRoot: Hash256;
|
||||
env: SnapPivotRef;
|
||||
): Result[bool,HexaryDbError] =
|
||||
): Result[bool,HexaryError] =
|
||||
## Check whether a storage slots hexary trie is complete.
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
|
@ -106,7 +107,7 @@ proc checkStorageSlotsTrie(
|
|||
iterator accountsWalk(
|
||||
buddy: SnapBuddyRef;
|
||||
env: SnapPivotRef;
|
||||
): (NodeKey,Account,HexaryDbError) =
|
||||
): (NodeKey,Account,HexaryError) =
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
db = ctx.data.snapDb
|
||||
|
@ -157,18 +158,29 @@ proc checkAccountsTrieIsComplete*(
|
|||
## Check whether accounts hexary trie is complete
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
db = ctx.data.snapDb
|
||||
peer = buddy.peer
|
||||
stateRoot = env.stateHeader.stateRoot
|
||||
db = ctx.data.snapDb
|
||||
rootKey = env.stateHeader.stateRoot.to(NodeKey)
|
||||
var
|
||||
error: HexaryError
|
||||
|
||||
rc = db.inspectAccountsTrie(peer, stateRoot)
|
||||
try:
|
||||
let stats = db.getAccountFn.hexaryInspectTrie(rootKey, @[])
|
||||
if not stats.stopped:
|
||||
return stats.dangling.len == 0
|
||||
|
||||
if rc.isErr:
|
||||
error logTxt "accounts health check failed", peer,
|
||||
ctx=buddy.accountsCtx(env), error=rc.error
|
||||
return false
|
||||
error = TrieLoopAlert
|
||||
except RlpError:
|
||||
error = RlpEncoding
|
||||
except KeyError as e:
|
||||
raiseAssert "Not possible @ importRawAccountNodes: " & e.msg
|
||||
except Exception as e:
|
||||
raiseAssert "Ooops checkAccountsTrieIsComplete(): name=" &
|
||||
$e.name & " msg=" & e.msg
|
||||
|
||||
rc.value.dangling.len == 0
|
||||
error logTxt "accounts health check failed", peer,
|
||||
ctx=buddy.accountsCtx(env), error
|
||||
return false
|
||||
|
||||
|
||||
proc checkAccountsListOk*(
|
|
@ -12,7 +12,7 @@ import
|
|||
std/[algorithm, sequtils, tables],
|
||||
chronicles,
|
||||
eth/[common, p2p, rlp, trie/nibbles],
|
||||
stew/byteutils,
|
||||
stew/[byteutils, interval_set],
|
||||
../../range_desc,
|
||||
"."/[hexary_desc, hexary_error, hexary_import, hexary_interpolate,
|
||||
hexary_inspect, hexary_paths, snapdb_desc, snapdb_persistent]
|
||||
|
@ -25,7 +25,6 @@ logScope:
|
|||
type
|
||||
SnapDbAccountsRef* = ref object of SnapDbBaseRef
|
||||
peer: Peer ## For log messages
|
||||
getClsFn: AccountsGetFn ## Persistent database `get()` closure
|
||||
|
||||
SnapAccountsGaps* = object
|
||||
innerGaps*: seq[NodeSpecs]
|
||||
|
@ -44,12 +43,6 @@ proc to(h: Hash256; T: type NodeKey): T =
|
|||
proc convertTo(data: openArray[byte]; T: type Hash256): T =
|
||||
discard result.data.NodeKey.init(data) # size error => zero
|
||||
|
||||
proc getFn(ps: SnapDbAccountsRef): HexaryGetFn =
|
||||
## Derive from `GetClsFn` closure => `HexaryGetFn`. There reason for that
|
||||
## seemingly redundant mapping is that here is space for additional localised
|
||||
## and locked parameters as done with the `StorageSlotsGetFn`.
|
||||
return proc(key: openArray[byte]): Blob = ps.getClsFn(key)
|
||||
|
||||
template noKeyError(info: static[string]; code: untyped) =
|
||||
try:
|
||||
code
|
||||
|
@ -75,7 +68,7 @@ template noRlpExceptionOops(info: static[string]; code: untyped) =
|
|||
proc persistentAccounts(
|
||||
db: HexaryTreeDbRef; ## Current table
|
||||
ps: SnapDbAccountsRef; ## For persistent database
|
||||
): Result[void,HexaryDbError]
|
||||
): Result[void,HexaryError]
|
||||
{.gcsafe, raises: [Defect,OSError,KeyError].} =
|
||||
## Store accounts trie table on databse
|
||||
if ps.rockDb.isNil:
|
||||
|
@ -91,7 +84,7 @@ proc collectAccounts(
|
|||
peer: Peer, ## for log messages
|
||||
base: NodeTag;
|
||||
acc: seq[PackedAccount];
|
||||
): Result[seq[RLeafSpecs],HexaryDbError]
|
||||
): Result[seq[RLeafSpecs],HexaryError]
|
||||
{.gcsafe, raises: [Defect, RlpError].} =
|
||||
## Repack account records into a `seq[RLeafSpecs]` queue. The argument data
|
||||
## `acc` are as received with the snap message `AccountRange`).
|
||||
|
@ -137,11 +130,9 @@ proc init*(
|
|||
peer: Peer = nil
|
||||
): T =
|
||||
## Constructor, starts a new accounts session.
|
||||
let db = pv.kvDb
|
||||
new result
|
||||
result.init(pv, root.to(NodeKey))
|
||||
result.peer = peer
|
||||
result.getClsFn = db.persistentAccountsGetFn()
|
||||
|
||||
proc dup*(
|
||||
ps: SnapDbAccountsRef;
|
||||
|
@ -167,27 +158,15 @@ proc dup*(
|
|||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc nodeExists*(
|
||||
ps: SnapDbAccountsRef; ## Re-usable session descriptor
|
||||
node: NodeSpecs; ## Node specs, e.g. returned by `importAccounts()`
|
||||
persistent = false; ## Whether to check data on disk
|
||||
): bool =
|
||||
## ..
|
||||
if not persistent:
|
||||
return ps.hexaDb.tab.hasKey(node.nodeKey.to(RepairKey))
|
||||
try:
|
||||
return 0 < ps.getFn()(node.nodeKey.ByteArray32).len
|
||||
except Exception as e:
|
||||
raiseAssert "Not possible @ importAccounts(" & $e.name & "):" & e.msg
|
||||
proc getAccountFn*(ps: SnapDbAccountsRef): HexaryGetFn =
|
||||
## Return `HexaryGetFn` closure.
|
||||
let getFn = ps.kvDb.persistentAccountsGetFn()
|
||||
return proc(key: openArray[byte]): Blob = getFn(key)
|
||||
|
||||
proc nodeExists*(
|
||||
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
|
||||
peer: Peer; ## For log messages
|
||||
root: Hash256; ## State root
|
||||
node: NodeSpecs; ## Node specs, e.g. returned by `importAccounts()`
|
||||
): bool =
|
||||
## Variant of `nodeExists()` for presistent storage, only.
|
||||
SnapDbAccountsRef.init(pv, root, peer).nodeExists(node, persistent=true)
|
||||
proc getAccountFn*(pv: SnapDbRef): HexaryGetFn =
|
||||
## Variant of `getAccountFn()`
|
||||
let getFn = pv.kvDb.persistentAccountsGetFn()
|
||||
return proc(key: openArray[byte]): Blob = getFn(key)
|
||||
|
||||
|
||||
proc importAccounts*(
|
||||
|
@ -196,7 +175,7 @@ proc importAccounts*(
|
|||
data: PackedAccountRange; ## Re-packed `snap/1 ` reply data
|
||||
persistent = false; ## Store data on disk
|
||||
noBaseBoundCheck = false; ## Ignore left boundary proof check if `true`
|
||||
): Result[SnapAccountsGaps,HexaryDbError] =
|
||||
): Result[SnapAccountsGaps,HexaryError] =
|
||||
## Validate and import accounts (using proofs as received with the snap
|
||||
## message `AccountRange`). This function accumulates data in a memory table
|
||||
## which can be written to disk with the argument `persistent` set `true`.
|
||||
|
@ -243,9 +222,10 @@ proc importAccounts*(
|
|||
##
|
||||
## Note that the `peer` argument is for log messages, only.
|
||||
var
|
||||
accounts: seq[RLeafSpecs]
|
||||
outside: seq[NodeSpecs]
|
||||
gaps: SnapAccountsGaps
|
||||
accounts: seq[RLeafSpecs] # validated accounts to add to database
|
||||
gaps: SnapAccountsGaps # return value
|
||||
proofStats: TrieNodeStat # `proof` data dangling links
|
||||
innerSubTrie: seq[NodeSpecs] # internal, collect dangling links
|
||||
try:
|
||||
if 0 < data.proof.len:
|
||||
let rc = ps.mergeProofs(ps.peer, data.proof)
|
||||
|
@ -257,24 +237,25 @@ proc importAccounts*(
|
|||
return err(rc.error)
|
||||
accounts = rc.value
|
||||
|
||||
# Inspect trie for dangling nodes from prrof data (if any.)
|
||||
if 0 < data.proof.len:
|
||||
proofStats = ps.hexaDb.hexaryInspectTrie(ps.root, @[])
|
||||
|
||||
if 0 < accounts.len:
|
||||
var innerSubTrie: seq[NodeSpecs]
|
||||
if 0 < data.proof.len:
|
||||
# Inspect trie for dangling nodes. This is not a big deal here as the
|
||||
# proof data is typically small.
|
||||
let
|
||||
proofStats = ps.hexaDb.hexaryInspectTrie(ps.root, @[])
|
||||
topTag = accounts[^1].pathTag
|
||||
let topTag = accounts[^1].pathTag
|
||||
for w in proofStats.dangling:
|
||||
if base <= w.partialPath.max(NodeKey).to(NodeTag) and
|
||||
w.partialPath.min(NodeKey).to(NodeTag) <= topTag:
|
||||
# Extract dangling links which are inside the accounts range
|
||||
innerSubTrie.add w
|
||||
let iv = w.partialPath.pathEnvelope
|
||||
if iv.maxPt < base or topTag < iv.minPt:
|
||||
# Dangling link with partial path envelope outside accounts range
|
||||
gaps.dangling.add w
|
||||
else:
|
||||
# Otherwise register outside links
|
||||
outside.add w
|
||||
# Overlapping partial path envelope.
|
||||
innerSubTrie.add w
|
||||
|
||||
# Build partial hexary trie
|
||||
# Build partial or full hexary trie
|
||||
let rc = ps.hexaDb.hexaryInterpolate(
|
||||
ps.root, accounts, bootstrap = (data.proof.len == 0))
|
||||
if rc.isErr:
|
||||
|
@ -284,35 +265,35 @@ proc importAccounts*(
|
|||
# trie (if any).
|
||||
let bottomTag = accounts[0].pathTag
|
||||
for w in innerSubTrie:
|
||||
if ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)):
|
||||
continue
|
||||
# Verify that `base` is to the left of the first account and there is
|
||||
# nothing in between. Without proof, there can only be a complete
|
||||
# set/list of accounts. There must be a proof for an empty list.
|
||||
if not noBaseBoundCheck and
|
||||
w.partialPath.max(NodeKey).to(NodeTag) < bottomTag:
|
||||
return err(LowerBoundProofError)
|
||||
# Otherwise register left over entry
|
||||
gaps.innerGaps.add w
|
||||
if not ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)):
|
||||
if not noBaseBoundCheck:
|
||||
# Verify that `base` is to the left of the first account and there
|
||||
# is nothing in between.
|
||||
#
|
||||
# Without `proof` data available there can only be a complete
|
||||
# set/list of accounts so there are no dangling nodes in the first
|
||||
# place. But there must be `proof` data for an empty list.
|
||||
if w.partialPath.pathEnvelope.maxPt < bottomTag:
|
||||
return err(LowerBoundProofError)
|
||||
# Otherwise register left over entry
|
||||
gaps.innerGaps.add w
|
||||
|
||||
if persistent:
|
||||
let rc = ps.hexaDb.persistentAccounts(ps)
|
||||
if rc.isErr:
|
||||
return err(rc.error)
|
||||
# Verify outer links against database
|
||||
let getFn = ps.getFn
|
||||
for w in outside:
|
||||
if w.nodeKey.ByteArray32.getFn().len == 0:
|
||||
gaps.dangling.add w
|
||||
else:
|
||||
for w in outside:
|
||||
if not ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)):
|
||||
gaps.dangling.add w
|
||||
|
||||
elif data.proof.len == 0:
|
||||
# There must be a proof for an empty argument list.
|
||||
return err(LowerBoundProofError)
|
||||
|
||||
else:
|
||||
if not noBaseBoundCheck:
|
||||
for w in proofStats.dangling:
|
||||
if base <= w.partialPath.pathEnvelope.maxPt:
|
||||
return err(LowerBoundProofError)
|
||||
gaps.dangling = proofStats.dangling
|
||||
|
||||
except RlpError:
|
||||
return err(RlpEncoding)
|
||||
except KeyError as e:
|
||||
|
@ -338,7 +319,7 @@ proc importAccounts*(
|
|||
base: NodeTag; ## Before or at first account entry in `data`
|
||||
data: PackedAccountRange; ## Re-packed `snap/1 ` reply data
|
||||
noBaseBoundCheck = false; ## Ignore left bound proof check if `true`
|
||||
): Result[SnapAccountsGaps,HexaryDbError] =
|
||||
): Result[SnapAccountsGaps,HexaryError] =
|
||||
## Variant of `importAccounts()` for presistent storage, only.
|
||||
SnapDbAccountsRef.init(
|
||||
pv, root, peer).importAccounts(
|
||||
|
@ -423,82 +404,16 @@ proc importRawAccountsNodes*(
|
|||
nodes, reportNodes, persistent=true)
|
||||
|
||||
|
||||
proc inspectAccountsTrie*(
|
||||
ps: SnapDbAccountsRef; ## Re-usable session descriptor
|
||||
pathList = seq[Blob].default; ## Starting nodes for search
|
||||
resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection
|
||||
suspendAfter = high(uint64); ## To be resumed
|
||||
persistent = false; ## Read data from disk
|
||||
ignoreError = false; ## Always return partial results if any
|
||||
): Result[TrieNodeStat, HexaryDbError] =
|
||||
## Starting with the argument list `pathSet`, find all the non-leaf nodes in
|
||||
## the hexary trie which have at least one node key reference missing in
|
||||
## the trie database. Argument `pathSet` list entries that do not refer to a
|
||||
## valid node are silently ignored.
|
||||
##
|
||||
## Trie inspection can be automatically suspended after having visited
|
||||
## `suspendAfter` nodes to be resumed at the last state. An application of
|
||||
## this feature would look like
|
||||
## ::
|
||||
## var ctx = TrieNodeStatCtxRef()
|
||||
## while not ctx.isNil:
|
||||
## let rc = inspectAccountsTrie(.., resumeCtx=ctx, suspendAfter=1024)
|
||||
## ...
|
||||
## ctx = rc.value.resumeCtx
|
||||
##
|
||||
let peer = ps.peer
|
||||
var stats: TrieNodeStat
|
||||
noRlpExceptionOops("inspectAccountsTrie()"):
|
||||
if persistent:
|
||||
stats = ps.getFn.hexaryInspectTrie(
|
||||
ps.root, pathList, resumeCtx, suspendAfter=suspendAfter)
|
||||
else:
|
||||
stats = ps.hexaDb.hexaryInspectTrie(
|
||||
ps.root, pathList, resumeCtx, suspendAfter=suspendAfter)
|
||||
|
||||
block checkForError:
|
||||
var error = TrieIsEmpty
|
||||
if stats.stopped:
|
||||
error = TrieLoopAlert
|
||||
trace "Inspect account trie failed", peer, nPathList=pathList.len,
|
||||
nDangling=stats.dangling.len, stoppedAt=stats.level, error
|
||||
elif 0 < stats.level:
|
||||
break checkForError
|
||||
if ignoreError:
|
||||
return ok(stats)
|
||||
return err(error)
|
||||
|
||||
#when extraTraceMessages:
|
||||
# trace "Inspect account trie ok", peer, nPathList=pathList.len,
|
||||
# nDangling=stats.dangling.len, level=stats.level
|
||||
|
||||
return ok(stats)
|
||||
|
||||
proc inspectAccountsTrie*(
|
||||
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
|
||||
peer: Peer; ## For log messages, only
|
||||
root: Hash256; ## state root
|
||||
pathList = seq[Blob].default; ## Starting paths for search
|
||||
resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection
|
||||
suspendAfter = high(uint64); ## To be resumed
|
||||
ignoreError = false; ## Always return partial results if any
|
||||
): Result[TrieNodeStat, HexaryDbError] =
|
||||
## Variant of `inspectAccountsTrie()` for persistent storage.
|
||||
SnapDbAccountsRef.init(
|
||||
pv, root, peer).inspectAccountsTrie(
|
||||
pathList, resumeCtx, suspendAfter, persistent=true, ignoreError)
|
||||
|
||||
|
||||
proc getAccountsNodeKey*(
|
||||
ps: SnapDbAccountsRef; ## Re-usable session descriptor
|
||||
path: Blob; ## Partial node path
|
||||
persistent = false; ## Read data from disk
|
||||
): Result[NodeKey,HexaryDbError] =
|
||||
): Result[NodeKey,HexaryError] =
|
||||
## For a partial node path argument `path`, return the raw node key.
|
||||
var rc: Result[NodeKey,void]
|
||||
noRlpExceptionOops("getAccountsNodeKey()"):
|
||||
if persistent:
|
||||
rc = ps.getFn.hexaryInspectPath(ps.root, path)
|
||||
rc = ps.getAccountFn.hexaryInspectPath(ps.root, path)
|
||||
else:
|
||||
rc = ps.hexaDb.hexaryInspectPath(ps.root, path)
|
||||
if rc.isOk:
|
||||
|
@ -509,7 +424,7 @@ proc getAccountsNodeKey*(
|
|||
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
|
||||
root: Hash256; ## state root
|
||||
path: Blob; ## Partial node path
|
||||
): Result[NodeKey,HexaryDbError] =
|
||||
): Result[NodeKey,HexaryError] =
|
||||
## Variant of `getAccountsNodeKey()` for persistent storage.
|
||||
SnapDbAccountsRef.init(
|
||||
pv, root, Peer()).getAccountsNodeKey(path, persistent=true)
|
||||
|
@ -519,7 +434,7 @@ proc getAccountsData*(
|
|||
ps: SnapDbAccountsRef; ## Re-usable session descriptor
|
||||
path: NodeKey; ## Account to visit
|
||||
persistent = false; ## Read data from disk
|
||||
): Result[Account,HexaryDbError] =
|
||||
): Result[Account,HexaryError] =
|
||||
## Fetch account data.
|
||||
##
|
||||
## Caveat: There is no unit test yet for the non-persistent version
|
||||
|
@ -528,7 +443,7 @@ proc getAccountsData*(
|
|||
noRlpExceptionOops("getAccountData()"):
|
||||
var leaf: Blob
|
||||
if persistent:
|
||||
leaf = path.hexaryPath(ps.root, ps.getFn).leafData
|
||||
leaf = path.hexaryPath(ps.root, ps.getAccountFn).leafData
|
||||
else:
|
||||
leaf = path.hexaryPath(ps.root.to(RepairKey),ps.hexaDb).leafData
|
||||
|
||||
|
@ -542,7 +457,7 @@ proc getAccountsData*(
|
|||
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
|
||||
root: Hash256; ## State root
|
||||
path: NodeKey; ## Account to visit
|
||||
): Result[Account,HexaryDbError] =
|
||||
): Result[Account,HexaryError] =
|
||||
## Variant of `getAccountsData()` for persistent storage.
|
||||
SnapDbAccountsRef.init(
|
||||
pv, root, Peer()).getAccountsData(path, persistent=true)
|
||||
|
@ -576,20 +491,20 @@ proc sortMerge*(acc: openArray[seq[PackedAccount]]): seq[PackedAccount] =
|
|||
proc getAccountsChainDb*(
|
||||
ps: SnapDbAccountsRef;
|
||||
accKey: NodeKey;
|
||||
): Result[Account,HexaryDbError] =
|
||||
): Result[Account,HexaryError] =
|
||||
## Fetch account via `BaseChainDB`
|
||||
ps.getAccountsData(accKey, persistent = true)
|
||||
|
||||
proc nextAccountsChainDbKey*(
|
||||
ps: SnapDbAccountsRef;
|
||||
accKey: NodeKey;
|
||||
): Result[NodeKey,HexaryDbError] =
|
||||
): Result[NodeKey,HexaryError] =
|
||||
## Fetch the account path on the `BaseChainDB`, the one next to the
|
||||
## argument account key.
|
||||
noRlpExceptionOops("getChainDbAccount()"):
|
||||
let path = accKey
|
||||
.hexaryPath(ps.root, ps.getFn)
|
||||
.next(ps.getFn)
|
||||
.hexaryPath(ps.root, ps.getAccountFn)
|
||||
.next(ps.getAccountFn)
|
||||
.getNibbles
|
||||
if 64 == path.len:
|
||||
return ok(path.getBytes.convertTo(Hash256).to(NodeKey))
|
||||
|
@ -599,13 +514,13 @@ proc nextAccountsChainDbKey*(
|
|||
proc prevAccountsChainDbKey*(
|
||||
ps: SnapDbAccountsRef;
|
||||
accKey: NodeKey;
|
||||
): Result[NodeKey,HexaryDbError] =
|
||||
): Result[NodeKey,HexaryError] =
|
||||
## Fetch the account path on the `BaseChainDB`, the one before to the
|
||||
## argument account.
|
||||
noRlpExceptionOops("getChainDbAccount()"):
|
||||
let path = accKey
|
||||
.hexaryPath(ps.root, ps.getFn)
|
||||
.prev(ps.getFn)
|
||||
.hexaryPath(ps.root, ps.getAccountFn)
|
||||
.prev(ps.getAccountFn)
|
||||
.getNibbles
|
||||
if 64 == path.len:
|
||||
return ok(path.getBytes.convertTo(Hash256).to(NodeKey))
|
||||
|
|
|
@ -214,7 +214,7 @@ proc mergeProofs*(
|
|||
peer: Peer; ## For log messages
|
||||
proof: seq[Blob]; ## Node records
|
||||
freeStandingOk = false; ## Remove freestanding nodes
|
||||
): Result[void,HexaryDbError]
|
||||
): Result[void,HexaryError]
|
||||
{.gcsafe, raises: [Defect,RlpError,KeyError].} =
|
||||
## Import proof records (as received with snap message) into a hexary trie
|
||||
## of the repair table. These hexary trie records can be extended to a full
|
||||
|
@ -253,7 +253,7 @@ proc verifyLowerBound*(
|
|||
peer: Peer; ## For log messages
|
||||
base: NodeTag; ## Before or at first account entry in `data`
|
||||
first: NodeTag; ## First account key
|
||||
): Result[void,HexaryDbError]
|
||||
): Result[void,HexaryError]
|
||||
{.gcsafe, raises: [Defect, KeyError].} =
|
||||
## Verify that `base` is to the left of the first leaf entry and there is
|
||||
## nothing in between.
|
||||
|
@ -278,7 +278,7 @@ proc verifyNoMoreRight*(
|
|||
ps: SnapDbBaseRef; ## Database session descriptor
|
||||
peer: Peer; ## For log messages
|
||||
base: NodeTag; ## Before or at first account entry in `data`
|
||||
): Result[void,HexaryDbError]
|
||||
): Result[void,HexaryError]
|
||||
{.gcsafe, raises: [Defect, KeyError].} =
|
||||
## Verify that there is are no more leaf entries to the right of and
|
||||
## including `base`.
|
||||
|
|
|
@ -90,7 +90,7 @@ proc persistentStorageSlotsGetFn*(db: TrieDatabaseRef): StorageSlotsGetFn =
|
|||
proc persistentStateRootGet*(
|
||||
db: TrieDatabaseRef;
|
||||
root: NodeKey;
|
||||
): Result[StateRootRegistry,HexaryDbError] =
|
||||
): Result[StateRootRegistry,HexaryError] =
|
||||
## Implements a `get()` function for returning state root registry data.
|
||||
let rlpBlob = db.stateRootGet(root)
|
||||
if 0 < rlpBlob.len:
|
||||
|
@ -107,7 +107,7 @@ proc persistentStateRootGet*(
|
|||
proc persistentAccountsPut*(
|
||||
db: HexaryTreeDbRef;
|
||||
base: TrieDatabaseRef
|
||||
): Result[void,HexaryDbError] =
|
||||
): Result[void,HexaryError] =
|
||||
## Bulk store using transactional `put()`
|
||||
let dbTx = base.beginTransaction
|
||||
defer: dbTx.commit
|
||||
|
@ -123,7 +123,7 @@ proc persistentAccountsPut*(
|
|||
proc persistentStorageSlotsPut*(
|
||||
db: HexaryTreeDbRef;
|
||||
base: TrieDatabaseRef
|
||||
): Result[void,HexaryDbError] =
|
||||
): Result[void,HexaryError] =
|
||||
## Bulk store using transactional `put()`
|
||||
let dbTx = base.beginTransaction
|
||||
defer: dbTx.commit
|
||||
|
@ -179,7 +179,7 @@ proc persistentStateRootPut*(
|
|||
proc persistentAccountsPut*(
|
||||
db: HexaryTreeDbRef;
|
||||
rocky: RocksStoreRef
|
||||
): Result[void,HexaryDbError]
|
||||
): Result[void,HexaryError]
|
||||
{.gcsafe, raises: [Defect,OSError,KeyError].} =
|
||||
## SST based bulk load on `rocksdb`.
|
||||
if rocky.isNil:
|
||||
|
@ -228,7 +228,7 @@ proc persistentAccountsPut*(
|
|||
proc persistentStorageSlotsPut*(
|
||||
db: HexaryTreeDbRef;
|
||||
rocky: RocksStoreRef
|
||||
): Result[void,HexaryDbError]
|
||||
): Result[void,HexaryError]
|
||||
{.gcsafe, raises: [Defect,OSError,KeyError].} =
|
||||
## SST based bulk load on `rocksdb`.
|
||||
if rocky.isNil:
|
||||
|
|
|
@ -47,7 +47,7 @@ template handleRlpException(info: static[string]; code: untyped) =
|
|||
proc savePivot*(
|
||||
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
|
||||
data: SnapDbPivotRegistry; ## Registered data record
|
||||
): Result[int,HexaryDbError] =
|
||||
): Result[int,HexaryError] =
|
||||
## Register pivot environment
|
||||
handleRlpException("savePivot()"):
|
||||
let rlpData = rlp.encode(data)
|
||||
|
@ -58,7 +58,7 @@ proc savePivot*(
|
|||
proc recoverPivot*(
|
||||
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
|
||||
stateRoot: NodeKey; ## Check for a particular state root
|
||||
): Result[SnapDbPivotRegistry,HexaryDbError] =
|
||||
): Result[SnapDbPivotRegistry,HexaryError] =
|
||||
## Restore pivot environment for a particular state root.
|
||||
let rc = pv.kvDb.persistentStateRootGet(stateRoot)
|
||||
if rc.isOk:
|
||||
|
@ -70,7 +70,7 @@ proc recoverPivot*(
|
|||
|
||||
proc recoverPivot*(
|
||||
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
|
||||
): Result[SnapDbPivotRegistry,HexaryDbError] =
|
||||
): Result[SnapDbPivotRegistry,HexaryError] =
|
||||
## Restore pivot environment that was saved latest.
|
||||
let rc = pv.kvDb.persistentStateRootGet(NodeKey.default)
|
||||
if rc.isOk:
|
||||
|
|
|
@ -12,6 +12,7 @@ import
|
|||
std/tables,
|
||||
chronicles,
|
||||
eth/[common, p2p, rlp],
|
||||
stew/interval_set,
|
||||
../../../protocol,
|
||||
../../range_desc,
|
||||
"."/[hexary_desc, hexary_error, hexary_import, hexary_inspect,
|
||||
|
@ -29,7 +30,6 @@ type
|
|||
SnapDbStorageSlotsRef* = ref object of SnapDbBaseRef
|
||||
peer: Peer ## For log messages
|
||||
accKey: NodeKey ## Accounts address hash (curr.unused)
|
||||
getClsFn: StorageSlotsGetFn ## Persistent database `get()` closure
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private helpers
|
||||
|
@ -41,10 +41,6 @@ proc to(h: Hash256; T: type NodeKey): T =
|
|||
proc convertTo(data: openArray[byte]; T: type Hash256): T =
|
||||
discard result.data.NodeKey.init(data) # size error => zero
|
||||
|
||||
proc getFn(ps: SnapDbStorageSlotsRef; accKey: NodeKey): HexaryGetFn =
|
||||
## Capture `accKey` argument for `GetClsFn` closure => `HexaryGetFn`
|
||||
return proc(key: openArray[byte]): Blob = ps.getClsFn(accKey,key)
|
||||
|
||||
|
||||
template noKeyError(info: static[string]; code: untyped) =
|
||||
try:
|
||||
|
@ -81,7 +77,7 @@ template noGenericExOrKeyError(info: static[string]; code: untyped) =
|
|||
proc persistentStorageSlots(
|
||||
db: HexaryTreeDbRef; ## Current table
|
||||
ps: SnapDbStorageSlotsRef; ## For persistent database
|
||||
): Result[void,HexaryDbError]
|
||||
): Result[void,HexaryError]
|
||||
{.gcsafe, raises: [Defect,OSError,KeyError].} =
|
||||
## Store accounts trie table on databse
|
||||
if ps.rockDb.isNil:
|
||||
|
@ -97,7 +93,7 @@ proc collectStorageSlots(
|
|||
peer: Peer; ## for log messages
|
||||
base: NodeTag; ## before or at first account entry in `data`
|
||||
slotLists: seq[SnapStorage];
|
||||
): Result[seq[RLeafSpecs],HexaryDbError]
|
||||
): Result[seq[RLeafSpecs],HexaryError]
|
||||
{.gcsafe, raises: [Defect, RlpError].} =
|
||||
## Similar to `collectAccounts()`
|
||||
var rcSlots: seq[RLeafSpecs]
|
||||
|
@ -140,15 +136,17 @@ proc importStorageSlots(
|
|||
data: AccountSlots; ## Account storage descriptor
|
||||
proof: SnapStorageProof; ## Storage slots proof data
|
||||
noBaseBoundCheck = false; ## Ignore left boundary proof check if `true`
|
||||
): Result[seq[NodeSpecs],HexaryDbError]
|
||||
): Result[seq[NodeSpecs],HexaryError]
|
||||
{.gcsafe, raises: [Defect,RlpError,KeyError].} =
|
||||
## Process storage slots for a particular storage root. See `importAccounts()`
|
||||
## for comments on the return value.
|
||||
let
|
||||
tmpDb = SnapDbBaseRef.init(ps, data.account.storageRoot.to(NodeKey))
|
||||
var
|
||||
slots: seq[RLeafSpecs]
|
||||
dangling: seq[NodeSpecs]
|
||||
slots: seq[RLeafSpecs] # validated slots to add to database
|
||||
dangling: seq[NodeSpecs] # return value
|
||||
proofStats: TrieNodeStat # `proof` data dangling links
|
||||
innerSubTrie: seq[NodeSpecs] # internal, collect dangling links
|
||||
if 0 < proof.len:
|
||||
let rc = tmpDb.mergeProofs(ps.peer, proof)
|
||||
if rc.isErr:
|
||||
|
@ -160,17 +158,17 @@ proc importStorageSlots(
|
|||
slots = rc.value
|
||||
|
||||
if 0 < slots.len:
|
||||
var innerSubTrie: seq[NodeSpecs]
|
||||
if 0 < proof.len:
|
||||
# Inspect trie for dangling nodes. This is not a big deal here as the
|
||||
# proof data is typically small.
|
||||
let
|
||||
proofStats = ps.hexaDb.hexaryInspectTrie(ps.root, @[])
|
||||
topTag = slots[^1].pathTag
|
||||
let topTag = slots[^1].pathTag
|
||||
for w in proofStats.dangling:
|
||||
if base <= w.partialPath.max(NodeKey).to(NodeTag) and
|
||||
w.partialPath.min(NodeKey).to(NodeTag) <= topTag:
|
||||
# Extract dangling links which are inside the accounts range
|
||||
let iv = w.partialPath.pathEnvelope
|
||||
if iv.maxPt < base or topTag < iv.minPt:
|
||||
# Dangling link with partial path envelope outside accounts range
|
||||
discard
|
||||
else:
|
||||
# Overlapping partial path envelope.
|
||||
innerSubTrie.add w
|
||||
|
||||
# Build partial hexary trie
|
||||
|
@ -183,16 +181,18 @@ proc importStorageSlots(
|
|||
# trie (if any).
|
||||
let bottomTag = slots[0].pathTag
|
||||
for w in innerSubTrie:
|
||||
if ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)):
|
||||
continue
|
||||
# Verify that `base` is to the left of the first slot and there is
|
||||
# nothing in between. Without proof, there can only be a complete
|
||||
# set/list of slots. There must be a proof for an empty list.
|
||||
if not noBaseBoundCheck and
|
||||
w.partialPath.max(NodeKey).to(NodeTag) < bottomTag:
|
||||
return err(LowerBoundProofError)
|
||||
# Otherwise register left over entry
|
||||
dangling.add w
|
||||
if not ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)):
|
||||
if not noBaseBoundCheck:
|
||||
# Verify that `base` is to the left of the first slot and there is
|
||||
# nothing in between.
|
||||
#
|
||||
# Without `proof` data available there can only be a complete
|
||||
# set/list of accounts so there are no dangling nodes in the first
|
||||
# place. But there must be `proof` data for an empty list.
|
||||
if w.partialPath.pathEnvelope.maxPt < bottomTag:
|
||||
return err(LowerBoundProofError)
|
||||
# Otherwise register left over entry
|
||||
dangling.add w
|
||||
|
||||
# Commit to main descriptor
|
||||
for k,v in tmpDb.hexaDb.tab.pairs:
|
||||
|
@ -204,6 +204,13 @@ proc importStorageSlots(
|
|||
# There must be a proof for an empty argument list.
|
||||
return err(LowerBoundProofError)
|
||||
|
||||
else:
|
||||
if not noBaseBoundCheck:
|
||||
for w in proofStats.dangling:
|
||||
if base <= w.partialPath.pathEnvelope.maxPt:
|
||||
return err(LowerBoundProofError)
|
||||
dangling = proofStats.dangling
|
||||
|
||||
ok(dangling)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -224,12 +231,27 @@ proc init*(
|
|||
result.init(pv, root.to(NodeKey))
|
||||
result.peer = peer
|
||||
result.accKey = accKey
|
||||
result.getClsFn = db.persistentStorageSlotsGetFn()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc getStorageSlotsFn*(
|
||||
ps: SnapDbStorageSlotsRef;
|
||||
): HexaryGetFn =
|
||||
## Return `HexaryGetFn` closure.
|
||||
let getFn = ps.kvDb.persistentStorageSlotsGetFn()
|
||||
return proc(key: openArray[byte]): Blob = getFn(ps.accKey, key)
|
||||
|
||||
proc getStorageSlotsFn*(
|
||||
pv: SnapDbRef;
|
||||
accKey: NodeKey;
|
||||
): HexaryGetFn =
|
||||
## Variant of `getStorageSlotsFn()` for captured `accKey` argument.
|
||||
let getFn = pv.kvDb.persistentStorageSlotsGetFn()
|
||||
return proc(key: openArray[byte]): Blob = getFn(accKey, key)
|
||||
|
||||
|
||||
proc importStorageSlots*(
|
||||
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
|
||||
data: AccountStorageRange; ## Account storage reply from `snap/1` protocol
|
||||
|
@ -407,7 +429,7 @@ proc inspectStorageSlotsTrie*(
|
|||
suspendAfter = high(uint64); ## To be resumed
|
||||
persistent = false; ## Read data from disk
|
||||
ignoreError = false; ## Always return partial results if any
|
||||
): Result[TrieNodeStat, HexaryDbError] =
|
||||
): Result[TrieNodeStat, HexaryError] =
|
||||
## Starting with the argument list `pathSet`, find all the non-leaf nodes in
|
||||
## the hexary trie which have at least one node key reference missing in
|
||||
## the trie database. Argument `pathSet` list entries that do not refer to a
|
||||
|
@ -427,7 +449,7 @@ proc inspectStorageSlotsTrie*(
|
|||
var stats: TrieNodeStat
|
||||
noRlpExceptionOops("inspectStorageSlotsTrie()"):
|
||||
if persistent:
|
||||
stats = ps.getFn(ps.accKey).hexaryInspectTrie(
|
||||
stats = ps.getStorageSlotsFn.hexaryInspectTrie(
|
||||
ps.root, pathList, resumeCtx, suspendAfter=suspendAfter)
|
||||
else:
|
||||
stats = ps.hexaDb.hexaryInspectTrie(
|
||||
|
@ -460,7 +482,7 @@ proc inspectStorageSlotsTrie*(
|
|||
resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection
|
||||
suspendAfter = high(uint64); ## To be resumed
|
||||
ignoreError = false; ## Always return partial results if any
|
||||
): Result[TrieNodeStat, HexaryDbError] =
|
||||
): Result[TrieNodeStat, HexaryError] =
|
||||
## Variant of `inspectStorageSlotsTrieTrie()` for persistent storage.
|
||||
SnapDbStorageSlotsRef.init(
|
||||
pv, accKey, root, peer).inspectStorageSlotsTrie(
|
||||
|
@ -471,12 +493,12 @@ proc getStorageSlotsNodeKey*(
|
|||
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
|
||||
path: Blob; ## Partial node path
|
||||
persistent = false; ## Read data from disk
|
||||
): Result[NodeKey,HexaryDbError] =
|
||||
): Result[NodeKey,HexaryError] =
|
||||
## For a partial node path argument `path`, return the raw node key.
|
||||
var rc: Result[NodeKey,void]
|
||||
noRlpExceptionOops("getStorageSlotsNodeKey()"):
|
||||
if persistent:
|
||||
rc = ps.getFn(ps.accKey).hexaryInspectPath(ps.root, path)
|
||||
rc = ps.getStorageSlotsFn.hexaryInspectPath(ps.root, path)
|
||||
else:
|
||||
rc = ps.hexaDb.hexaryInspectPath(ps.root, path)
|
||||
if rc.isOk:
|
||||
|
@ -489,7 +511,7 @@ proc getStorageSlotsNodeKey*(
|
|||
accKey: NodeKey; ## Account key
|
||||
root: Hash256; ## state root
|
||||
path: Blob; ## Partial node path
|
||||
): Result[NodeKey,HexaryDbError] =
|
||||
): Result[NodeKey,HexaryError] =
|
||||
## Variant of `getStorageSlotsNodeKey()` for persistent storage.
|
||||
SnapDbStorageSlotsRef.init(
|
||||
pv, accKey, root, peer).getStorageSlotsNodeKey(path, persistent=true)
|
||||
|
@ -499,7 +521,7 @@ proc getStorageSlotsData*(
|
|||
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
|
||||
path: NodeKey; ## Account to visit
|
||||
persistent = false; ## Read data from disk
|
||||
): Result[Account,HexaryDbError] =
|
||||
): Result[Account,HexaryError] =
|
||||
## Fetch storage slots data.
|
||||
##
|
||||
## Caveat: There is no unit test yet
|
||||
|
@ -509,9 +531,9 @@ proc getStorageSlotsData*(
|
|||
noRlpExceptionOops("getStorageSlotsData()"):
|
||||
var leaf: Blob
|
||||
if persistent:
|
||||
leaf = path.hexaryPath(ps.root, ps.getFn(ps.accKey)).leafData
|
||||
leaf = path.hexaryPath(ps.root, ps.getStorageSlotsFn).leafData
|
||||
else:
|
||||
leaf = path.hexaryPath(ps.root.to(RepairKey),ps.hexaDb).leafData
|
||||
leaf = path.hexaryPath(ps.root.to(RepairKey), ps.hexaDb).leafData
|
||||
|
||||
if leaf.len == 0:
|
||||
return err(AccountNotFound)
|
||||
|
@ -525,7 +547,7 @@ proc getStorageSlotsData*(
|
|||
accKey: NodeKey; ## Account key
|
||||
root: Hash256; ## state root
|
||||
path: NodeKey; ## Account to visit
|
||||
): Result[Account,HexaryDbError] =
|
||||
): Result[Account,HexaryError] =
|
||||
## Variant of `getStorageSlotsData()` for persistent storage.
|
||||
SnapDbStorageSlotsRef.init(
|
||||
pv, accKey, root, peer).getStorageSlotsData(path, persistent=true)
|
||||
|
@ -541,8 +563,7 @@ proc haveStorageSlotsData*(
|
|||
## Caveat: There is no unit test yet
|
||||
noGenericExOrKeyError("haveStorageSlotsData()"):
|
||||
if persistent:
|
||||
let getFn = ps.getFn(ps.accKey)
|
||||
return 0 < ps.root.ByteArray32.getFn().len
|
||||
return 0 < ps.getStorageSlotsFn()(ps.root.ByteArray32).len
|
||||
else:
|
||||
return ps.hexaDb.tab.hasKey(ps.root.to(RepairKey))
|
||||
|
||||
|
|
|
@ -14,17 +14,17 @@
|
|||
## Flow chart for healing algorithm
|
||||
## --------------------------------
|
||||
## ::
|
||||
## START with {state-root}
|
||||
## START
|
||||
## |
|
||||
## | +--------------------------------+
|
||||
## | | |
|
||||
## v v |
|
||||
## <inspect-trie> |
|
||||
## | |
|
||||
## | +--------------------------+ |
|
||||
## | | +--------------------+ | |
|
||||
## | | | | | |
|
||||
## v v v | | |
|
||||
## | v |
|
||||
## | <inspect-trie> |
|
||||
## | | |
|
||||
## | | +-----------------------+ |
|
||||
## | | | +------------------+ | |
|
||||
## | | | | | | |
|
||||
## v v v v | | |
|
||||
## {missing-nodes} | | |
|
||||
## | | | |
|
||||
## v | | |
|
||||
|
@ -48,8 +48,6 @@
|
|||
## Legend:
|
||||
## * `<..>`: some action, process, etc.
|
||||
## * `{missing-nodes}`: list implemented as `env.fetchAccounts.sickSubTries`
|
||||
## * `(state-root}`: implicit argument for `getAccountNodeKey()` when
|
||||
## the argument list is empty
|
||||
## * `{leaf-nodes}`: list is optimised out
|
||||
## * `{check-nodes}`: list implemented as `env.fetchAccounts.checkNodes`
|
||||
## * `{storage-roots}`: list implemented as pair of queues
|
||||
|
@ -57,8 +55,8 @@
|
|||
##
|
||||
## Discussion of flow chart
|
||||
## ------------------------
|
||||
## * Input nodes for `<inspect-trie>` are checked for dangling child node
|
||||
## links which in turn are collected as output.
|
||||
## * If there are no missing nodes, START proceeds with the `<inspect-trie>`
|
||||
## process.
|
||||
##
|
||||
## * Nodes of the `{missing-nodes}` list are fetched from the network and
|
||||
## merged into the persistent accounts trie database.
|
||||
|
@ -67,6 +65,9 @@
|
|||
## + Successfully merged leaf nodes are processed as single entry accounts
|
||||
## node ranges.
|
||||
##
|
||||
## * Input nodes for `<inspect-trie>` are checked for dangling child node
|
||||
## links which in turn are collected as output.
|
||||
##
|
||||
## * If there is a problem with a node travelling from the source list
|
||||
## `{missing-nodes}` towards either target list `{leaf-nodes}` or
|
||||
## `{check-nodes}`, this problem node will fed back to the `{missing-nodes}`
|
||||
|
@ -115,7 +116,8 @@ import
|
|||
../../sync_desc,
|
||||
".."/[constants, range_desc, worker_desc],
|
||||
./com/[com_error, get_trie_nodes],
|
||||
./db/[hexary_desc, hexary_error, snapdb_accounts]
|
||||
./db/[hexary_desc, hexary_error, snapdb_accounts],
|
||||
./sub_tries_helper
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
|
@ -141,7 +143,7 @@ proc healingCtx(
|
|||
"{" &
|
||||
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
|
||||
"nAccounts=" & $env.nAccounts & "," &
|
||||
("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" &
|
||||
("covered=" & env.fetchAccounts.processed.fullFactor.toPC(0) & "/" &
|
||||
ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," &
|
||||
"nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," &
|
||||
"nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}"
|
||||
|
@ -150,32 +152,6 @@ proc healingCtx(
|
|||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc verifyStillMissingNodes(
|
||||
buddy: SnapBuddyRef;
|
||||
env: SnapPivotRef;
|
||||
) =
|
||||
## Check whether previously missing nodes from the `sickSubTries` list
|
||||
## have been magically added to the database since it was checked last
|
||||
## time. These nodes will me moved to `checkNodes` for further processing.
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
db = ctx.data.snapDb
|
||||
peer = buddy.peer
|
||||
stateRoot = env.stateHeader.stateRoot
|
||||
|
||||
var delayed: seq[NodeSpecs]
|
||||
for w in env.fetchAccounts.sickSubTries:
|
||||
if ctx.data.snapDb.nodeExists(peer, stateRoot, w):
|
||||
# Check nodes for dangling links below
|
||||
env.fetchAccounts.checkNodes.add w.partialPath
|
||||
else:
|
||||
# Node is still missing
|
||||
delayed.add w
|
||||
|
||||
# Must not modify sequence while looping over it
|
||||
env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & delayed
|
||||
|
||||
|
||||
proc updateMissingNodesList(
|
||||
buddy: SnapBuddyRef;
|
||||
env: SnapPivotRef;
|
||||
|
@ -186,40 +162,23 @@ proc updateMissingNodesList(
|
|||
## fed back to the vey same list `checkNodes`
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
db = ctx.data.snapDb
|
||||
peer = buddy.peer
|
||||
stateRoot = env.stateHeader.stateRoot
|
||||
db = ctx.data.snapDb
|
||||
|
||||
while env.fetchAccounts.sickSubTries.len < snapRequestTrieNodesFetchMax:
|
||||
# Inspect hexary trie for dangling nodes
|
||||
let rc = db.inspectAccountsTrie(
|
||||
peer, stateRoot,
|
||||
env.fetchAccounts.checkNodes, # start with these nodes
|
||||
env.fetchAccounts.resumeCtx, # resume previous attempt
|
||||
healInspectionBatch) # visit no more than this many nodes
|
||||
if rc.isErr:
|
||||
when extraTraceMessages:
|
||||
error logTxt "failed => stop", peer,
|
||||
ctx=buddy.healingCtx(env), error=rc.error
|
||||
# Attempt to switch peers, there is not much else we can do here
|
||||
let rc = await db.getAccountFn.subTriesFromPartialPaths(
|
||||
env.stateHeader.stateRoot, # State root related to pivot
|
||||
env.fetchAccounts, # Account download specs
|
||||
snapRequestTrieNodesFetchMax) # Maxinmal datagram request size
|
||||
if rc.isErr:
|
||||
if rc.error == TrieIsLockedForPerusal:
|
||||
trace logTxt "failed", peer,
|
||||
ctx=buddy.healingCtx(env), error=rc.error
|
||||
else:
|
||||
error logTxt "failed => stop", peer,
|
||||
ctx=buddy.healingCtx(env), error=rc.error
|
||||
# Attempt to switch pivot, there is not much else one can do here
|
||||
buddy.ctrl.zombie = true
|
||||
return false
|
||||
|
||||
# Update context for async threading environment
|
||||
env.fetchAccounts.resumeCtx = rc.value.resumeCtx
|
||||
env.fetchAccounts.checkNodes.setLen(0)
|
||||
|
||||
# Collect result
|
||||
env.fetchAccounts.sickSubTries =
|
||||
env.fetchAccounts.sickSubTries & rc.value.dangling
|
||||
|
||||
# Done unless there is some resumption context
|
||||
if rc.value.resumeCtx.isNil:
|
||||
break
|
||||
|
||||
# Allow async task switch and continue. Note that some other task might
|
||||
# steal some of the `env.fetchAccounts.sickSubTries`.
|
||||
await sleepAsync 1.nanoseconds
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
|
@ -325,25 +284,17 @@ proc registerAccountLeaf(
|
|||
peer = buddy.peer
|
||||
pt = accKey.to(NodeTag)
|
||||
|
||||
# Find range set (from list) containing `pt`
|
||||
var ivSet: NodeTagRangeSet
|
||||
block foundCoveringRange:
|
||||
for w in env.fetchAccounts.unprocessed:
|
||||
if 0 < w.covered(pt,pt):
|
||||
ivSet = w
|
||||
break foundCoveringRange
|
||||
return # already processed, forget this account leaf
|
||||
# Register isolated leaf node
|
||||
if 0 < env.fetchAccounts.processed.merge(pt,pt) :
|
||||
env.nAccounts.inc
|
||||
env.fetchAccounts.unprocessed.reduce(pt,pt)
|
||||
discard buddy.ctx.data.coveredAccounts.merge(pt,pt)
|
||||
|
||||
# Register this isolated leaf node that was added
|
||||
env.nAccounts.inc
|
||||
discard ivSet.reduce(pt,pt)
|
||||
discard buddy.ctx.data.coveredAccounts.merge(pt,pt)
|
||||
|
||||
# Update storage slots batch
|
||||
if acc.storageRoot != emptyRlpHash:
|
||||
env.fetchStorageFull.merge AccountSlotsHeader(
|
||||
acckey: accKey,
|
||||
storageRoot: acc.storageRoot)
|
||||
# Update storage slots batch
|
||||
if acc.storageRoot != emptyRlpHash:
|
||||
env.fetchStorageFull.merge AccountSlotsHeader(
|
||||
acckey: accKey,
|
||||
storageRoot: acc.storageRoot)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions: do the healing for one round
|
||||
|
@ -362,19 +313,19 @@ proc accountsHealingImpl(
|
|||
peer = buddy.peer
|
||||
|
||||
# Update for changes since last visit
|
||||
buddy.verifyStillMissingNodes(env)
|
||||
try:
|
||||
db.getAccountFn.subTriesNodesReclassify(
|
||||
env.stateHeader.stateRoot.to(NodeKey), env.fetchAccounts)
|
||||
except Exception as e:
|
||||
raiseAssert "Not possible @ accountsHealingImpl(" & $e.name & "):" & e.msg
|
||||
|
||||
# If `checkNodes` is empty, healing is at the very start or was
|
||||
# postponed in which case `sickSubTries` is non-empty.
|
||||
if env.fetchAccounts.checkNodes.len != 0 or
|
||||
env.fetchAccounts.sickSubTries.len == 0:
|
||||
if not await buddy.updateMissingNodesList(env):
|
||||
return 0
|
||||
|
||||
# Check whether the trie is complete.
|
||||
if env.fetchAccounts.sickSubTries.len == 0:
|
||||
trace logTxt "complete", peer, ctx=buddy.healingCtx(env)
|
||||
return 0 # nothing to do
|
||||
# Traverse the hexary trie for more missing nodes. This call is expensive.
|
||||
if await buddy.updateMissingNodesList(env):
|
||||
# Check whether the trie is complete.
|
||||
if env.fetchAccounts.sickSubTries.len == 0:
|
||||
trace logTxt "complete", peer, ctx=buddy.healingCtx(env)
|
||||
return 0 # nothing to do
|
||||
|
||||
# Get next batch of nodes that need to be merged it into the database
|
||||
let nodeSpecs = await buddy.getMissingNodesFromNetwork(env)
|
||||
|
|
|
@ -34,7 +34,8 @@ import
|
|||
../../sync_desc,
|
||||
".."/[constants, range_desc, worker_desc],
|
||||
./com/[com_error, get_trie_nodes],
|
||||
./db/[hexary_desc, hexary_error, snapdb_storage_slots]
|
||||
./db/[hexary_desc, hexary_error, snapdb_storage_slots],
|
||||
./sub_tries_helper
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
|
@ -71,7 +72,7 @@ proc healingCtx(
|
|||
proc acceptWorkItemAsIs(
|
||||
buddy: SnapBuddyRef;
|
||||
kvp: SnapSlotsQueuePair;
|
||||
): Result[bool, HexaryDbError] =
|
||||
): Result[bool,HexaryError] =
|
||||
## Check whether this work item is done and the corresponding storage trie
|
||||
## can be completely inherited.
|
||||
if kvp.data.inherit:
|
||||
|
@ -140,36 +141,21 @@ proc updateMissingNodesList(
|
|||
storageRoot = kvp.key
|
||||
slots = kvp.data.slots
|
||||
|
||||
while slots.sickSubTries.len < snapRequestTrieNodesFetchMax:
|
||||
# Inspect hexary trie for dangling nodes
|
||||
let rc = db.inspectStorageSlotsTrie(
|
||||
peer, accKey, storageRoot,
|
||||
slots.checkNodes, # start with these nodes
|
||||
slots.resumeCtx, # resume previous attempt
|
||||
healStorageSlotsInspectionBatch) # visit no more than this many nodes
|
||||
if rc.isErr:
|
||||
when extraTraceMessages:
|
||||
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
|
||||
error logTxt "failed => stop", peer, itCtx=buddy.healingCtx(kvp,env),
|
||||
nSlotLists=env.nSlotLists, nStorageQueue, error=rc.error
|
||||
# Attempt to switch peers, there is not much else we can do here
|
||||
let rc = await db.getStorageSlotsFn(accKey).subTriesFromPartialPaths(
|
||||
storageRoot, # State root related to storage slots
|
||||
slots, # Storage slots download specs
|
||||
snapRequestTrieNodesFetchMax) # Maxinmal datagram request size
|
||||
if rc.isErr:
|
||||
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
|
||||
if rc.error == TrieIsLockedForPerusal:
|
||||
trace logTxt "failed", peer, itCtx=buddy.healingCtx(kvp,env),
|
||||
nSlotLists=env.nSlotLists, nStorageQueue, error=rc.error
|
||||
else:
|
||||
error logTxt "failed => stop", peer, itCtx=buddy.healingCtx(kvp,env),
|
||||
nSlotLists=env.nSlotLists, nStorageQueue, error=rc.error
|
||||
# Attempt to switch pivot, there is not much else one can do here
|
||||
buddy.ctrl.zombie = true
|
||||
return false
|
||||
|
||||
# Update context for async threading environment
|
||||
slots.resumeCtx = rc.value.resumeCtx
|
||||
slots.checkNodes.setLen(0)
|
||||
|
||||
# Collect result
|
||||
slots.sickSubTries = slots.sickSubTries & rc.value.dangling
|
||||
|
||||
# Done unless there is some resumption context
|
||||
if rc.value.resumeCtx.isNil:
|
||||
break
|
||||
|
||||
# Allow async task switch and continue. Note that some other task might
|
||||
# steal some of the `env.fetchAccounts.sickSubTries`.
|
||||
await sleepAsync 1.nanoseconds
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
|
|
|
@ -36,6 +36,11 @@ proc init(batch: SnapRangeBatchRef; ctx: SnapCtxRef) =
|
|||
batch.unprocessed.init()
|
||||
batch.processed = NodeTagRangeSet.init()
|
||||
|
||||
# Initialise partial path the envelope of which covers the full range of
|
||||
# account keys `0..high(NodeTag)`. This will trigger healing on the full
|
||||
# range all possible keys.
|
||||
batch.checkNodes.add @[0.byte]
|
||||
|
||||
# Initialise accounts range fetch batch, the pair of `fetchAccounts[]`
|
||||
# range sets.
|
||||
if ctx.data.coveredAccounts.isFull:
|
||||
|
@ -123,9 +128,19 @@ proc update*(
|
|||
let rc = pivotTable.secondKey
|
||||
if rc.isOk:
|
||||
pivotTable.del rc.value
|
||||
|
||||
# Update healing threshold for top pivot entry
|
||||
topEnv = pivotTable.lastValue.value
|
||||
|
||||
else:
|
||||
discard pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax)
|
||||
|
||||
# Update healing threshold
|
||||
let
|
||||
slots = max(0, healAccountsPivotTriggerNMax - pivotTable.len)
|
||||
delta = slots.float * healAccountsPivotTriggerWeight
|
||||
topEnv.healThresh = healAccountsPivotTriggerMinFactor + delta
|
||||
|
||||
|
||||
proc tickerStats*(
|
||||
pivotTable: var SnapPivotTable; ## Pivot table
|
||||
|
@ -189,13 +204,39 @@ proc tickerStats*(
|
|||
# Public functions: particular pivot
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc pivotAccountsComplete*(
|
||||
env: SnapPivotRef; ## Current pivot environment
|
||||
): bool =
|
||||
## Returns `true` if accounts are fully available for this this pivot.
|
||||
env.fetchAccounts.processed.isFull
|
||||
|
||||
|
||||
proc pivotAccountsHealingOk*(
|
||||
env: SnapPivotRef; ## Current pivot environment
|
||||
ctx: SnapCtxRef; ## Some global context
|
||||
): bool =
|
||||
## Returns `true` if accounts healing is enabled for this pivot.
|
||||
##
|
||||
if not env.pivotAccountsComplete():
|
||||
# Only start accounts healing if there is some completion level, already.
|
||||
#
|
||||
# We check against the global coverage factor, i.e. a measure for how much
|
||||
# of the total of all accounts have been processed. Even if the hexary trie
|
||||
# database for the current pivot state root is sparsely filled, there is a
|
||||
# good chance that it can inherit some unchanged sub-trie from an earlier
|
||||
# pivot state root download. The healing process then works like sort of
|
||||
# glue.
|
||||
if healAccountsCoverageTrigger <= ctx.data.coveredAccounts.fullFactor:
|
||||
# Ditto for pivot.
|
||||
if env.healThresh <= env.fetchAccounts.processed.fullFactor:
|
||||
return true
|
||||
|
||||
|
||||
proc execSnapSyncAction*(
|
||||
env: SnapPivotRef; ## Current pivot environment
|
||||
buddy: SnapBuddyRef; ## Worker peer
|
||||
): Future[bool]
|
||||
{.async.} =
|
||||
## Execute a synchronisation run. The return code is `true` if a full
|
||||
## synchronisation cycle could be executed.
|
||||
) {.async.} =
|
||||
## Execute a synchronisation run.
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
|
||||
|
@ -205,58 +246,34 @@ proc execSnapSyncAction*(
|
|||
if snapStorageSlotsQuPrioThresh < nStoQu:
|
||||
await buddy.rangeFetchStorageSlots(env)
|
||||
if buddy.ctrl.stopped or env.obsolete:
|
||||
return false
|
||||
return
|
||||
|
||||
if env.accountsState != HealerDone:
|
||||
if not env.pivotAccountsComplete():
|
||||
await buddy.rangeFetchAccounts(env)
|
||||
if buddy.ctrl.stopped or env.obsolete:
|
||||
return false
|
||||
return
|
||||
|
||||
await buddy.rangeFetchStorageSlots(env)
|
||||
if buddy.ctrl.stopped or env.obsolete:
|
||||
return false
|
||||
return
|
||||
|
||||
if not ctx.data.accountsHealing:
|
||||
# Only start healing if there is some completion level, already.
|
||||
#
|
||||
# We check against the global coverage factor, i.e. a measure for how
|
||||
# much of the total of all accounts have been processed. Even if the
|
||||
# hexary trie database for the current pivot state root is sparsely
|
||||
# filled, there is a good chance that it can inherit some unchanged
|
||||
# sub-trie from an earlier pivor state root download. The healing
|
||||
# process then works like sort of glue.
|
||||
if 0 < env.nAccounts:
|
||||
if healAccountsTrigger <= ctx.data.coveredAccounts.fullFactor:
|
||||
ctx.data.accountsHealing = true
|
||||
|
||||
if ctx.data.accountsHealing:
|
||||
# Can only run a single accounts healer instance at a time. This
|
||||
# instance will clear the batch queue so there is nothing to do for
|
||||
# another process.
|
||||
if env.accountsState == HealerIdle:
|
||||
env.accountsState = HealerRunning
|
||||
await buddy.healAccounts(env)
|
||||
env.accountsState = HealerIdle
|
||||
|
||||
if buddy.ctrl.stopped or env.obsolete:
|
||||
return false
|
||||
if env.pivotAccountsHealingOk(ctx):
|
||||
await buddy.healAccounts(env)
|
||||
if buddy.ctrl.stopped or env.obsolete:
|
||||
return
|
||||
|
||||
# Some additional storage slots might have been popped up
|
||||
await buddy.rangeFetchStorageSlots(env)
|
||||
if buddy.ctrl.stopped or env.obsolete:
|
||||
return false
|
||||
return
|
||||
|
||||
await buddy.healStorageSlots(env)
|
||||
if buddy.ctrl.stopped or env.obsolete:
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
|
||||
proc saveCheckpoint*(
|
||||
env: SnapPivotRef; ## Current pivot environment
|
||||
ctx: SnapCtxRef; ## Some global context
|
||||
): Result[int,HexaryDbError] =
|
||||
): Result[int,HexaryError] =
|
||||
## Save current sync admin data. On success, the size of the data record
|
||||
## saved is returned (e.g. for logging.)
|
||||
##
|
||||
|
|
|
@ -29,17 +29,17 @@
|
|||
## * Data points in `iv` that were invalid or not recevied from the network
|
||||
## are merged back it the set `env.fetchAccounts.unprocessed`.
|
||||
##
|
||||
|
||||
import
|
||||
chronicles,
|
||||
chronos,
|
||||
eth/[common, p2p],
|
||||
stew/[interval_set, keyed_queue],
|
||||
stint,
|
||||
../../../utils/prettify,
|
||||
../../sync_desc,
|
||||
".."/[constants, range_desc, worker_desc],
|
||||
./com/[com_error, get_account_range],
|
||||
./db/snapdb_accounts
|
||||
./db/[hexary_paths, snapdb_accounts]
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
|
@ -57,21 +57,21 @@ const
|
|||
template logTxt(info: static[string]): static[string] =
|
||||
"Accounts range " & info
|
||||
|
||||
proc dumpUnprocessed(
|
||||
buddy: SnapBuddyRef;
|
||||
env: SnapPivotRef;
|
||||
): string =
|
||||
## Debugging ...
|
||||
let
|
||||
peer = buddy.peer
|
||||
pivot = "#" & $env.stateHeader.blockNumber # for logging
|
||||
moan = proc(overlap: UInt256; iv: NodeTagRange) =
|
||||
trace logTxt "unprocessed => overlap", peer, pivot, overlap, iv
|
||||
|
||||
env.fetchAccounts.unprocessed.dump(moan, 5)
|
||||
# proc dumpUnprocessed(
|
||||
# buddy: SnapBuddyRef;
|
||||
# env: SnapPivotRef;
|
||||
# ): string =
|
||||
# ## Debugging ...
|
||||
# let
|
||||
# peer = buddy.peer
|
||||
# pivot = "#" & $env.stateHeader.blockNumber # for logging
|
||||
# moan = proc(overlap: UInt256; iv: NodeTagRange) =
|
||||
# trace logTxt "unprocessed => overlap", peer, pivot, overlap, iv
|
||||
#
|
||||
# env.fetchAccounts.unprocessed.dump(moan, 5)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc getUnprocessed(
|
||||
|
@ -97,6 +97,8 @@ proc accountsRangefetchImpl(
|
|||
let
|
||||
ctx = buddy.ctx
|
||||
peer = buddy.peer
|
||||
db = ctx.data.snapDb
|
||||
fa = env.fetchAccounts
|
||||
stateRoot = env.stateHeader.stateRoot
|
||||
pivot = "#" & $env.stateHeader.blockNumber # for logging
|
||||
|
||||
|
@ -113,7 +115,7 @@ proc accountsRangefetchImpl(
|
|||
let dd = block:
|
||||
let rc = await buddy.getAccountRange(stateRoot, iv, pivot)
|
||||
if rc.isErr:
|
||||
env.fetchAccounts.unprocessed.merge iv # fail => interval back to pool
|
||||
fa.unprocessed.merge iv # fail => interval back to pool
|
||||
let error = rc.error
|
||||
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
|
||||
when extraTraceMessages:
|
||||
|
@ -132,29 +134,29 @@ proc accountsRangefetchImpl(
|
|||
# trace logTxt "fetched", peer, gotAccounts, gotStorage,
|
||||
# pivot, reqLen=iv.len, gotLen=dd.consumed.len
|
||||
|
||||
# Now, we fully own the scheduler. The original interval will savely be
|
||||
# placed back for a moment -- to be corrected below.
|
||||
env.fetchAccounts.unprocessed.merge iv
|
||||
# Now, we fully own the scheduler. The original interval will savely be placed
|
||||
# back for a moment (the `unprocessed` range set to be corrected below.)
|
||||
fa.unprocessed.merge iv
|
||||
|
||||
# Processed accounts hashes are set up as a set of intervals which is needed
|
||||
# if the data range returned from the network contains holes.
|
||||
let processed = NodeTagRangeSet.init()
|
||||
let covered = NodeTagRangeSet.init()
|
||||
if 0 < dd.data.accounts.len:
|
||||
discard processed.merge(iv.minPt, dd.data.accounts[^1].accKey.to(NodeTag))
|
||||
discard covered.merge(iv.minPt, dd.data.accounts[^1].accKey.to(NodeTag))
|
||||
else:
|
||||
discard processed.merge iv
|
||||
discard covered.merge iv
|
||||
|
||||
let gaps = block:
|
||||
# No left boundary check needed. If there is a gap, the partial path for
|
||||
# that gap is returned by the import function to be registered, below.
|
||||
let rc = ctx.data.snapDb.importAccounts(
|
||||
let rc = db.importAccounts(
|
||||
peer, stateRoot, iv.minPt, dd.data, noBaseBoundCheck = true)
|
||||
if rc.isErr:
|
||||
# Bad data, just try another peer
|
||||
buddy.ctrl.zombie = true
|
||||
when extraTraceMessages:
|
||||
trace logTxt "import failed => stop", peer, gotAccounts, gotStorage,
|
||||
pivot, reqLen=iv.len, gotLen=processed.total, error=rc.error
|
||||
pivot, reqLen=iv.len, gotLen=covered.total, error=rc.error
|
||||
return
|
||||
rc.value
|
||||
|
||||
|
@ -164,47 +166,23 @@ proc accountsRangefetchImpl(
|
|||
# Punch holes into the reported range of received accounts from the network
|
||||
# if it there are gaps (described by dangling nodes.)
|
||||
for w in gaps.innerGaps:
|
||||
discard processed.reduce(
|
||||
w.partialPath.min(NodeKey).to(NodeTag),
|
||||
w.partialPath.max(NodeKey).to(Nodetag))
|
||||
|
||||
# Update dangling nodes list unless healing is activated. The problem
|
||||
# with healing activated is, that a previously missing node that suddenly
|
||||
# appears will not automatically translate into a full sub-trie. It might
|
||||
# just be the node itself (which justified the name `sickSubTrie`).
|
||||
#
|
||||
# Instead of managing partial sub-tries here, this is delegated to the
|
||||
# healing module.
|
||||
if not ctx.data.accountsHealing:
|
||||
var delayed: seq[NodeSpecs]
|
||||
for w in env.fetchAccounts.sickSubTries:
|
||||
if not ctx.data.snapDb.nodeExists(peer, stateRoot, w):
|
||||
delayed.add w
|
||||
when extraTraceMessages:
|
||||
trace logTxt "dangling nodes", peer, pivot,
|
||||
nCheckNodes=env.fetchAccounts.checkNodes.len,
|
||||
nSickSubTries=env.fetchAccounts.sickSubTries.len,
|
||||
nUpdatedMissing=delayed.len, nOutsideGaps=gaps.dangling.len
|
||||
env.fetchAccounts.sickSubTries = delayed & gaps.dangling
|
||||
discard covered.reduce w.partialPath.pathEnvelope
|
||||
|
||||
# Update book keeping
|
||||
for w in processed.increasing:
|
||||
for w in covered.increasing:
|
||||
# Remove the processed range from the batch of unprocessed ones.
|
||||
env.fetchAccounts.unprocessed.reduce w
|
||||
# Register consumed intervals on the accumulator over all state roots.
|
||||
fa.unprocessed.reduce w
|
||||
# Register consumed intervals on the accumulators over all state roots.
|
||||
discard buddy.ctx.data.coveredAccounts.merge w
|
||||
discard fa.processed.merge w
|
||||
|
||||
# Register accounts with storage slots on the storage TODO list.
|
||||
env.fetchStorageFull.merge dd.withStorage
|
||||
|
||||
#when extraTraceMessages:
|
||||
# let
|
||||
# imported = processed.dump()
|
||||
# unprocessed = buddy.dumpUnprocessed(env)
|
||||
# trace logTxt "request done", peer, pivot,
|
||||
# nCheckNodes=env.fetchAccounts.checkNodes.len,
|
||||
# nSickSubTries=env.fetchAccounts.sickSubTries.len,
|
||||
# imported, unprocessed
|
||||
when extraTraceMessages:
|
||||
trace logTxt "request done", peer, pivot,
|
||||
covered=covered.fullFactor.toPC(2),
|
||||
processed=fa.processed.fullFactor.toPC(2)
|
||||
|
||||
return true
|
||||
|
||||
|
@ -217,7 +195,10 @@ proc rangeFetchAccounts*(
|
|||
env: SnapPivotRef;
|
||||
) {.async.} =
|
||||
## Fetch accounts and store them in the database.
|
||||
if not env.fetchAccounts.unprocessed.isEmpty():
|
||||
let
|
||||
fa = env.fetchAccounts
|
||||
|
||||
if not fa.processed.isFull():
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
peer = buddy.peer
|
||||
|
@ -226,8 +207,8 @@ proc rangeFetchAccounts*(
|
|||
when extraTraceMessages:
|
||||
trace logTxt "start", peer, pivot
|
||||
|
||||
var nFetchAccounts = 0
|
||||
while not env.fetchAccounts.unprocessed.isEmpty() and
|
||||
var nFetchAccounts = 0 # for logging
|
||||
while not fa.processed.isFull() and
|
||||
buddy.ctrl.running and
|
||||
not env.obsolete:
|
||||
nFetchAccounts.inc
|
||||
|
@ -241,6 +222,7 @@ proc rangeFetchAccounts*(
|
|||
|
||||
when extraTraceMessages:
|
||||
trace logTxt "done", peer, pivot, nFetchAccounts,
|
||||
nCheckNodes=fa.checkNodes.len, nSickSubTries=fa.sickSubTries.len,
|
||||
runState=buddy.ctrl.state
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
|
|
|
@ -61,7 +61,7 @@ import
|
|||
../../sync_desc,
|
||||
".."/[constants, range_desc, worker_desc],
|
||||
./com/[com_error, get_storage_ranges],
|
||||
./db/snapdb_storage_slots
|
||||
./db/[hexary_error, snapdb_storage_slots]
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
|
@ -131,28 +131,24 @@ proc getNextSlotItemPartial(
|
|||
|
||||
for kvp in env.fetchStoragePart.nextPairs:
|
||||
if not kvp.data.slots.isNil:
|
||||
# Extract first interval and return single item request queue
|
||||
for ivSet in kvp.data.slots.unprocessed:
|
||||
let rc = ivSet.ge()
|
||||
if rc.isOk:
|
||||
# Extract range and return single item request queue
|
||||
let rc = kvp.data.slots.unprocessed.fetch(maxLen = high(UInt256))
|
||||
if rc.isOk:
|
||||
|
||||
# Extraxt this interval from the range set
|
||||
discard ivSet.reduce rc.value
|
||||
# Delete from batch queue if the range set becomes empty
|
||||
if kvp.data.slots.unprocessed.isEmpty:
|
||||
env.fetchStoragePart.del(kvp.key)
|
||||
|
||||
# Delete from batch queue if the range set becomes empty
|
||||
if kvp.data.slots.unprocessed.isEmpty:
|
||||
env.fetchStoragePart.del(kvp.key)
|
||||
when extraTraceMessages:
|
||||
trace logTxt "fetch partial", peer,
|
||||
nSlotLists=env.nSlotLists,
|
||||
nStorageQuPart=env.fetchStoragePart.len,
|
||||
subRange=rc.value, account=kvp.data.accKey
|
||||
|
||||
when extraTraceMessages:
|
||||
trace logTxt "fetch partial", peer,
|
||||
nSlotLists=env.nSlotLists,
|
||||
nStorageQuPart=env.fetchStoragePart.len,
|
||||
subRange=rc.value, account=kvp.data.accKey
|
||||
|
||||
return @[AccountSlotsHeader(
|
||||
accKey: kvp.data.accKey,
|
||||
storageRoot: kvp.key,
|
||||
subRange: some rc.value)]
|
||||
return @[AccountSlotsHeader(
|
||||
accKey: kvp.data.accKey,
|
||||
storageRoot: kvp.key,
|
||||
subRange: some rc.value)]
|
||||
|
||||
# Oops, empty range set? Remove range and move item to the full requests
|
||||
kvp.data.slots = nil
|
||||
|
@ -231,20 +227,54 @@ proc storeStoragesSingleBatch(
|
|||
for w in report:
|
||||
# All except the last item always index to a node argument. The last
|
||||
# item has been checked for, already.
|
||||
let inx = w.slot.get
|
||||
let
|
||||
inx = w.slot.get
|
||||
acc = stoRange.data.storages[inx].account
|
||||
|
||||
# if w.error in {RootNodeMismatch, RightBoundaryProofFailed}:
|
||||
# ???
|
||||
if w.error == RootNodeMismatch:
|
||||
# Some pathological case, needs further investigation. For the
|
||||
# moment, provide partial fetches.
|
||||
const
|
||||
halfTag = (high(UInt256) div 2).NodeTag
|
||||
halfTag1 = halfTag + 1.u256
|
||||
env.fetchStoragePart.merge [
|
||||
AccountSlotsHeader(
|
||||
accKey: acc.accKey,
|
||||
storageRoot: acc.storageRoot,
|
||||
subRange: some NodeTagRange.new(low(NodeTag), halfTag)),
|
||||
AccountSlotsHeader(
|
||||
accKey: acc.accKey,
|
||||
storageRoot: acc.storageRoot,
|
||||
subRange: some NodeTagRange.new(halfTag1, high(NodeTag)))]
|
||||
|
||||
# Reset any partial result (which would be the last entry) to
|
||||
# requesting the full interval. So all the storage slots are
|
||||
# re-fetched completely for this account.
|
||||
env.fetchStorageFull.merge AccountSlotsHeader(
|
||||
accKey: stoRange.data.storages[inx].account.accKey,
|
||||
storageRoot: stoRange.data.storages[inx].account.storageRoot)
|
||||
elif w.error == RightBoundaryProofFailed and
|
||||
acc.subRange.isSome and 1 < acc.subRange.unsafeGet.len:
|
||||
# Some pathological case, needs further investigation. For the
|
||||
# moment, provide a partial fetches.
|
||||
let
|
||||
iv = acc.subRange.unsafeGet
|
||||
halfTag = iv.minPt + (iv.len div 2)
|
||||
halfTag1 = halfTag + 1.u256
|
||||
env.fetchStoragePart.merge [
|
||||
AccountSlotsHeader(
|
||||
accKey: acc.accKey,
|
||||
storageRoot: acc.storageRoot,
|
||||
subRange: some NodeTagRange.new(iv.minPt, halfTag)),
|
||||
AccountSlotsHeader(
|
||||
accKey: acc.accKey,
|
||||
storageRoot: acc.storageRoot,
|
||||
subRange: some NodeTagRange.new(halfTag1, iv.maxPt))]
|
||||
|
||||
# Last entry might be partial
|
||||
if inx == topStoRange:
|
||||
else:
|
||||
# Reset any partial result (which would be the last entry) to
|
||||
# requesting the full interval. So all the storage slots are
|
||||
# re-fetched completely for this account.
|
||||
env.fetchStorageFull.merge AccountSlotsHeader(
|
||||
accKey: acc.accKey,
|
||||
storageRoot: acc.storageRoot)
|
||||
|
||||
# Last entry might be partial (if any)
|
||||
#
|
||||
# Forget about partial result processing if the last partial entry
|
||||
# was reported because
|
||||
# * either there was an error processing it
|
||||
|
|
|
@ -0,0 +1,223 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2021 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
||||
# http://www.apache.org/licenses/LICENSE-2.0)
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
||||
# http://opensource.org/licenses/MIT)
|
||||
# at your option. This file may not be copied, modified, or distributed
|
||||
# except according to those terms.
|
||||
|
||||
import
|
||||
chronicles,
|
||||
chronos,
|
||||
eth/[common, p2p],
|
||||
stew/interval_set,
|
||||
".."/[constants, range_desc, worker_desc],
|
||||
./db/[hexary_desc, hexary_error, hexary_inspect, hexary_paths]
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
logScope:
|
||||
topics = "snap-subtrie"
|
||||
|
||||
const
|
||||
extraTraceMessages = false or true
|
||||
## Enabled additional logging noise
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private logging helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
template logTxt(info: static[string]): static[string] =
|
||||
"Sub-trie helper " & info
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc doInspect(
|
||||
getFn: HexaryGetFn; ## Abstract database access
|
||||
rootKey: NodeKey; ## Start of hexary trie
|
||||
partialPaths: seq[Blob]; ## Nodes with prob. dangling child links
|
||||
resumeCtx: TrieNodeStatCtxRef; ## Resume previous inspection
|
||||
): Result[TrieNodeStat,HexaryError]
|
||||
{.gcsafe, raises: [Defect,RlpError].} =
|
||||
## ..
|
||||
let stats = getFn.hexaryInspectTrie(
|
||||
rootKey, partialPaths, resumeCtx, healInspectionBatch)
|
||||
|
||||
if stats.stopped:
|
||||
return err(TrieLoopAlert)
|
||||
|
||||
ok(stats)
|
||||
|
||||
|
||||
proc getOverlapping(
|
||||
batch: SnapRangeBatchRef; ## Healing data support
|
||||
iv: NodeTagRange; ## Reference interval
|
||||
): Result[NodeTagRange,void] =
|
||||
## Find overlapping interval in `batch`
|
||||
block:
|
||||
let rc = batch.processed.ge iv.minPt
|
||||
if rc.isOk and rc.value.minPt <= iv.maxPt:
|
||||
return ok(rc.value)
|
||||
block:
|
||||
let rc = batch.processed.le iv.maxPt
|
||||
if rc.isOk and iv.minPt <= rc.value.maxPt:
|
||||
return ok(rc.value)
|
||||
err()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc subTriesFromPartialPaths*(
|
||||
getFn: HexaryGetFn; ## Abstract database access
|
||||
stateRoot: Hash256; ## Start of hexary trie
|
||||
batch: SnapRangeBatchRef; ## Healing data support
|
||||
sickSubTriesMaxLen = high(int); ## Max length of `sickSubTries`
|
||||
): Future[Result[void,HexaryError]]
|
||||
{.async.} =
|
||||
## Starting with a given set of potentially dangling account nodes
|
||||
## `checkNodes`, this set is filtered and processed. The outcome is
|
||||
## fed back to the vey same list `checkNodes`
|
||||
|
||||
# Process might be expensive, so only a single instance is allowed to run it
|
||||
if batch.lockTriePerusal:
|
||||
return err(TrieIsLockedForPerusal)
|
||||
batch.lockTriePerusal = true
|
||||
|
||||
let
|
||||
rootKey = stateRoot.to(NodeKey)
|
||||
var
|
||||
error: HexaryError
|
||||
count = 0 # for logging
|
||||
start = Moment.now() # for logging
|
||||
|
||||
block errorWhenOutside:
|
||||
try:
|
||||
while batch.sickSubTries.len < sickSubTriesMaxLen:
|
||||
# Inspect hexary trie for dangling nodes
|
||||
let rc = getFn.doInspect(rootKey, batch.checkNodes, batch.resumeCtx)
|
||||
if rc.isErr:
|
||||
error = rc.error
|
||||
break errorWhenOutside
|
||||
|
||||
count.inc
|
||||
|
||||
# Update context for async threading environment
|
||||
batch.resumeCtx = rc.value.resumeCtx
|
||||
batch.checkNodes.setLen(0)
|
||||
|
||||
# Collect result
|
||||
batch.sickSubTries = batch.sickSubTries & rc.value.dangling
|
||||
|
||||
# Done unless there is some resumption context
|
||||
if rc.value.resumeCtx.isNil:
|
||||
break
|
||||
|
||||
when extraTraceMessages:
|
||||
trace logTxt "inspection wait", count,
|
||||
elapsed=(Moment.now()-start),
|
||||
sleep=healInspectionBatchWaitNanoSecs,
|
||||
sickSubTriesLen=batch.sickSubTries.len, sickSubTriesMaxLen,
|
||||
resumeCtxLen = batch.resumeCtx.hddCtx.len
|
||||
|
||||
# Allow async task switch and continue. Note that some other task might
|
||||
# steal some of the `sickSubTries` var argument.
|
||||
await sleepAsync healInspectionBatchWaitNanoSecs.nanoseconds
|
||||
|
||||
batch.lockTriePerusal = false
|
||||
return ok()
|
||||
|
||||
except RlpError:
|
||||
error = RlpEncoding
|
||||
|
||||
batch.sickSubTries = batch.sickSubTries & batch.resumeCtx.to(seq[NodeSpecs])
|
||||
batch.resumeCtx = nil
|
||||
|
||||
batch.lockTriePerusal = false
|
||||
return err(error)
|
||||
|
||||
|
||||
proc subTriesNodesReclassify*(
|
||||
getFn: HexaryGetFn; ## Abstract database access
|
||||
rootKey: NodeKey; ## Start node into hexary trie
|
||||
batch: SnapRangeBatchRef; ## Healing data support
|
||||
) {.gcsafe, raises: [Defect,KeyError].} =
|
||||
## Check whether previously missing nodes from the `sickSubTries` list have
|
||||
## been magically added to the database since it was checked last time. These
|
||||
## nodes will me moved to `checkNodes` for further processing. Also, some
|
||||
## full sub-tries might have been added which can be checked against
|
||||
## the `processed` range set.
|
||||
|
||||
# Move `sickSubTries` entries that have now an exisiting node to the
|
||||
# list of partial paths to be re-checked.
|
||||
block:
|
||||
var delayed: seq[NodeSpecs]
|
||||
for w in batch.sickSubTries:
|
||||
if 0 < getFn(w.nodeKey.ByteArray32).len:
|
||||
batch.checkNodes.add w.partialPath
|
||||
else:
|
||||
delayed.add w
|
||||
batch.sickSubTries = delayed
|
||||
|
||||
# Remove `checkNodes` entries with complete known sub-tries.
|
||||
var
|
||||
doneWith: seq[Blob] # loop will not recurse on that list
|
||||
count = 0 # for logging only
|
||||
|
||||
# `While` loop will terminate with processed paths in `doneWith`.
|
||||
block:
|
||||
var delayed: seq[Blob]
|
||||
while 0 < batch.checkNodes.len:
|
||||
|
||||
when extraTraceMessages:
|
||||
trace logTxt "reclassify", count,
|
||||
nCheckNodes=batch.checkNodes.len
|
||||
|
||||
for w in batch.checkNodes:
|
||||
let
|
||||
iv = w.pathEnvelope
|
||||
nCov = batch.processed.covered iv
|
||||
|
||||
if iv.len <= nCov:
|
||||
# Fully processed envelope, no need to keep `w` any longer
|
||||
when extraTraceMessages:
|
||||
trace logTxt "reclassify discard", count, partialPath=w,
|
||||
nDelayed=delayed.len
|
||||
continue
|
||||
|
||||
if 0 < nCov:
|
||||
# Partially processed range, fetch an overlapping interval and
|
||||
# remove that from the envelope of `w`.
|
||||
try:
|
||||
let paths = w.dismantle(
|
||||
rootKey, batch.getOverlapping(iv).value, getFn)
|
||||
delayed &= paths
|
||||
when extraTraceMessages:
|
||||
trace logTxt "reclassify dismantled", count, partialPath=w,
|
||||
nPaths=paths.len, nDelayed=delayed.len
|
||||
continue
|
||||
except RlpError:
|
||||
discard
|
||||
|
||||
# Not processed at all. So keep `w` but there is no need to look
|
||||
# at it again in the next lap.
|
||||
doneWith.add w
|
||||
|
||||
# Prepare for next lap
|
||||
batch.checkNodes.swap delayed
|
||||
delayed.setLen(0)
|
||||
|
||||
batch.checkNodes = doneWith.pathSortUniq
|
||||
|
||||
when extraTraceMessages:
|
||||
trace logTxt "reclassify finalise", count,
|
||||
nDoneWith=doneWith.len, nCheckNodes=batch.checkNodes.len
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
|
@ -54,14 +54,7 @@ type
|
|||
checkNodes*: seq[Blob] ## Nodes with prob. dangling child links
|
||||
sickSubTries*: seq[NodeSpecs] ## Top ref for sub-tries to be healed
|
||||
resumeCtx*: TrieNodeStatCtxRef ## State for resuming trie inpection
|
||||
|
||||
SnapHealingState* = enum
|
||||
## State of healing process. The `HealerRunning` state indicates that
|
||||
## dangling and/or missing nodes have been temprarily removed from the
|
||||
## batch queue while processing.
|
||||
HealerIdle
|
||||
HealerRunning
|
||||
HealerDone
|
||||
lockTriePerusal*: bool ## Only one process at a time
|
||||
|
||||
SnapPivotRef* = ref object
|
||||
## Per-state root cache for particular snap data environment
|
||||
|
@ -69,7 +62,6 @@ type
|
|||
|
||||
# Accounts download
|
||||
fetchAccounts*: SnapRangeBatchRef ## Set of accounts ranges to fetch
|
||||
accountsState*: SnapHealingState ## All accounts have been processed
|
||||
healThresh*: float ## Start healing when fill factor reached
|
||||
|
||||
# Storage slots download
|
||||
|
@ -107,7 +99,6 @@ type
|
|||
pivotTable*: SnapPivotTable ## Per state root environment
|
||||
pivotFinderCtx*: RootRef ## Opaque object reference for sub-module
|
||||
coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts
|
||||
accountsHealing*: bool ## Activates accounts DB healing
|
||||
recovery*: SnapRecoveryRef ## Current recovery checkpoint/context
|
||||
noRecovery*: bool ## Ignore recovery checkpoints
|
||||
|
||||
|
|
|
@ -65,8 +65,8 @@ else:
|
|||
|
||||
let
|
||||
# Forces `check()` to print the error (as opposed when using `isOk()`)
|
||||
OkHexDb = Result[void,HexaryDbError].ok()
|
||||
OkStoDb = Result[void,seq[(int,HexaryDbError)]].ok()
|
||||
OkHexDb = Result[void,HexaryError].ok()
|
||||
OkStoDb = Result[void,seq[(int,HexaryError)]].ok()
|
||||
|
||||
# There was a problem with the Github/CI which results in spurious crashes
|
||||
# when leaving the `runner()` if the persistent BaseChainDB initialisation
|
||||
|
@ -88,7 +88,7 @@ var
|
|||
proc isOk(rc: ValidationResult): bool =
|
||||
rc == ValidationResult.OK
|
||||
|
||||
proc isImportOk(rc: Result[SnapAccountsGaps,HexaryDbError]): bool =
|
||||
proc isImportOk(rc: Result[SnapAccountsGaps,HexaryError]): bool =
|
||||
if rc.isErr:
|
||||
check rc.error == NothingSerious # prints an error if different
|
||||
elif 0 < rc.value.innerGaps.len:
|
||||
|
@ -96,7 +96,7 @@ proc isImportOk(rc: Result[SnapAccountsGaps,HexaryDbError]): bool =
|
|||
else:
|
||||
return true
|
||||
|
||||
proc toStoDbRc(r: seq[HexaryNodeReport]): Result[void,seq[(int,HexaryDbError)]]=
|
||||
proc toStoDbRc(r: seq[HexaryNodeReport]): Result[void,seq[(int,HexaryError)]]=
|
||||
## Kludge: map error report to (older version) return code
|
||||
if r.len != 0:
|
||||
return err(r.mapIt((it.slot.get(otherwise = -1),it.error)))
|
||||
|
@ -125,13 +125,13 @@ proc pp(d: Duration): string =
|
|||
else:
|
||||
d.ppUs
|
||||
|
||||
proc pp(rc: Result[Account,HexaryDbError]): string =
|
||||
proc pp(rc: Result[Account,HexaryError]): string =
|
||||
if rc.isErr: $rc.error else: rc.value.pp
|
||||
|
||||
proc pp(rc: Result[Hash256,HexaryDbError]): string =
|
||||
proc pp(rc: Result[Hash256,HexaryError]): string =
|
||||
if rc.isErr: $rc.error else: $rc.value.to(NodeTag)
|
||||
|
||||
proc pp(rc: Result[TrieNodeStat,HexaryDbError]; db: SnapDbBaseRef): string =
|
||||
proc pp(rc: Result[TrieNodeStat,HexaryError]; db: SnapDbBaseRef): string =
|
||||
if rc.isErr: $rc.error else: rc.value.pp(db.hexaDb)
|
||||
|
||||
proc pp(a: NodeKey; collapse = true): string =
|
||||
|
@ -409,6 +409,56 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
|
|||
# Beware: dumping a large database is not recommended
|
||||
#true.say "***", "database dump\n ", desc.dumpHexaDB()
|
||||
|
||||
test "Dismantle path prefix envelopes":
|
||||
doAssert 1 < accKeys.len
|
||||
let
|
||||
iv = NodeTagRange.new(accBaseTag, accKeys[^2].to(NodeTag))
|
||||
ivMin = iv.minPt.to(NodeKey).ByteArray32.toSeq.initNibbleRange
|
||||
ivMax = iv.maxPt.to(NodeKey).ByteArray32.toSeq.initNibbleRange
|
||||
pfxLen = ivMin.sharedPrefixLen ivMax
|
||||
# Use some overlapping prefixes. Note that a prefix must refer to
|
||||
# an existing node
|
||||
for n in 0 .. pfxLen:
|
||||
let
|
||||
pfx = ivMin.slice(0, pfxLen - n).hexPrefixEncode
|
||||
qfx = pfx.dismantle(root.to(NodeKey), iv, desc.hexaDB)
|
||||
|
||||
# Re-assemble intervals
|
||||
let covered = NodeTagRangeSet.init()
|
||||
for w in qfx:
|
||||
let iv = pathEnvelope w
|
||||
check iv.len == covered.merge iv
|
||||
|
||||
if covered.chunks == 1 and iv.minPt == low(NodeTag):
|
||||
# Order: `iv` <= `covered`
|
||||
check iv.maxPt <= covered.ge.value.minPt
|
||||
elif covered.chunks == 1 and iv.maxPt == high(NodeTag):
|
||||
# Order: `covered` <= `iv`
|
||||
check covered.ge.value.maxPt <= iv.minPt
|
||||
else:
|
||||
# Covered contains two ranges were the gap is big enough for `iv`
|
||||
check covered.chunks == 2
|
||||
# Order: `covered.ge` <= `iv` <= `covered.le`
|
||||
check covered.ge.value.maxPt <= iv.minPt
|
||||
check iv.maxPt <= covered.le.value.minPt
|
||||
|
||||
# Must hold
|
||||
check covered.le.value.minPt <= accKeys[^1].to(Nodetag)
|
||||
|
||||
when false: # or true:
|
||||
let
|
||||
cmaNlSp0 = ",\n" & repeat(" ",12)
|
||||
cmaNlSpc = ",\n" & repeat(" ",13)
|
||||
echo ">>> n=", n, " pfxMax=", pfxLen,
|
||||
"\n pfx=", pfx,
|
||||
"\n ivMin=", ivMin,
|
||||
"\n iv1st=", accKeys[0],
|
||||
"\n ivMax=", ivMax,
|
||||
"\n ivPast=", accKeys[^1],
|
||||
"\n covered=@[", toSeq(covered.increasing)
|
||||
.mapIt(&"[{it.minPt}{cmaNlSpc}{it.maxPt}]")
|
||||
.join(cmaNlSp0), "]",
|
||||
"\n => @[", qfx.mapIt(it.toHex).join(cmaNlSpc), "]"
|
||||
|
||||
test &"Storing/retrieving {accKeys.len} items " &
|
||||
"on persistent state root registry":
|
||||
|
@ -464,7 +514,7 @@ proc storagesRunner(
|
|||
noisy = true;
|
||||
persistent = true;
|
||||
sample = storSample;
|
||||
knownFailures: seq[(string,seq[(int,HexaryDbError)])] = @[]) =
|
||||
knownFailures: seq[(string,seq[(int,HexaryError)])] = @[]) =
|
||||
let
|
||||
peer = Peer.new
|
||||
accountsList = sample.to(seq[UndumpAccounts])
|
||||
|
@ -502,7 +552,7 @@ proc storagesRunner(
|
|||
let
|
||||
testId = fileInfo & "#" & $n
|
||||
expRc = if ignore.hasKey(testId):
|
||||
Result[void,seq[(int,HexaryDbError)]].err(ignore[testId])
|
||||
Result[void,seq[(int,HexaryError)]].err(ignore[testId])
|
||||
else:
|
||||
OkStoDb
|
||||
check dbDesc.importStorageSlots(w.data, persistent).toStoDbRc == expRc
|
||||
|
@ -521,7 +571,7 @@ proc storagesRunner(
|
|||
dbDesc = SnapDbStorageSlotsRef.init(dbBase, accKey, root, peer)
|
||||
rc = dbDesc.inspectStorageSlotsTrie(persistent=persistent)
|
||||
if m == errInx:
|
||||
check rc == Result[TrieNodeStat,HexaryDbError].err(TrieIsEmpty)
|
||||
check rc == Result[TrieNodeStat,HexaryError].err(TrieIsEmpty)
|
||||
else:
|
||||
check rc.isOk # ok => level > 0 and not stopped
|
||||
|
||||
|
@ -574,25 +624,24 @@ proc inspectionRunner(
|
|||
desc = SnapDbAccountsRef.init(memBase, root, peer)
|
||||
for w in accList:
|
||||
check desc.importAccounts(w.base, w.data, persistent=false).isImportOk
|
||||
let rc = desc.inspectAccountsTrie(persistent=false)
|
||||
check rc.isOk
|
||||
let stats = desc.hexaDb.hexaryInspectTrie(rootKey)
|
||||
check not stats.stopped
|
||||
let
|
||||
dangling = rc.value.dangling.mapIt(it.partialPath)
|
||||
dangling = stats.dangling.mapIt(it.partialPath)
|
||||
keys = desc.hexaDb.hexaryInspectToKeys(rootKey, dangling)
|
||||
check dangling.len == keys.len
|
||||
singleStats.add (desc.hexaDb.tab.len,rc.value)
|
||||
singleStats.add (desc.hexaDb.tab.len,stats)
|
||||
|
||||
# Verify piecemeal approach for `inspectAccountsTrie()` ...
|
||||
# Verify piecemeal approach for `hexaryInspectTrie()` ...
|
||||
var
|
||||
ctx = TrieNodeStatCtxRef()
|
||||
piecemeal: HashSet[Blob]
|
||||
while not ctx.isNil:
|
||||
let rx = desc.inspectAccountsTrie(
|
||||
resumeCtx=ctx, suspendAfter=128, persistent=false)
|
||||
check rx.isOk
|
||||
let stats = rx.get(otherwise = TrieNodeStat())
|
||||
ctx = stats.resumeCtx
|
||||
piecemeal.incl stats.dangling.mapIt(it.partialPath).toHashSet
|
||||
let stat2 = desc.hexaDb.hexaryInspectTrie(
|
||||
rootKey, resumeCtx=ctx, suspendAfter=128)
|
||||
check not stat2.stopped
|
||||
ctx = stat2.resumeCtx
|
||||
piecemeal.incl stat2.dangling.mapIt(it.partialPath).toHashSet
|
||||
# Must match earlier all-in-one result
|
||||
check dangling.len == piecemeal.len
|
||||
check dangling.toHashSet == piecemeal
|
||||
|
@ -614,27 +663,26 @@ proc inspectionRunner(
|
|||
|
||||
for w in accList:
|
||||
check desc.importAccounts(w.base,w.data, persistent=true).isImportOk
|
||||
let rc = desc.inspectAccountsTrie(persistent=true)
|
||||
check rc.isOk
|
||||
let stats = desc.getAccountFn.hexaryInspectTrie(rootKey)
|
||||
check not stats.stopped
|
||||
let
|
||||
dangling = rc.value.dangling.mapIt(it.partialPath)
|
||||
dangling = stats.dangling.mapIt(it.partialPath)
|
||||
keys = desc.hexaDb.hexaryInspectToKeys(rootKey, dangling)
|
||||
check dangling.len == keys.len
|
||||
# Must be the same as the in-memory fingerprint
|
||||
let ssn1 = singleStats[n][1].dangling.mapIt(it.partialPath)
|
||||
check ssn1.toHashSet == dangling.toHashSet
|
||||
|
||||
# Verify piecemeal approach for `inspectAccountsTrie()` ...
|
||||
# Verify piecemeal approach for `hexaryInspectTrie()` ...
|
||||
var
|
||||
ctx = TrieNodeStatCtxRef()
|
||||
piecemeal: HashSet[Blob]
|
||||
while not ctx.isNil:
|
||||
let rx = desc.inspectAccountsTrie(
|
||||
resumeCtx=ctx, suspendAfter=128, persistent=persistent)
|
||||
check rx.isOk
|
||||
let stats = rx.get(otherwise = TrieNodeStat())
|
||||
ctx = stats.resumeCtx
|
||||
piecemeal.incl stats.dangling.mapIt(it.partialPath).toHashSet
|
||||
let stat2 = desc.getAccountFn.hexaryInspectTrie(
|
||||
rootKey, resumeCtx=ctx, suspendAfter=128)
|
||||
check not stat2.stopped
|
||||
ctx = stat2.resumeCtx
|
||||
piecemeal.incl stat2.dangling.mapIt(it.partialPath).toHashSet
|
||||
# Must match earlier all-in-one result
|
||||
check dangling.len == piecemeal.len
|
||||
check dangling.toHashSet == piecemeal
|
||||
|
@ -649,14 +697,14 @@ proc inspectionRunner(
|
|||
desc = memDesc.dup(root,Peer())
|
||||
for w in accList:
|
||||
check desc.importAccounts(w.base, w.data, persistent=false).isImportOk
|
||||
let rc = desc.inspectAccountsTrie(persistent=false)
|
||||
check rc.isOk
|
||||
let stats = desc.hexaDb.hexaryInspectTrie(rootKey)
|
||||
check not stats.stopped
|
||||
let
|
||||
dangling = rc.value.dangling.mapIt(it.partialPath)
|
||||
dangling = stats.dangling.mapIt(it.partialPath)
|
||||
keys = desc.hexaDb.hexaryInspectToKeys(
|
||||
rootKey, dangling.toHashSet.toSeq)
|
||||
check dangling.len == keys.len
|
||||
accuStats.add (desc.hexaDb.tab.len,rc.value)
|
||||
accuStats.add (desc.hexaDb.tab.len, stats)
|
||||
|
||||
test &"Fingerprinting {inspectList.len} accumulated accounts lists " &
|
||||
"for persistent db":
|
||||
|
@ -672,14 +720,14 @@ proc inspectionRunner(
|
|||
desc = perDesc.dup(root,Peer())
|
||||
for w in accList:
|
||||
check desc.importAccounts(w.base, w.data, persistent).isImportOk
|
||||
let rc = desc.inspectAccountsTrie(persistent=false)
|
||||
check rc.isOk
|
||||
let stats = desc.getAccountFn.hexaryInspectTrie(rootKey)
|
||||
check not stats.stopped
|
||||
let
|
||||
dangling = rc.value.dangling.mapIt(it.partialPath)
|
||||
dangling = stats.dangling.mapIt(it.partialPath)
|
||||
keys = desc.hexaDb.hexaryInspectToKeys(
|
||||
rootKey, dangling.toHashSet.toSeq)
|
||||
check dangling.len == keys.len
|
||||
check accuStats[n][1] == rc.value
|
||||
check accuStats[n][1] == stats
|
||||
|
||||
test &"Cascaded fingerprinting {inspectList.len} accumulated accounts " &
|
||||
"lists for in-memory-db":
|
||||
|
@ -702,12 +750,13 @@ proc inspectionRunner(
|
|||
if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)):
|
||||
cscStep[rootKey][0].inc
|
||||
let
|
||||
r0 = desc.inspectAccountsTrie(persistent=false)
|
||||
rc = desc.inspectAccountsTrie(cscStep[rootKey][1],persistent=false)
|
||||
check rc.isOk
|
||||
stat0 = desc.hexaDb.hexaryInspectTrie(rootKey)
|
||||
stats = desc.hexaDb.hexaryInspectTrie(rootKey,cscStep[rootKey][1])
|
||||
check not stat0.stopped
|
||||
check not stats.stopped
|
||||
let
|
||||
accumulated = r0.value.dangling.mapIt(it.partialPath).toHashSet
|
||||
cascaded = rc.value.dangling.mapIt(it.partialPath).toHashSet
|
||||
accumulated = stat0.dangling.mapIt(it.partialPath).toHashSet
|
||||
cascaded = stats.dangling.mapIt(it.partialPath).toHashSet
|
||||
check accumulated == cascaded
|
||||
# Make sure that there are no trivial cases
|
||||
let trivialCases = toSeq(cscStep.values).filterIt(it[0] <= 1).len
|
||||
|
@ -734,12 +783,14 @@ proc inspectionRunner(
|
|||
if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)):
|
||||
cscStep[rootKey][0].inc
|
||||
let
|
||||
r0 = desc.inspectAccountsTrie(persistent=true)
|
||||
rc = desc.inspectAccountsTrie(cscStep[rootKey][1],persistent=true)
|
||||
check rc.isOk
|
||||
stat0 = desc.getAccountFn.hexaryInspectTrie(rootKey)
|
||||
stats = desc.getAccountFn.hexaryInspectTrie(rootKey,
|
||||
cscStep[rootKey][1])
|
||||
check not stat0.stopped
|
||||
check not stats.stopped
|
||||
let
|
||||
accumulated = r0.value.dangling.mapIt(it.partialPath).toHashSet
|
||||
cascaded = rc.value.dangling.mapIt(it.partialPath).toHashSet
|
||||
accumulated = stat0.dangling.mapIt(it.partialPath).toHashSet
|
||||
cascaded = stats.dangling.mapIt(it.partialPath).toHashSet
|
||||
check accumulated == cascaded
|
||||
# Make sure that there are no trivial cases
|
||||
let trivialCases = toSeq(cscStep.values).filterIt(it[0] <= 1).len
|
||||
|
|
Loading…
Reference in New Issue