Prep for full sync after snap make 6 (#1291)

* Update log ticker, using time interval rather than ticker count

why:
  Counting logged ticker occurrences is inherently imprecise, so time
  intervals are used instead.
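
  A rough sketch of the time-gated logging (the `DemoTicker` type and its
  fields are illustrative only, not the actual ticker implementation):

    import chronos

    type DemoTicker = ref object
      lastLogged: Moment      # when the stats were logged last
      logInterval: Duration   # minimum time between two log lines

    proc runTick(t: DemoTicker) =
      # log only if at least `logInterval` has elapsed, no matter how many
      # ticks happened in between
      let ts = Moment.now()
      if t.logInterval <= ts - t.lastLogged:
        t.lastLogged = ts
        echo "snap sync stats"   # stands in for the real `info`/`trace` call

    when isMainModule:
      let t = DemoTicker(logInterval: chronos.seconds(5))
      t.runTick()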

* Use separate storage tables for snap sync data
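
  A minimal, self-contained sketch of the table separation idea; the real
  code uses the `DbKey` object and the new `snapSyncAccount` /
  `snapSyncStorageSlot` key kinds shown in the first hunk below, while the
  names here are made up for illustration:

    type DemoKind = enum
      demoAccount, demoStorageSlot

    proc demoKey(kind: DemoKind; h: array[32, byte]): array[33, byte] =
      result[0] = byte ord(kind)   # one-byte table selector
      result[1 .. 32] = h          # 32-byte hash payload

    when isMainModule:
      var h: array[32, byte]
      # same hash, different prefix => records land in separate tables
      doAssert demoKey(demoAccount, h) != demoKey(demoStorageSlot, h)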

* Left boundary proof update

why:
  It was not properly implemented yet.

* Capture pivot in peer worker (aka buddy) tasks

why:
  The pivot environment is linked to the `buddy` descriptor. Across a
  task switch, the pivot may change, so it is passed on as the function
  argument `env` rather than being retrieved from the buddy at the start
  of a sub-function.
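
  A condensed illustration of the pattern (the types are trimmed down and
  not the real `SnapBuddyRef`/`SnapPivotRef` declarations):

    import chronos

    type
      DemoPivot = ref object
        blockNumber: uint64
      DemoBuddy = ref object
        pivotEnv: DemoPivot     # may be replaced while a task is suspended

    proc subTask(buddy: DemoBuddy; env: DemoPivot) {.async.} =
      # use the captured `env` only; `buddy.pivotEnv` may already point to
      # a newer pivot once the `await` below resumes
      await sleepAsync(10.milliseconds)
      echo "still working on pivot #", env.blockNumber

    proc runTask(buddy: DemoBuddy) {.async.} =
      let env = buddy.pivotEnv  # capture the pivot exactly once, up front
      await buddy.subTask(env)

    when isMainModule:
      waitFor DemoBuddy(pivotEnv: DemoPivot(blockNumber: 1)).runTask()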

* Split queues `fetchStorage` into `fetchStorageFull` and `fetchStoragePart`

* Remove obsolete account range returned from `GetAccountRange` message

why:
  The handler returned the wrong right end of the range. The range was
  provided for convenience only.

* Prioritise storage slots if the queue becomes large

why:
  Currently, account processing is prioritised until all accounts are
  downloaded. The new prioritisation has two thresholds (see the sketch
  below):
  + start processing storage slots with a new worker
  + stop account processing and switch to storage processing

also:
  Provide an API for the `SnapTodoRanges` pair of range sets in `worker_desc.nim`
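
  A sketch of the intended scheduling decision; the two constants are the
  ones added in this patch while the helper `processStorageFirst` and the
  plain `nStoQu` counter are made up for illustration:

    const
      snapNewBuddyStoragesSlotsQuPrioThresh = 5_000
      snapAccountsBuddyStoragesSlotsQuPrioThresh = 30_000

    proc processStorageFirst(nStoQu: int; newWorker: bool): bool =
      ## `nStoQu` stands for `fetchStorageFull.len + fetchStoragePart.len`
      let threshold =
        if newWorker: snapNewBuddyStoragesSlotsQuPrioThresh
        else: snapAccountsBuddyStoragesSlotsQuPrioThresh
      threshold < nStoQu

    when isMainModule:
      doAssert processStorageFirst(6_000, newWorker = true)
      doAssert not processStorageFirst(6_000, newWorker = false)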

* Generalise left boundary proof for accounts or storage slots.

why:
  A detailed explanation of how this works is documented with
  `snapdb_accounts.importAccounts()`.

  Instead of enforcing a left boundary proof (which is still the default),
  the importer functions return a list of `holes` (aka node paths) found in
  the argument ranges of leaf nodes. This in turn is used by the
  bookkeeping software for data download.
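
  A self-contained sketch of that boundary rule (paths are modelled as
  plain integers and the `Hole` type is invented for the illustration):
  when the left boundary proof is skipped, every returned hole must reach
  at least up to the first imported leaf path.

    type Hole = object
      minPt, maxPt: int       # path interval covered by a dangling node

    proc leftBoundaryOk(firstLeafPath: int; holes: seq[Hole]): bool =
      for w in holes:
        if w.maxPt < firstLeafPath:
          return false        # a hole strictly left of the first leaf
      true

    when isMainModule:
      doAssert leftBoundaryOk(10, @[Hole(minPt: 10, maxPt: 20)])
      doAssert not leftBoundaryOk(10, @[Hole(minPt: 0, maxPt: 5)])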

* Forgot to pass on variable in function wrapper

also:
  + Start healing only once 99% of accounts are covered (previously 95%)
  + Logging updated/prettified
Jordan Hrycaj 2022-11-08 18:56:04 +00:00 committed by GitHub
parent d75afd9f8a
commit e14fd4b96c
24 changed files with 1106 additions and 578 deletions

View File

@ -19,6 +19,8 @@ type
skeletonBlockHashToNumber
skeletonBlock
skeletonTransaction
snapSyncAccount
snapSyncStorageSlot
DbKey* = object
# The first byte stores the key type. The rest are key-specific values
@ -104,6 +106,18 @@ proc skeletonTransactionKey*(u: BlockNumber): DbKey {.inline.} =
copyMem(addr result.data[1], unsafeAddr u, sizeof(u))
result.dataEndPos = uint8 sizeof(u)
proc snapSyncAccountKey*(h: openArray[byte]): DbKey {.inline.} =
doAssert(h.len == 32)
result.data[0] = byte ord(snapSyncAccount)
result.data[1 .. 32] = h
result.dataEndPos = uint8 32
proc snapSyncStorageSlotKey*(h: openArray[byte]): DbKey {.inline.} =
doAssert(h.len == 32)
result.data[0] = byte ord(snapSyncStorageSlot)
result.data[1 .. 32] = h
result.dataEndPos = uint8 32
template toOpenArray*(k: DbKey): openArray[byte] =
k.data.toOpenArray(0, int(k.dataEndPos))

View File

@ -51,9 +51,18 @@ const
## Keep on going in the healing task until this many nodes have been
## fetched from the network or some error condition terminates the task.
snapNewBuddyStoragesSlotsQuPrioThresh* = 5_000
## For a new worker, prioritise processing the storage slots queue over
## processing accounts if the queue has more than this many items.
snapAccountsBuddyStoragesSlotsQuPrioThresh* = 30_000
## For a running worker processing accounts, stop processing accounts
## and switch to processing the storage slots queue if the queue has
## more than this many items.
# --------------
healAccountsTrigger* = 0.95
healAccountsTrigger* = 0.99
## Apply accounts healing if the global snap download coverage factor
## exceeds this setting. The global coverage factor is derived by merging
## all account ranges retrieved for all pivot state roots (see
@ -76,7 +85,7 @@ const
# --------------
comErrorsTimeoutMax* = 4
comErrorsTimeoutMax* = 3
## Maximal number of non-responses accepted in a row. If there are more than
## `comErrorsTimeoutMax` consecutive errors, the worker will be degraded
## as zombie.

View File

@ -9,8 +9,8 @@
# distributed except according to those terms.
import
std/[math, sequtils, hashes],
eth/common/eth_types_rlp,
std/[math, sequtils, strutils, hashes],
eth/[common, trie/nibbles],
stew/[byteutils, interval_set],
stint,
../../constants,
@ -72,15 +72,37 @@ type
AccountStorageRange* = object
## List of storage descriptors, the last `AccountSlots` storage data might
## be incomplete and tthe `proof` is needed for proving validity.
## be incomplete and the `proof` is needed for proving validity.
storages*: seq[AccountSlots] ## List of accounts and storage data
proof*: SnapStorageProof ## Boundary proofs for last entry
base*: NodeTag ## Lower limit for last entry w/proof
AccountSlots* = object
## Account storage descriptor
account*: AccountSlotsHeader
data*: seq[SnapStorage]
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc padPartialPath(partialPath: NibblesSeq; dblNibble: byte): NodeKey =
## Extend (or cut) `partialPath` nibbles sequence and generate `NodeKey`
# Pad with zeroes
var padded: NibblesSeq
let padLen = 64 - partialPath.len
if 0 <= padLen:
padded = partialPath & dblNibble.repeat(padlen div 2).initNibbleRange
if (padLen and 1) == 1:
padded = padded & @[dblNibble].initNibbleRange.slice(1)
else:
let nope = seq[byte].default.initNibbleRange
padded = partialPath.slice(0,63) & nope # nope forces re-alignment
let bytes = padded.getBytes
(addr result.ByteArray32[0]).copyMem(unsafeAddr bytes[0], bytes.len)
# ------------------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------------------
@ -117,6 +139,11 @@ proc to*(n: SomeUnsignedInt|UInt256; T: type NodeTag): T =
## Syntactic sugar
n.u256.T
proc min*(partialPath: Blob; T: type NodeKey): T =
(hexPrefixDecode partialPath)[1].padPartialPath(0)
proc max*(partialPath: Blob; T: type NodeKey): T =
(hexPrefixDecode partialPath)[1].padPartialPath(0xff)
proc digestTo*(data: Blob; T: type NodeKey): T =
keccakHash(data).data.T
@ -273,7 +300,7 @@ proc fullFactor*(lrs: openArray[NodeTagRangeSet]): float =
proc `$`*(nodeTag: NodeTag): string =
if nodeTag == high(NodeTag):
"high(NodeTag)"
"2^256-1"
elif nodeTag == 0.u256.NodeTag:
"0"
else:
@ -317,6 +344,61 @@ proc `$`*(n: NodeSpecs): string =
result &= "ditto"
result &= ")"
proc dump*(
ranges: openArray[NodeTagRangeSet];
moan: proc(overlap: UInt256; iv: NodeTagRange) {.gcsafe.};
printRangesMax = high(int);
): string =
## Dump/analyse range sets
var
cache: NodeTagRangeSet
ivTotal = 0.u256
ivCarry = false
if ranges.len == 1:
cache = ranges[0]
ivTotal = cache.total
if ivTotal == 0.u256 and 0 < cache.chunks:
ivCarry = true
else:
cache = NodeTagRangeSet.init()
for ivSet in ranges:
if ivSet.total == 0.u256 and 0 < ivSet.chunks:
ivCarry = true
elif ivTotal <= high(UInt256) - ivSet.total:
ivTotal += ivSet.total
else:
ivCarry = true
for iv in ivSet.increasing():
let n = cache.merge(iv)
if n != iv.len and not moan.isNil:
moan(iv.len - n, iv)
if 0 == cache.total and 0 < cache.chunks:
result = "2^256"
if not ivCarry:
result &= ":" & $ivTotal
else:
result = $cache.total
if ivCarry:
result &= ":2^256"
elif ivTotal != cache.total:
result &= ":" & $ivTotal
result &= ":"
if cache.chunks <= printRangesMax:
result &= toSeq(cache.increasing).mapIt($it).join(",")
else:
result &= toSeq(cache.increasing).mapIt($it)[0 ..< printRangesMax].join(",")
result &= " " & $(cache.chunks - printRangesMax) & " more .."
proc dump*(
range: NodeTagRangeSet;
printRangesMax = high(int);
): string =
## Ditto
[range].dump(nil, printRangesMax)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -77,35 +77,38 @@ proc `pivot=`(buddy: SnapBuddyRef; val: BestPivotWorkerRef) =
proc init(batch: var SnapTrieRangeBatch; ctx: SnapCtxRef) =
## Returns a pair of account hash range lists with the full range of hashes
## smartly spread across the mutually disjunct interval sets.
for n in 0 ..< batch.unprocessed.len:
batch.unprocessed[n] = NodeTagRangeSet.init()
batch.unprocessed.init()
# Initialise accounts range fetch batch, the pair of `fetchAccounts[]`
# range sets.
if ctx.data.coveredAccounts.total == 0 and
ctx.data.coveredAccounts.chunks == 1:
# All (i.e. 100%) of accounts hashes are covered by completed range fetch
# processes for all pivot environments. Do a random split distributing the
# full accounts hash range across the pair of range sats.
var nodeKey: NodeKey
ctx.data.rng[].generate(nodeKey.ByteArray32)
let partition = nodeKey.to(NodeTag)
discard batch.unprocessed[0].merge(partition, high(NodeTag))
if low(NodeTag) < partition:
discard batch.unprocessed[1].merge(low(NodeTag), partition - 1.u256)
if ctx.data.coveredAccounts.isFull:
# All of accounts hashes are covered by completed range fetch processes
# for all pivot environments. Do a random split distributing the full
# accounts hash range across the pair of range sets.
for _ in 0 .. 5:
var nodeKey: NodeKey
ctx.data.rng[].generate(nodeKey.ByteArray32)
let top = nodeKey.to(NodeTag)
if low(NodeTag) < top and top < high(NodeTag):
# Move covered account ranges (aka intervals) to the second set.
batch.unprocessed.merge NodeTagRange.new(low(NodeTag), top)
break
# Otherwise there is a full single range in `unprocessed[0]`
else:
# Not all account hashes are covered, yet. So keep the uncovered
# account hashes in the first range set, and the other account hashes
# in the second range set.
# Pre-filled with the first range set with largest possible interval
discard batch.unprocessed[0].merge(low(NodeTag),high(NodeTag))
# Move covered account ranges (aka intervals) to the second set.
for iv in ctx.data.coveredAccounts.increasing:
discard batch.unprocessed[0].reduce(iv)
discard batch.unprocessed[1].merge(iv)
# Move covered account ranges (aka intervals) to the second set.
batch.unprocessed.merge(iv)
if batch.unprocessed[0].isEmpty:
doAssert batch.unprocessed[1].isFull
elif batch.unprocessed[1].isEmpty:
doAssert batch.unprocessed[0].isFull
else:
doAssert((batch.unprocessed[0].total - 1) +
batch.unprocessed[1].total == high(UInt256))
proc appendPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) =
@ -198,7 +201,8 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
pivotBlock = if env.isNil: none(BlockNumber)
else: some(env.stateHeader.blockNumber)
stoQuLen = if env.isNil: none(uint64)
else: some(env.fetchStorage.len.uint64)
else: some(env.fetchStorageFull.len.uint64 +
env.fetchStoragePart.len.uint64)
accCoverage = ctx.data.coveredAccounts.fullFactor
accFill = meanStdDev(uSum, uSqSum, count)
@ -329,7 +333,8 @@ proc runPool*(buddy: SnapBuddyRef, last: bool) =
env.accountsState = HealerDone
# Check whether storage slots are complete
if env.fetchStorage.len == 0:
if env.fetchStorageFull.len == 0 and
env.fetchStoragePart.len == 0:
env.storageDone = true
if extraTraceMessages:
@ -389,12 +394,19 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
block:
let rc = ctx.data.pivotTable.beforeLastValue
if rc.isOk:
let nFetchStorage = rc.value.fetchStorage.len
let nFetchStorage =
rc.value.fetchStorageFull.len + rc.value.fetchStoragePart.len
if 0 < nFetchStorage:
trace "Cleaning up previous pivot", peer, pivot, nFetchStorage
rc.value.fetchStorage.clear()
rc.value.fetchAccounts.checkNodes.setLen(0)
rc.value.fetchAccounts.missingNodes.setLen(0)
rc.value.fetchStorageFull.clear()
rc.value.fetchStoragePart.clear()
rc.value.fetchAccounts.checkNodes.setLen(0)
rc.value.fetchAccounts.missingNodes.setLen(0)
# Clean up storage slots queue first if it becomes too large
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
if snapNewBuddyStoragesSlotsQuPrioThresh < nStoQu:
runAsync buddy.rangeFetchStorageSlots()
if env.accountsState != HealerDone:
runAsync buddy.rangeFetchAccounts()
@ -412,10 +424,6 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
runAsync buddy.healStorageSlots()
# Debugging log: analyse pivot against database
discard buddy.checkAccountsListOk(env)
discard buddy.checkStorageSlotsTrieIsComplete(env)
# Check whether there are more accounts to fetch.
#
# Note that some other process might have temporarily borrowed from the
@ -423,11 +431,20 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
# if only a single buddy is active. So be it.
if env.fetchAccounts.unprocessed.isEmpty():
# Debugging log: analyse pivot against database
warn "Analysing accounts database -- might be slow", peer, pivot
discard buddy.checkAccountsListOk(env)
# Check whether pivot download is complete.
if env.fetchStorage.len == 0:
if env.fetchStorageFull.len == 0 and
env.fetchStoragePart.len == 0:
trace "Running pool mode for verifying completeness", peer, pivot
buddy.ctx.poolMode = true
# Debugging log: analyse pivot against database
warn "Analysing storage slots database -- might be slow", peer, pivot
discard buddy.checkStorageSlotsTrieIsComplete(env)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -28,11 +28,9 @@ type
ComEmptyAccountsArguments
ComEmptyPartialRange
ComEmptyRequestArguments
ComMissingProof
ComNetworkProblem
ComNoAccountsForStateRoot
ComNoByteCodesAvailable
ComNoDataForProof
#ComNoHeaderAvailable -- unused, see get_block_header.nim
ComNoStorageForAccounts
ComNoTrieNodesAvailable
@ -91,8 +89,7 @@ proc stopAfterSeriousComError*(
# Otherwise try again some time later.
await sleepAsync(comErrorsNoDataSleepMSecs.milliseconds)
of ComMissingProof,
ComAccountsMinTooSmall,
of ComAccountsMinTooSmall,
ComAccountsMaxTooLarge,
ComTooManyByteCodes,
ComTooManyHeaders,
@ -105,7 +102,6 @@ proc stopAfterSeriousComError*(
of ComEmptyAccountsArguments,
ComEmptyRequestArguments,
ComEmptyPartialRange,
ComNoDataForProof,
ComNothingSerious:
discard

View File

@ -28,7 +28,6 @@ logScope:
type
GetAccountRange* = object
consumed*: NodeTagRange ## Real accounts interval covered
data*: PackedAccountRange ## Re-packed reply data
withStorage*: seq[AccountSlotsHeader] ## Accounts with non-idle storage root
@ -79,7 +78,6 @@ proc getAccountRange*(
return err(ComResponseTimeout)
let snAccRange = rc.value.get
GetAccountRange(
consumed: iv,
data: PackedAccountRange(
proof: snAccRange.proof,
accounts: snAccRange.accounts
@ -114,31 +112,15 @@ proc getAccountRange*(
nAccounts, nProof, accRange="n/a", reqRange=iv
return err(ComNoAccountsForStateRoot)
# So there is no data, otherwise an account beyond the interval end
# `iv.maxPt` would have been returned.
dd.consumed = NodeTagRange.new(iv.minPt, high(NodeTag))
trace trSnapRecvReceived & "terminal AccountRange", peer, pivot,
nAccounts, nProof, accRange=dd.consumed, reqRange=iv
# So there is no data and a proof.
trace trSnapRecvReceived & "terminal AccountRange", peer, pivot, nAccounts,
nProof, accRange=NodeTagRange.new(iv.minPt, high(NodeTag)), reqRange=iv
return ok(dd)
let (accMinPt, accMaxPt) = (
dd.data.accounts[0].accKey.to(NodeTag),
dd.data.accounts[^1].accKey.to(NodeTag))
if nProof == 0:
# github.com/ethereum/devp2p/blob/master/caps/snap.md#accountrange-0x01
# Notes:
# * If the account range is the entire state (requested origin was 0x00..0
# and all accounts fit into the response), no proofs should be sent along
# the response. This is unlikely for accounts, but since it's a common
# situation for storage slots, this clause keeps the behavior the same
# across both.
if 0.to(NodeTag) < iv.minPt:
trace trSnapRecvProtocolViolation & "proof-less AccountRange", peer,
pivot, nAccounts, nProof, accRange=NodeTagRange.new(iv.minPt, accMaxPt),
reqRange=iv
return err(ComMissingProof)
if accMinPt < iv.minPt:
# Not allowed
trace trSnapRecvProtocolViolation & "min too small in AccountRange", peer,
@ -157,15 +139,14 @@ proc getAccountRange*(
# Geth always seems to allow the last account to be larger than the
# limit (seen with Geth/v1.10.18-unstable-4b309c70-20220517.)
if iv.maxPt < dd.data.accounts[^2].accKey.to(NodeTag):
# The segcond largest should not excceed the top one requested.
# The second largest should not exceed the top one requested.
trace trSnapRecvProtocolViolation & "AccountRange top exceeded", peer,
pivot, nAccounts, nProof,
accRange=NodeTagRange.new(iv.minPt, accMaxPt), reqRange=iv
return err(ComAccountsMaxTooLarge)
dd.consumed = NodeTagRange.new(iv.minPt, max(iv.maxPt,accMaxPt))
trace trSnapRecvReceived & "AccountRange", peer, pivot,
nAccounts, nProof, accRange=dd.consumed, reqRange=iv
trace trSnapRecvReceived & "AccountRange", peer, pivot, nAccounts, nProof,
accRange=NodeTagRange.new(accMinPt, accMaxPt), reqRange=iv
return ok(dd)

View File

@ -104,19 +104,21 @@ proc getStorageRanges*(
trace trSnapSendSending & "GetStorageRanges", peer, pivot,
nAccounts, bytesLimit=snapRequestBytesLimit
let snStoRanges = block:
let rc = await buddy.getStorageRangesReq(stateRoot,
accounts.mapIt(it.accKey.to(Hash256)), accounts[0].subRange, pivot)
if rc.isErr:
return err(ComNetworkProblem)
if rc.value.isNone:
trace trSnapRecvTimeoutWaiting & "for StorageRanges", peer, pivot,
nAccounts
return err(ComResponseTimeout)
if nAccounts < rc.value.get.slotLists.len:
# Ooops, makes no sense
return err(ComTooManyStorageSlots)
rc.value.get
let
iv = accounts[0].subRange
snStoRanges = block:
let rc = await buddy.getStorageRangesReq(stateRoot,
accounts.mapIt(it.accKey.to(Hash256)), iv, pivot)
if rc.isErr:
return err(ComNetworkProblem)
if rc.value.isNone:
trace trSnapRecvTimeoutWaiting & "for StorageRanges", peer, pivot,
nAccounts
return err(ComResponseTimeout)
if nAccounts < rc.value.get.slotLists.len:
# Ooops, makes no sense
return err(ComTooManyStorageSlots)
rc.value.get
let
nSlotLists = snStoRanges.slotLists.len
@ -138,6 +140,10 @@ proc getStorageRanges*(
# Assemble return structure for given peer response
var dd = GetStorageRanges(data: AccountStorageRange(proof: snStoRanges.proof))
# Set the left proof boundary (if any)
if 0 < nProof and iv.isSome:
dd.data.base = iv.unsafeGet.minPt
# Filter remaining `slots` responses:
# * Accounts for empty ones go back to the `leftOver` list.
for n in 0 ..< nSlotLists:
@ -154,23 +160,23 @@ proc getStorageRanges*(
# assigning empty slice is ok
dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts]
# Ok, we have a proof now. What was it to be proved?
elif snStoRanges.slotLists[^1].len == 0:
return err(ComNoDataForProof) # Now way to prove an empty node set
else:
# If the storage data for the last account comes with a proof, then the data
# set is incomplete. So record the missing part on the `dd.leftOver` list.
let
reqTop = if accounts[0].subRange.isNone: high(NodeTag)
else: accounts[0].subRange.unsafeGet.maxPt
respTop = dd.data.storages[^1].data[^1].slotHash.to(NodeTag)
if respTop < reqTop:
dd.leftOver.add AccountSlotsHeader(
subRange: some(NodeTagRange.new(respTop + 1.u256, reqTop)),
accKey: accounts[nSlotLists-1].accKey,
storageRoot: accounts[nSlotLists-1].storageRoot)
# assigning empty slice isa ok
# Ok, we have a proof now
if 0 < snStoRanges.slotLists[^1].len:
# If the storage data for the last account comes with a proof, then the
# data set is incomplete. So record the missing part on the `dd.leftOver`
# list.
let
reqTop = if accounts[0].subRange.isNone: high(NodeTag)
else: accounts[0].subRange.unsafeGet.maxPt
respTop = dd.data.storages[^1].data[^1].slotHash.to(NodeTag)
if respTop < reqTop:
dd.leftOver.add AccountSlotsHeader(
subRange: some(NodeTagRange.new(respTop + 1.u256, reqTop)),
accKey: accounts[nSlotLists-1].accKey,
storageRoot: accounts[nSlotLists-1].storageRoot)
# Do the rest (assigning empty slice is ok)
dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts]
trace trSnapRecvReceived & "StorageRanges", peer, pivot, nAccounts,

View File

@ -155,6 +155,7 @@ type
## Return code for single node operations
slot*: Option[int] ## May refer to indexed argument slots
kind*: Option[NodeKind] ## Node type (if any)
dangling*: seq[NodeSpecs] ## Missing inner sub-tries
error*: HexaryDbError ## Error code, or `NothingSerious`
const

View File

@ -13,9 +13,10 @@ type
NothingSerious = 0
AccountNotFound
AccountSmallerThanBase
AccountsNotSrictlyIncreasing
AccountRangesOverlap
LowerBoundAfterFirstEntry
LowerBoundProofError
NodeNotFound
RlpEncoding
SlotsNotSrictlyIncreasing

View File

@ -29,7 +29,7 @@ proc pp(w: Blob; db: HexaryTreeDbRef): string =
# Private helpers
# ------------------------------------------------------------------------------
proc getNibblesImpl(path: XPath; start = 0): NibblesSeq =
proc getNibblesImpl(path: XPath|RPath; start = 0): NibblesSeq =
## Re-build the key path
for n in start ..< path.path.len:
let it = path.path[n]
@ -63,16 +63,26 @@ proc toExtensionNode(
{.gcsafe, raises: [Defect,RlpError]} =
XNodeObj(kind: Extension, ePfx: pSegm, eLink: rlp.listElem(1).toBytes)
# not now ...
when false:
proc `[]`(path: XPath; n: int): XPathStep =
path.path[n]
proc `[]`(path: XPath; s: Slice[int]): XPath =
XPath(path: path.path[s.a .. s.b], tail: path.getNibbles(s.b+1))
proc `<=`(a, b: NibblesSeq): bool =
## Compare nibbles, different lengths are padded to the right with zeros
let abMin = min(a.len, b.len)
for n in 0 ..< abMin:
if a[n] < b[n]:
return true
if b[n] < a[n]:
return false
# otherwise a[n] == b[n]
proc len(path: XPath): int =
path.path.len
# Assuming zero for missing entries
if b.len < a.len:
for n in abMin + 1 ..< a.len:
if 0 < a[n]:
return false
true
proc `<`(a, b: NibblesSeq): bool =
not (b <= a)
# ------------------------------------------------------------------------------
# Private functions
@ -173,6 +183,48 @@ proc pathExtend(
# notreached
proc completeLeast(
path: RPath;
key: RepairKey;
db: HexaryTreeDbRef;
pathLenMax = 64;
): RPath
{.gcsafe, raises: [Defect,KeyError].} =
## Extend path using least nodes without recursion.
result.path = path.path
if db.tab.hasKey(key):
var
key = key
node = db.tab[key]
while result.path.len < pathLenMax:
case node.kind:
of Leaf:
result.path.add RPathStep(key: key, node: node, nibble: -1)
return # done
of Extension:
block useExtensionLink:
let newKey = node.eLink
if not newkey.isZero and db.tab.hasKey(newKey):
result.path.add RPathStep(key: key, node: node, nibble: -1)
key = newKey
node = db.tab[key]
break useExtensionLink
return # Oops, no way
of Branch:
block findBranchLink:
for inx in 0 .. 15:
let newKey = node.bLink[inx]
if not newkey.isZero and db.tab.hasKey(newKey):
result.path.add RPathStep(key: key, node: node, nibble: inx.int8)
key = newKey
node = db.tab[key]
break findBranchLink
return # Oops, no way
proc pathLeast(
path: XPath;
key: Blob;
@ -357,7 +409,7 @@ proc pathMost(
# Public helpers
# ------------------------------------------------------------------------------
proc getNibbles*(path: XPath; start = 0): NibblesSeq =
proc getNibbles*(path: XPath|RPath; start = 0): NibblesSeq =
## Re-build the key path
path.getNibblesImpl(start)
@ -434,6 +486,117 @@ proc hexaryPath*(
## Variant of `hexaryPath`.
XPath(tail: partialPath).pathExtend(root.to(Blob), getFn)
proc right*(
path: RPath;
db: HexaryTreeDbRef;
): RPath
{.gcsafe, raises: [Defect,KeyError]} =
## Extends the maximally extended argument node path `path` to the right
## (with the path value not decreasing). This is similar to `next()`, except
## that the algorithm does not backtrack if there are dangling links in
## between.
##
## This code is intended to be used for verifying a left-bound proof.
# Some easy cases
if path.path.len == 0:
return RPath() # error
if path.path[^1].node.kind == Leaf:
return path
var rPath = path
while 0 < rPath.path.len:
let top = rPath.path[^1]
if top.node.kind != Branch or
top.nibble < 0 or
rPath.tail.len == 0:
return RPath() # error
let topLink = top.node.bLink[top.nibble]
if topLink.isZero or not db.tab.hasKey(topLink):
return RPath() # error
let nextNibble = rPath.tail[0].int8
if nextNibble < 15:
let
nextNode = db.tab[topLink]
rPathLen = rPath.path.len # in case of backtracking
case nextNode.kind
of Leaf:
if rPath.tail <= nextNode.lPfx:
return rPath.completeLeast(topLink, db)
of Extension:
if rPath.tail <= nextNode.ePfx:
return rPath.completeLeast(topLink, db)
of Branch:
# Step down and complete with a branch link on the child node
rPath.path = rPath.path & RPathStep(
key: topLink,
node: nextNode,
nibble: nextNibble)
# Find the next item to the right of the new top entry
let step = rPath.path[^1]
for inx in (step.nibble + 1) .. 15:
let link = step.node.bLink[inx]
if not link.isZero:
rPath.path[^1].nibble = inx.int8
return rPath.completeLeast(link, db)
# Restore `rPath` and backtrack
rPath.path.setLen(rPathLen)
# Pop `Branch` node on top and append nibble to `tail`
rPath.tail = @[top.nibble.byte].initNibbleRange.slice(1) & rPath.tail
rPath.path.setLen(rPath.path.len - 1)
# Pathological case: nfffff.. for n < f
var step = path.path[0]
for inx in (step.nibble + 1) .. 15:
let link = step.node.bLink[inx]
if not link.isZero:
step.nibble = inx.int8
rPath.path = @[step]
return rPath.completeLeast(link, db)
RPath() # error
proc rightStop*(
path: RPath;
db: HexaryTreeDbRef;
): bool
{.gcsafe, raises: [Defect,KeyError]} =
## Returns `true` if the maximally extended argument node path `path` is the
## rightmost on the hexary trie database. It verifies that there are no more
## leaf entries to the right of the argument `path`.
##
## This code is intended to be used for verifying a left-bound proof.
if 0 < path.path.len and 0 < path.tail.len:
let top = path.path[^1]
if top.node.kind == Branch and 0 <= top.nibble:
let topLink = top.node.bLink[top.nibble]
if not topLink.isZero and db.tab.hasKey(topLink):
let
nextNibble = path.tail[0]
nextNode = db.tab[topLink]
case nextNode.kind
of Leaf:
return nextNode.lPfx < path.tail
of Extension:
return nextNode.ePfx < path.tail
of Branch:
# Step down and verify that there is no branch link
for inx in nextNibble .. 15:
if not nextNode.bLink[inx].isZero:
return false
return true
proc next*(
path: XPath;
getFn: HexaryGetFn;

View File

@ -9,7 +9,7 @@
# except according to those terms.
import
std/[algorithm, sequtils, strutils, tables],
std/[algorithm, sequtils, tables],
chronicles,
eth/[common, p2p, rlp, trie/nibbles],
stew/byteutils,
@ -97,23 +97,12 @@ proc collectAccounts(
## can be used for validating the argument account data.
var rcAcc: seq[RLeafSpecs]
if acc.len != 0:
if 0 < acc.len:
let pathTag0 = acc[0].accKey.to(NodeTag)
# Verify lower bound
if pathTag0 < base:
let error = HexaryDbError.AccountSmallerThanBase
trace "collectAccounts()", peer, base, accounts=acc.len, error
return err(error)
# Add base for the records (no payload). Note that the assumption
# holds: `rcAcc[^1].tag <= base`
if base < pathTag0:
rcAcc.add RLeafSpecs(pathTag: base)
# Check for the case that accounts are appended
elif 0 < rcAcc.len and pathTag0 <= rcAcc[^1].pathTag:
let error = HexaryDbError.AccountsNotSrictlyIncreasing
let error = LowerBoundAfterFirstEntry
trace "collectAccounts()", peer, base, accounts=acc.len, error
return err(error)
@ -179,17 +168,56 @@ proc importAccounts*(
base: NodeTag; ## before or at first account entry in `data`
data: PackedAccountRange; ## re-packed `snap/1 ` reply data
persistent = false; ## store data on disk
): Result[void,HexaryDbError] =
noBaseBoundCheck = false; ## Ignore left boundary proof check if `true`
): Result[seq[NodeSpecs],HexaryDbError] =
## Validate and import accounts (using proofs as received with the snap
## message `AccountRange`). This function accumulates data in a memory table
## which can be written to disk with the argument `persistent` set `true`. The
## memory table is held in the descriptor argument `ps`.
##
## On success, the function returns a list of dangling node links from the
## argument `proof` list of nodes after the populating with accounts. The
## following example may illustrate the case:
##
## Assume an accounts hexary trie
## ::
## | 0 1 2 3 4 5 6 7 8 9 a b c d e f -- nibble positions
## | root -> (a, .. b, .. c, .. d, .. ,) -- root branch node
## | | | | |
## | ... v v v
## | (x,X) (y,Y) (z,Z)
##
## with `a`,`b`,`c`,`d` node hashes, `x`,`y`,`z` partial paths and account
## hashes `3&x`,`7&y`,`b&z` for account values `X`,`Y`,`Z`. All other
## links in the *root branch node* are assumed nil.
##
## The passing to this function
## * base: `3&x`
## * data.proof: *root branch node*
## * data.accounts: `(3&x,X)`, `(7&y,Y)`, `(b&z,Z)`
## a partial tree can be fully constructed and boundary proofs succeed.
## The return value will be an empty list.
##
## Leaving out `(7&y,Y)` the boundary proofs still succeed but the
## return value will be @[`(7&y,c)`].
##
## The left boundary proof might be omitted by passing `true` for the
## `noBaseBoundCheck` argument. In this case, the boundary check must be
## performed on the return code as
## * if `data.accounts` is empty, the return value must be an empty list
## * otherwise, all type `NodeSpecs` items `w` of the return code must
## satisfy
## ::
## let leastAccountPath = data.accounts[0].accKey.to(NodeTag)
## leastAccountPath <= w.partialPath.max(NodeKey).to(NodeTag)
##
## Note that the `peer` argument is for log messages, only.
var accounts: seq[RLeafSpecs]
var
accounts: seq[RLeafSpecs]
dangling: seq[NodeSpecs]
try:
if 0 < data.proof.len:
let rc = ps.mergeProofs(ps.peer, ps.root, data.proof)
let rc = ps.mergeProofs(ps.peer, data.proof)
if rc.isErr:
return err(rc.error)
block:
@ -197,16 +225,50 @@ proc importAccounts*(
if rc.isErr:
return err(rc.error)
accounts = rc.value
block:
if 0 < accounts.len:
var innerSubTrie: seq[NodeSpecs]
if 0 < data.proof.len:
# Inspect trie for dangling nodes. This is not a big deal here as the
# proof data is typically small.
let
proofStats = ps.hexaDb.hexaryInspectTrie(ps.root, @[])
topTag = accounts[^1].pathTag
for w in proofStats.dangling:
if base <= w.partialPath.max(NodeKey).to(NodeTag) and
w.partialPath.min(NodeKey).to(NodeTag) <= topTag:
# Extract dangling links which are inside the accounts range
innerSubTrie.add w
# Build partial hexary trie
let rc = ps.hexaDb.hexaryInterpolate(
ps.root, accounts, bootstrap = (data.proof.len == 0))
if rc.isErr:
return err(rc.error)
if persistent and 0 < ps.hexaDb.tab.len:
let rc = ps.hexaDb.persistentAccounts(ps)
if rc.isErr:
return err(rc.error)
# Collect missing inner sub-trees in the reconstructed partial hexary
# trie (if any).
let bottomTag = accounts[0].pathTag
for w in innerSubTrie:
if ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)):
continue
# Verify that `base` is to the left of the first account and there is
# nothing in between. Without proof, there can only be a complete
# set/list of accounts. There must be a proof for an empty list.
if not noBaseBoundCheck and
w.partialPath.max(NodeKey).to(NodeTag) < bottomTag:
return err(LowerBoundProofError)
# Otherwise register left over entry
dangling.add w
if persistent:
let rc = ps.hexaDb.persistentAccounts(ps)
if rc.isErr:
return err(rc.error)
elif data.proof.len == 0:
# There must be a proof for an empty argument list.
return err(LowerBoundProofError)
except RlpError:
return err(RlpEncoding)
@ -217,21 +279,24 @@ proc importAccounts*(
return err(OSErrorException)
#when extraTraceMessages:
# trace "Accounts imported", peer=ps.peer,
# root=ps.root.ByteArray32.toHex,
# proof=data.proof.len, base, accounts=data.accounts.len
ok()
# trace "Accounts imported", peer=ps.peer, root=ps.root.ByteArray32.toHex,
# proof=data.proof.len, base, accounts=data.accounts.len,
# top=accounts[^1].pathTag, danglingLen=dangling.len
ok(dangling)
proc importAccounts*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## for log messages
root: Hash256; ## state root
base: NodeTag; ## before or at first account entry in `data`
data: PackedAccountRange; ## re-packed `snap/1 ` reply data
): Result[void,HexaryDbError] =
peer: Peer; ## For log messages
root: Hash256; ## State root
base: NodeTag; ## Before or at first account entry in `data`
data: PackedAccountRange; ## Re-packed `snap/1 ` reply data
noBaseBoundCheck = false; ## Ignore left bound proof check if `true`
): Result[seq[NodeSpecs],HexaryDbError] =
## Variant of `importAccounts()`
SnapDbAccountsRef.init(
pv, root, peer).importAccounts(base, data, persistent=true)
pv, root, peer).importAccounts(
base, data, persistent=true, noBaseBoundCheck)
proc importRawAccountsNodes*(

View File

@ -61,7 +61,10 @@ proc storageSlotsCtx(
): string =
let
ctx = buddy.ctx
rc = env.fetchStorage.eq(storageRoot)
rc = block:
let rcx = env.fetchStorageFull.eq(storageRoot)
if rcx.isOk: rcx
else: env.fetchStoragePart.eq(storageRoot)
if rc.isErr:
return "n/a"
let
@ -221,8 +224,8 @@ proc checkStorageSlotsTrieIsComplete*(
return rc.value
when extraTraceMessages:
debug logTxt "atorage slots health check failed", peer,
nStoQueue=env.fetchStorage.len,
let nStoQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
debug logTxt "atorage slots health check failed", peer, nStoQueue,
ctx=buddy.storageSlotsCtx(storageRoot, env), error=rc.error
proc checkStorageSlotsTrieIsComplete*(
@ -240,8 +243,9 @@ proc checkStorageSlotsTrieIsComplete*(
for (accKey,accData,error) in buddy.accountsWalk(env):
if error != NothingSerious:
error logTxt "atorage slots accounts loop stopped", peer,
nStoQueue=env.fetchStorage.len, accounts, incomplete, complete, error
let nStoQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
error logTxt "atorage slots accounts loop stopped", peer, nStoQueue,
accounts, incomplete, complete, error
return false
accounts.inc
@ -256,8 +260,9 @@ proc checkStorageSlotsTrieIsComplete*(
incomplete.inc
when extraTraceMessages:
let nStoQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
debug logTxt "storage slots report", peer, ctx=buddy.accountsCtx(env),
nStoQueue=env.fetchStorage.len, accounts, incomplete, complete
nStoQueue, accounts, incomplete, complete
0 < accounts and incomplete == 0

View File

@ -11,7 +11,7 @@
import
std/[sequtils, tables],
chronicles,
eth/[common, p2p, trie/db],
eth/[common, p2p, trie/db, trie/nibbles],
../../../../db/[select_backend, storage_types],
../../range_desc,
"."/[hexary_desc, hexary_error, hexary_import, hexary_paths, rocky_bulk_load]
@ -22,6 +22,8 @@ logScope:
topics = "snap-db"
const
extraTraceMessages = false or true
RockyBulkCache* = "accounts.sst"
## Name of temporary file to accommodate SST records for `rocksdb`
@ -34,7 +36,7 @@ type
SnapDbBaseRef* = ref object of RootRef
## Session descriptor
xDb: HexaryTreeDbRef ## Hexary database
xDb: HexaryTreeDbRef ## Hexary database, memory based
base: SnapDbRef ## Back reference to common parameters
root*: NodeKey ## Session DB root node key
@ -70,6 +72,9 @@ proc toKey(a: NodeKey; ps: SnapDbBaseRef): uint =
proc toKey(a: NodeTag; ps: SnapDbBaseRef): uint =
a.to(NodeKey).toKey(ps)
proc ppImpl(a: RepairKey; pv: SnapDbRef): string =
if a.isZero: "ø" else:"$" & $a.toKey(pv)
# ------------------------------------------------------------------------------
# Debugging, pretty printing
# ------------------------------------------------------------------------------
@ -122,7 +127,7 @@ proc init*(
): T =
## Constructor for inner hexary trie database
let xDb = HexaryTreeDbRef()
xDb.keyPp = proc(key: RepairKey): string = key.pp(xDb) # will go away
xDb.keyPp = proc(key: RepairKey): string = key.ppImpl(pv) # will go away
return xDb
proc init*(
@ -138,7 +143,7 @@ proc init*(
ps: SnapDbBaseRef;
pv: SnapDbRef;
root: NodeKey;
peer: Peer = nil) =
) =
## Session base constructor
ps.base = pv
ps.root = root
@ -148,7 +153,7 @@ proc init*(
T: type SnapDbBaseRef;
ps: SnapDbBaseRef;
root: NodeKey;
peer: Peer = nil): T =
): T =
## Variant of session base constructor
new result
result.init(ps.base, root)
@ -177,11 +182,11 @@ proc kvDb*(pv: SnapDbRef): TrieDatabaseRef =
# Public functions, select sub-tables for persistent storage
# ------------------------------------------------------------------------------
proc toAccountsKey*(a: NodeKey): ByteArray32 =
a.ByteArray32
proc toAccountsKey*(a: NodeKey): ByteArray33 =
a.ByteArray32.snapSyncAccountKey.data
proc toStorageSlotsKey*(a: NodeKey): ByteArray33 =
a.ByteArray32.slotHashToSlotKey.data
a.ByteArray32.snapSyncStorageSlotKey.data
template toOpenArray*(k: ByteArray32): openArray[byte] =
k.toOpenArray(0, 31)
@ -204,7 +209,6 @@ proc dbBackendRocksDb*(ps: SnapDbBaseRef): bool =
proc mergeProofs*(
ps: SnapDbBaseRef; ## Session database
peer: Peer; ## For log messages
root: NodeKey; ## Root for checking nodes
proof: seq[Blob]; ## Node records
freeStandingOk = false; ## Remove freestanding nodes
): Result[void,HexaryDbError]
@ -216,7 +220,7 @@ proc mergeProofs*(
db = ps.hexaDb
var
nodes: HashSet[RepairKey]
refs = @[root.to(RepairKey)].toHashSet
refs = @[ps.root.to(RepairKey)].toHashSet
for n,rlpRec in proof:
let report = db.hexaryImport(rlpRec, nodes, refs)
@ -240,6 +244,52 @@ proc mergeProofs*(
ok()
proc verifyLowerBound*(
ps: SnapDbBaseRef; ## Database session descriptor
peer: Peer; ## For log messages
base: NodeTag; ## Before or at first account entry in `data`
first: NodeTag; ## First account key
): Result[void,HexaryDbError]
{.gcsafe, raises: [Defect, KeyError].} =
## Verify that `base` is to the left of the first leaf entry and there is
## nothing in between.
proc convertTo(data: openArray[byte]; T: type Hash256): T =
discard result.data.NodeKey.init(data) # size error => zero
let
root = ps.root.to(RepairKey)
base = base.to(NodeKey)
next = base.hexaryPath(root, ps.hexaDb).right(ps.hexaDb).getNibbles
if next.len == 64:
if first == next.getBytes.convertTo(Hash256).to(NodeTag):
return ok()
let error = LowerBoundProofError
when extraTraceMessages:
trace "verifyLowerBound()", peer, base=base.pp,
first=first.to(NodeKey).pp, error
err(error)
proc verifyNoMoreRight*(
ps: SnapDbBaseRef; ## Database session descriptor
peer: Peer; ## For log messages
base: NodeTag; ## Before or at first account entry in `data`
): Result[void,HexaryDbError]
{.gcsafe, raises: [Defect, KeyError].} =
## Verify that there are no more leaf entries to the right of and
## including `base`.
let
root = ps.root.to(RepairKey)
base = base.to(NodeKey)
if base.hexaryPath(root, ps.hexaDb).rightStop(ps.hexaDb):
return ok()
let error = LowerBoundProofError
when extraTraceMessages:
trace "verifyLeftmostBound()", peer, base=base.pp, error
err(error)
# ------------------------------------------------------------------------------
# Debugging (and playing with the hexary database)
# ------------------------------------------------------------------------------

View File

@ -40,7 +40,7 @@ proc convertTo(key: RepairKey; T: type NodeTag): T =
## Might be lossy, check before use
UInt256.fromBytesBE(key.ByteArray33[1 .. 32]).T
proc toAccountsKey(a: RepairKey): ByteArray32 =
proc toAccountsKey(a: RepairKey): ByteArray33 =
a.convertTo(NodeKey).toAccountsKey
proc toStorageSlotsKey(a: RepairKey): ByteArray33 =

View File

@ -94,7 +94,8 @@ proc persistentStorageSlots(
proc collectStorageSlots(
peer: Peer;
peer: Peer; ## for log messages
base: NodeTag; ## before or at first account entry in `data`
slotLists: seq[SnapStorage];
): Result[seq[RLeafSpecs],HexaryDbError]
{.gcsafe, raises: [Defect, RlpError].} =
@ -102,6 +103,15 @@ proc collectStorageSlots(
var rcSlots: seq[RLeafSpecs]
if slotLists.len != 0:
let pathTag0 = slotLists[0].slotHash.to(NodeTag)
# Verify lower bound
if pathTag0 < base:
let error = LowerBoundAfterFirstEntry
trace "collectStorageSlots()", peer, base, item=0,
nSlotLists=slotLists.len, error
return err(error)
# Add initial account
rcSlots.add RLeafSpecs(
pathTag: slotLists[0].slotHash.to(NodeTag),
@ -126,38 +136,75 @@ proc collectStorageSlots(
proc importStorageSlots(
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
base: NodeTag; ## before or at first account entry in `data`
data: AccountSlots; ## Account storage descriptor
proof: SnapStorageProof; ## Account storage proof
): Result[void,HexaryDbError]
proof: SnapStorageProof; ## Storage slots proof data
noBaseBoundCheck = false; ## Ignore left boundary proof check if `true`
): Result[seq[NodeSpecs],HexaryDbError]
{.gcsafe, raises: [Defect,RlpError,KeyError].} =
## Preocess storage slots for a particular storage root
## Process storage slots for a particular storage root. See `importAccounts()`
## for comments on the return value.
let
root = data.account.storageRoot.to(NodeKey)
tmpDb = SnapDbBaseRef.init(ps, ps.root, ps.peer)
tmpDb = SnapDbBaseRef.init(ps, data.account.storageRoot.to(NodeKey))
var
slots: seq[RLeafSpecs]
dangling: seq[NodeSpecs]
if 0 < proof.len:
let rc = tmpDb.mergeProofs(ps.peer, root, proof)
let rc = tmpDb.mergeProofs(ps.peer, proof)
if rc.isErr:
return err(rc.error)
block:
let rc = ps.peer.collectStorageSlots(data.data)
let rc = ps.peer.collectStorageSlots(base, data.data)
if rc.isErr:
return err(rc.error)
slots = rc.value
block:
if 0 < slots.len:
var innerSubTrie: seq[NodeSpecs]
if 0 < proof.len:
# Inspect trie for dangling nodes. This is not a big deal here as the
# proof data is typically small.
let
proofStats = ps.hexaDb.hexaryInspectTrie(ps.root, @[])
topTag = slots[^1].pathTag
for w in proofStats.dangling:
if base <= w.partialPath.max(NodeKey).to(NodeTag) and
w.partialPath.min(NodeKey).to(NodeTag) <= topTag:
# Extract dangling links which are inside the storage slots range
innerSubTrie.add w
# Build partial hexary trie
let rc = tmpDb.hexaDb.hexaryInterpolate(
root, slots, bootstrap = (proof.len == 0))
tmpDb.root, slots, bootstrap = (proof.len == 0))
if rc.isErr:
return err(rc.error)
# Commit to main descriptor
for k,v in tmpDb.hexaDb.tab.pairs:
if not k.isNodeKey:
return err(UnresolvedRepairNode)
ps.hexaDb.tab[k] = v
# Collect missing inner sub-trees in the reconstructed partial hexary
# trie (if any).
let bottomTag = slots[0].pathTag
for w in innerSubTrie:
if ps.hexaDb.tab.hasKey(w.nodeKey.to(RepairKey)):
continue
# Verify that `base` is to the left of the first slot and there is
# nothing in between. Without proof, there can only be a complete
# set/list of slots. There must be a proof for an empty list.
if not noBaseBoundCheck and
w.partialPath.max(NodeKey).to(NodeTag) < bottomTag:
return err(LowerBoundProofError)
# Otherwise register left over entry
dangling.add w
ok()
# Commit to main descriptor
for k,v in tmpDb.hexaDb.tab.pairs:
if not k.isNodeKey:
return err(UnresolvedRepairNode)
ps.hexaDb.tab[k] = v
elif proof.len == 0:
# There must be a proof for an empty argument list.
return err(LowerBoundProofError)
ok(dangling)
# ------------------------------------------------------------------------------
# Public constructor
@ -187,6 +234,7 @@ proc importStorageSlots*(
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
data: AccountStorageRange; ## Account storage reply from `snap/1` protocol
persistent = false; ## store data on disk
noBaseBoundCheck = false; ## Ignore left boundary proof check if `true`
): seq[HexaryNodeReport] =
## Validate and import storage slots (using proofs as received with the snap
## message `StorageRanges`). This function accumulates data in a memory table
@ -213,9 +261,10 @@ proc importStorageSlots*(
if 0 <= sTop:
try:
for n in 0 ..< sTop:
# These ones never come with proof data
# These ones always come without proof data => `NodeTag.default`
itemInx = some(n)
let rc = ps.importStorageSlots(data.storages[n], @[])
let rc = ps.importStorageSlots(
NodeTag.default, data.storages[n], @[])
if rc.isErr:
result.add HexaryNodeReport(slot: itemInx, error: rc.error)
trace "Storage slots item fails", peer, itemInx=n, nItems,
@ -225,12 +274,15 @@ proc importStorageSlots*(
# Final one might come with proof data
block:
itemInx = some(sTop)
let rc = ps.importStorageSlots(data.storages[sTop], data.proof)
let rc = ps.importStorageSlots(
data.base, data.storages[sTop], data.proof, noBaseBoundCheck)
if rc.isErr:
result.add HexaryNodeReport(slot: itemInx, error: rc.error)
trace "Storage slots last item fails", peer, itemInx=sTop, nItems,
nSlots=data.storages[sTop].data.len, proofs=data.proof.len,
error=rc.error, nErrors=result.len
elif 0 < rc.value.len:
result.add HexaryNodeReport(slot: itemInx, dangling: rc.value)
# Store to disk
if persistent and 0 < ps.hexaDb.tab.len:
@ -260,11 +312,12 @@ proc importStorageSlots*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer; ## For log messages, only
data: AccountStorageRange; ## Account storage reply from `snap/1` protocol
noBaseBoundCheck = false; ## Ignore left boundary proof check if `true`
): seq[HexaryNodeReport] =
## Variant of `importStorages()`
SnapDbStorageSlotsRef.init(
pv, Hash256().to(NodeKey), Hash256(), peer).importStorageSlots(
data, persistent = true)
data, persistent = true, noBaseBoundCheck)
proc importRawStorageSlotsNodes*(

View File

@ -52,7 +52,8 @@
## the argument list is empty
## * `{leaf-nodes}`: list is optimised out
## * `{check-nodes}`: list implemented as `env.fetchAccounts.checkNodes`
## * `{storage-roots}`: list implemented as `env.fetchStorage`
## * `{storage-roots}`: list implemented as pair of queues
## `env.fetchStorageFull` and `env.fetchStoragePart`
##
## Discussion of flow chart
## ------------------------
@ -132,10 +133,11 @@ const
template logTxt(info: static[string]): static[string] =
"Accounts healing " & info
proc healingCtx(buddy: SnapBuddyRef): string =
let
ctx = buddy.ctx
env = buddy.data.pivotEnv
proc healingCtx(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): string =
let ctx = buddy.ctx
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"nAccounts=" & $env.nAccounts & "," &
@ -148,7 +150,10 @@ proc healingCtx(buddy: SnapBuddyRef): string =
# Private functions
# ------------------------------------------------------------------------------
proc updateMissingNodesList(buddy: SnapBuddyRef) =
proc updateMissingNodesList(
buddy: SnapBuddyRef;
env: SnapPivotRef;
) =
## Check whether previously missing nodes from the `missingNodes` list
## have been magically added to the database since it was checked last
## time. These nodes will be moved to `checkNodes` for further processing.
@ -156,7 +161,6 @@ proc updateMissingNodesList(buddy: SnapBuddyRef) =
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
var delayed: seq[NodeSpecs]
@ -173,7 +177,10 @@ proc updateMissingNodesList(buddy: SnapBuddyRef) =
env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & delayed
proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool =
proc appendMoreDanglingNodesToMissingNodesList(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): bool =
## Starting with a given set of potentially dangling account nodes
## `checkNodes`, this set is filtered and processed. The outcome is
## fed back to the very same list `checkNodes`
@ -181,7 +188,6 @@ proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool =
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
rc = db.inspectAccountsTrie(peer, stateRoot, env.fetchAccounts.checkNodes)
@ -189,7 +195,7 @@ proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool =
if rc.isErr:
when extraTraceMessages:
error logTxt "failed => stop", peer,
ctx=buddy.healingCtx(), error=rc.error
ctx=buddy.healingCtx(env), error=rc.error
# Attempt to switch peers, there is not much else we can do here
buddy.ctrl.zombie = true
return false
@ -204,6 +210,7 @@ proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool =
proc getMissingNodesFromNetwork(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): Future[seq[NodeSpecs]]
{.async.} =
## Extract from `missingNodes` the next batch of nodes that need
@ -211,7 +218,6 @@ proc getMissingNodesFromNetwork(
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
@ -257,12 +263,12 @@ proc getMissingNodesFromNetwork(
discard
when extraTraceMessages:
trace logTxt "fetch nodes error => stop", peer,
ctx=buddy.healingCtx(), error
ctx=buddy.healingCtx(env), error
else:
discard
when extraTraceMessages:
trace logTxt "fetch nodes error", peer,
ctx=buddy.healingCtx(), error
ctx=buddy.healingCtx(env), error
return @[]
@ -270,6 +276,7 @@ proc getMissingNodesFromNetwork(
proc kvAccountLeaf(
buddy: SnapBuddyRef;
node: NodeSpecs;
env: SnapPivotRef;
): (bool,NodeKey,Account)
{.gcsafe, raises: [Defect,RlpError]} =
## Re-read leaf node from persistent database (if any)
@ -286,18 +293,19 @@ proc kvAccountLeaf(
when extraTraceMessages:
trace logTxt "non-leaf node path", peer,
ctx=buddy.healingCtx(), nNibbles=nibbles.len
ctx=buddy.healingCtx(env), nNibbles=nibbles.len
proc registerAccountLeaf(
buddy: SnapBuddyRef;
accKey: NodeKey;
acc: Account) =
acc: Account;
env: SnapPivotRef;
) =
## Process single account node as would be done with an interval by
## the `storeAccounts()` function
let
peer = buddy.peer
env = buddy.data.pivotEnv
pt = accKey.to(NodeTag)
# Find range set (from list) containing `pt`
@ -316,7 +324,7 @@ proc registerAccountLeaf(
# Update storage slots batch
if acc.storageRoot != emptyRlpHash:
env.fetchStorage.merge AccountSlotsHeader(
env.fetchStorageFull.merge AccountSlotsHeader(
acckey: accKey,
storageRoot: acc.storageRoot)
@ -324,32 +332,35 @@ proc registerAccountLeaf(
# Private functions: do the healing for one round
# ------------------------------------------------------------------------------
proc accountsHealingImpl(buddy: SnapBuddyRef): Future[int] {.async.} =
proc accountsHealingImpl(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): Future[int]
{.async.} =
## Fetching and merging missing account trie database nodes. It returns the
## number of nodes fetched from the network, and -1 upon error.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
# Update for changes since last visit
buddy.updateMissingNodesList()
buddy.updateMissingNodesList(env)
# If `checkNodes` is empty, healing is at the very start or was
# postponed in which case `missingNodes` is non-empty.
if env.fetchAccounts.checkNodes.len != 0 or
env.fetchAccounts.missingNodes.len == 0:
if not buddy.appendMoreDanglingNodesToMissingNodesList():
if not buddy.appendMoreDanglingNodesToMissingNodesList(env):
return 0
# Check whether the trie is complete.
if env.fetchAccounts.missingNodes.len == 0:
trace logTxt "complete", peer, ctx=buddy.healingCtx()
trace logTxt "complete", peer, ctx=buddy.healingCtx(env)
return 0 # nothing to do
# Get next batch of nodes that need to be merged it into the database
let nodeSpecs = await buddy.getMissingNodesFromNetwork()
let nodeSpecs = await buddy.getMissingNodesFromNetwork(env)
if nodeSpecs.len == 0:
return 0
@ -358,7 +369,7 @@ proc accountsHealingImpl(buddy: SnapBuddyRef): Future[int] {.async.} =
if 0 < report.len and report[^1].slot.isNone:
# Storage error, just run the next lap (not much else that can be done)
error logTxt "error updating persistent database", peer,
ctx=buddy.healingCtx(), nNodes=nodeSpecs.len, error=report[^1].error
ctx=buddy.healingCtx(env), nNodes=nodeSpecs.len, error=report[^1].error
env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & nodeSpecs
return -1
@ -380,17 +391,17 @@ proc accountsHealingImpl(buddy: SnapBuddyRef): Future[int] {.async.} =
else:
# Node has been stored, double check
let (isLeaf, key, acc) = buddy.kvAccountLeaf(nodeSpecs[inx])
let (isLeaf, key, acc) = buddy.kvAccountLeaf(nodeSpecs[inx], env)
if isLeaf:
# Update `unprocessed` registry, collect storage roots (if any)
buddy.registerAccountLeaf(key, acc)
buddy.registerAccountLeaf(key, acc, env)
nLeafNodes.inc
else:
env.fetchAccounts.checkNodes.add nodePath
when extraTraceMessages:
trace logTxt "merged into database", peer,
ctx=buddy.healingCtx(), nNodes=nodeSpecs.len, nLeafNodes
ctx=buddy.healingCtx(env), nNodes=nodeSpecs.len, nLeafNodes
return nodeSpecs.len
@ -417,26 +428,26 @@ proc healAccounts*(buddy: SnapBuddyRef) {.async.} =
if env.nAccounts == 0 or
ctx.data.coveredAccounts.fullFactor < healAccountsTrigger:
#when extraTraceMessages:
# trace logTxt "postponed", peer, ctx=buddy.healingCtx()
# trace logTxt "postponed", peer, ctx=buddy.healingCtx(env)
return
when extraTraceMessages:
trace logTxt "started", peer, ctx=buddy.healingCtx()
trace logTxt "started", peer, ctx=buddy.healingCtx(env)
var
nNodesFetched = 0
nFetchLoop = 0
# Stop after `snapAccountsHealBatchFetchMax` nodes have been fetched
while nNodesFetched < snapAccountsHealBatchFetchMax:
var nNodes = await buddy.accountsHealingImpl()
var nNodes = await buddy.accountsHealingImpl(env)
if nNodes <= 0:
break
nNodesFetched.inc(nNodes)
nFetchLoop.inc
when extraTraceMessages:
trace logTxt "job done", peer, ctx=buddy.healingCtx(),
nNodesFetched, nFetchLoop
trace logTxt "job done", peer, ctx=buddy.healingCtx(env),
nNodesFetched, nFetchLoop, runState=buddy.ctrl.state
# ------------------------------------------------------------------------------
# End

View File

@ -13,7 +13,7 @@
##
## This module works similar to `heal_accounts` applied to each per-account
## storage slots hexary trie. These per-account trie work items are stored in
## the list `env.fetchStorage`.
## the pair of queues `env.fetchStorageFull` and `env.fetchStoragePart`.
##
## There is one additional short cut for speeding up processing. If a
## per-account storage slots hexary trie is marked inheritable, it will be
@ -55,10 +55,9 @@ template logTxt(info: static[string]): static[string] =
proc healingCtx(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
env: SnapPivotRef;
): string =
let
env = buddy.data.pivotEnv
slots = kvp.data.slots
let slots = kvp.data.slots
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"covered=" & slots.unprocessed.emptyFactor.toPC(0) & "," &
@ -96,7 +95,9 @@ proc acceptWorkItemAsIs(
proc updateMissingNodesList(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair) =
kvp: SnapSlotsQueuePair;
env: SnapPivotRef;
) =
## Check whether previously missing nodes from the `missingNodes` list
## have been magically added to the database since it was checked last
## time. These nodes will be moved to `checkNodes` for further processing.
@ -104,7 +105,6 @@ proc updateMissingNodesList(
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
accKey = kvp.data.accKey
storageRoot = kvp.key
slots = kvp.data.slots
@ -126,6 +126,7 @@ proc updateMissingNodesList(
proc appendMoreDanglingNodesToMissingNodesList(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
env: SnapPivotRef;
): bool =
## Starting with a given set of potentially dangling intermediate trie nodes
## `checkNodes`, this set is filtered and processed. The outcome is fed back
@ -134,7 +135,6 @@ proc appendMoreDanglingNodesToMissingNodesList(
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
accKey = kvp.data.accKey
storageRoot = kvp.key
slots = kvp.data.slots
@ -143,9 +143,9 @@ proc appendMoreDanglingNodesToMissingNodesList(
if rc.isErr:
when extraTraceMessages:
error logTxt "failed => stop", peer,
itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, error=rc.error
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
error logTxt "failed => stop", peer, itCtx=buddy.healingCtx(kvp,env),
nSlotLists=env.nSlotLists, nStorageQueue, error=rc.error
# Attempt to switch peers, there is not much else we can do here
buddy.ctrl.zombie = true
return false
@ -160,6 +160,7 @@ proc appendMoreDanglingNodesToMissingNodesList(
proc getMissingNodesFromNetwork(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
env: SnapPivotRef;
): Future[seq[NodeSpecs]]
{.async.} =
## Extract from `missingNodes` the next batch of nodes that need
@ -167,7 +168,6 @@ proc getMissingNodesFromNetwork(
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
accKey = kvp.data.accKey
storageRoot = kvp.key
pivot = "#" & $env.stateHeader.blockNumber # for logging
@ -217,15 +217,17 @@ proc getMissingNodesFromNetwork(
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
discard
when extraTraceMessages:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "fetch nodes error => stop", peer,
itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, error
itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists,
nStorageQueue, error
else:
discard
when extraTraceMessages:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "fetch nodes error", peer,
itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, error
itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists,
nStorageQueue, error
return @[]
@ -234,12 +236,12 @@ proc kvStorageSlotsLeaf(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
node: NodeSpecs;
env: SnapPivotRef;
): (bool,NodeKey)
{.gcsafe, raises: [Defect,RlpError]} =
## Read leaf node from persistent database (if any)
let
peer = buddy.peer
env = buddy.data.pivotEnv
nodeRlp = rlpFromBytes node.data
(_,prefix) = hexPrefixDecode node.partialPath
@ -252,12 +254,13 @@ proc kvStorageSlotsLeaf(
proc registerStorageSlotsLeaf(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
slotKey: NodeKey) =
slotKey: NodeKey;
env: SnapPivotRef;
) =
## Process single trie node as would be done with an interval by
## the `storeStorageSlots()` function
let
peer = buddy.peer
env = buddy.data.pivotEnv
slots = kvp.data.slots
pt = slotKey.to(NodeTag)
@ -274,9 +277,60 @@ proc registerStorageSlotsLeaf(
discard ivSet.reduce(pt,pt)
when extraTraceMessages:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "single node", peer,
itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, slotKey=pt
itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists,
nStorageQueue, slotKey=pt
proc assembleWorkItemsQueue(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): (seq[SnapSlotsQueuePair],int) =
## Collect storage slots work items from the batch queues that are due for
## healing. Work items whose sub-trie turns out to be complete already are
## counted and dropped on the fly.
var
toBeHealed: seq[SnapSlotsQueuePair]
nAcceptedAsIs = 0
# Search the current slot item batch list for items to complete via healing
for kvp in env.fetchStorageFull.nextPairs:
# Marked items indicate that a partial sub-trie exists which might have
# been inherited from an earlier storage root.
if kvp.data.inherit:
# Remove `kvp` work item from the queue object (which is allowed within a
# `for` loop over a `KeyedQueue` object type.)
env.fetchStorageFull.del(kvp.key)
# With some luck, the `kvp` work item refers to a complete storage trie
# that can be accepted as-is, in which case `kvp` can just be dropped.
let rc = buddy.acceptWorkItemAsIs(kvp)
if rc.isOk and rc.value:
env.nSlotLists.inc
nAcceptedAsIs.inc # for logging
continue # dropping `kvp`
toBeHealed.add kvp
if healStoragesSlotsBatchMax <= toBeHealed.len:
return (toBeHealed, nAcceptedAsIs)
# Ditto for partial items queue
for kvp in env.fetchStoragePart.nextPairs:
if healSlorageSlotsTrigger <= kvp.data.slots.unprocessed.emptyFactor:
env.fetchStoragePart.del(kvp.key)
let rc = buddy.acceptWorkItemAsIs(kvp)
if rc.isOk and rc.value:
env.nSlotLists.inc
nAcceptedAsIs.inc # for logging
continue # dropping `kvp`
# Add to local batch to be processed, below
toBeHealed.add kvp
if healStoragesSlotsBatchMax <= toBeHealed.len:
break
(toBeHealed, nAcceptedAsIs)
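The queue assembly above boils down to a simple triage: work items whose sub-trie already verifies as complete are merely counted, everything else is collected for healing until the batch cap is reached. A minimal, self-contained Nim sketch of that pattern, with toy integer work items and a made-up completeness check standing in for `acceptWorkItemAsIs()` and the `healStoragesSlotsBatchMax` cap:

const batchMax = 2                    # stands in for healStoragesSlotsBatchMax

proc alreadyComplete(n: int): bool =
  ## Toy stand-in for `acceptWorkItemAsIs()`
  n mod 3 == 0

when isMainModule:
  var
    toBeHealed: seq[int]
    nAcceptedAsIs = 0
  for n in 3 .. 10:                   # pretend queue of work items
    if n.alreadyComplete:
      nAcceptedAsIs.inc               # complete already, just count it
      continue
    toBeHealed.add n                  # needs healing, add it to the batch
    if batchMax <= toBeHealed.len:
      break                           # batch is full
  doAssert toBeHealed == @[4, 5] and nAcceptedAsIs == 1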
# ------------------------------------------------------------------------------
# Private functions: do the healing for one work item (sub-trie)
@ -285,6 +339,7 @@ proc registerStorageSlotsLeaf(
proc storageSlotsHealing(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
env: SnapPivotRef;
): Future[bool]
{.async.} =
## Returns `true` if the sub-trie is complete (probably inherited), and
@ -293,29 +348,30 @@ proc storageSlotsHealing(
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
accKey = kvp.data.accKey
slots = kvp.data.slots
when extraTraceMessages:
trace logTxt "started", peer, itCtx=buddy.healingCtx(kvp),
nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len
block:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "started", peer, itCtx=buddy.healingCtx(kvp,env),
nSlotLists=env.nSlotLists, nStorageQueue
# Update for changes since last visit
buddy.updateMissingNodesList(kvp)
buddy.updateMissingNodesList(kvp, env)
# Process the `checkNodes` list and move newly detected dangling links to
# the `missingNodes` list
if slots.checkNodes.len != 0:
if not buddy.appendMoreDanglingNodesToMissingNodesList(kvp):
if not buddy.appendMoreDanglingNodesToMissingNodesList(kvp,env):
return false
# Check whether the trie is complete.
if slots.missingNodes.len == 0:
trace logTxt "complete", peer, itCtx=buddy.healingCtx(kvp)
trace logTxt "complete", peer, itCtx=buddy.healingCtx(kvp,env)
return true
# Get next batch of nodes that need to be merged it into the database
let nodeSpecs = await buddy.getMissingNodesFromNetwork(kvp)
let nodeSpecs = await buddy.getMissingNodesFromNetwork(kvp,env)
if nodeSpecs.len == 0:
return
@ -323,17 +379,19 @@ proc storageSlotsHealing(
let report = db.importRawStorageSlotsNodes(peer, accKey, nodeSpecs)
if 0 < report.len and report[^1].slot.isNone:
# Storage error, just run the next lap (not much else that can be done)
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
error logTxt "error updating persistent database", peer,
itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, nNodes=nodeSpecs.len,
error=report[^1].error
itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists,
nStorageQueue, nNodes=nodeSpecs.len, error=report[^1].error
slots.missingNodes = slots.missingNodes & nodeSpecs
return false
when extraTraceMessages:
trace logTxt "nodes merged into database", peer,
itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, nNodes=nodeSpecs.len
block:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "nodes merged into database", peer,
itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists,
nStorageQueue, nNodes=nodeSpecs.len
# Filter out error and leaf nodes
var nLeafNodes = 0 # for logging
@ -354,23 +412,24 @@ proc storageSlotsHealing(
else:
# Node has been stored, double check
let (isLeaf, slotKey) =
buddy.kvStorageSlotsLeaf(kvp, nodeSpecs[inx])
buddy.kvStorageSlotsLeaf(kvp, nodeSpecs[inx], env)
if isLeaf:
# Update `unprocessed` registry, collect storage roots (if any)
buddy.registerStorageSlotsLeaf(kvp, slotKey)
buddy.registerStorageSlotsLeaf(kvp, slotKey, env)
nLeafNodes.inc
else:
slots.checkNodes.add nodePath
when extraTraceMessages:
trace logTxt "job done", peer,
itCtx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, nLeafNodes
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "job done", peer, itCtx=buddy.healingCtx(kvp,env),
nSlotLists=env.nSlotLists, nStorageQueue, nLeafNodes
proc healingIsComplete(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
env: SnapPivotRef;
): Future[bool]
{.async.} =
## Check whether the storage trie can be completely inherited and prepare for
@ -382,7 +441,6 @@ proc healingIsComplete(
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
accKey = kvp.data.accKey
storageRoot = kvp.key
@ -392,9 +450,9 @@ proc healingIsComplete(
if rc.isErr:
# Oops, not much we can do here (looping trie?)
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
error logTxt "problem inspecting storage trie", peer,
nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len,
storageRoot, error=rc.error
nSlotLists=env.nSlotLists, nStorageQueue, storageRoot, error=rc.error
return false
# Check whether the hexary trie can be inherited as-is.
@ -413,7 +471,7 @@ proc healingIsComplete(
NodeTagRange.new(low(NodeTag),high(NodeTag)))
# Proceed with healing
return await buddy.storageSlotsHealing(kvp)
return await buddy.storageSlotsHealing(kvp, env)
# ------------------------------------------------------------------------------
# Public functions
@ -427,72 +485,40 @@ proc healStorageSlots*(buddy: SnapBuddyRef) {.async.} =
peer = buddy.peer
env = buddy.data.pivotEnv
var
toBeHealed: seq[SnapSlotsQueuePair]
nAcceptedAsIs = 0
# Search the current slot item batch list for items to complete via healing
for kvp in env.fetchStorage.nextPairs:
# Marked items indicate that a partial sub-trie existsts which might have
# been inherited from an earlier storage root.
if not kvp.data.inherit:
let slots = kvp.data.slots
# Otherwise check partally fetched sub-tries only if they have a certain
# degree of completeness.
if slots.isNil or slots.unprocessed.emptyFactor < healSlorageSlotsTrigger:
continue
# Remove `kvp` work item from the queue object (which is allowed within a
# `for` loop over a `KeyedQueue` object type.)
env.fetchStorage.del(kvp.key)
# With some luck, the `kvp` work item refers to a complete storage trie
# that can be be accepted as-is in wich case `kvp` can be just dropped.
block:
let rc = buddy.acceptWorkItemAsIs(kvp)
if rc.isOk and rc.value:
env.nSlotLists.inc
nAcceptedAsIs.inc # for logging
continue # dropping `kvp`
# Add to local batch to be processed, below
toBeHealed.add kvp
if healStoragesSlotsBatchMax <= toBeHealed.len:
break
(toBeHealed, nAcceptedAsIs) = buddy.assembleWorkItemsQueue(env)
# Run against local batch
let nHealerQueue = toBeHealed.len
if 0 < nHealerQueue:
when extraTraceMessages:
trace logTxt "processing", peer,
nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len,
nHealerQueue, nAcceptedAsIs
block:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "processing", peer,
nSlotLists=env.nSlotLists, nStorageQueue, nHealerQueue, nAcceptedAsIs
for n in 0 ..< toBeHealed.len:
let
kvp = toBeHealed[n]
isComplete = await buddy.healingIsComplete(kvp)
if isComplete:
env.nSlotLists.inc
nAcceptedAsIs.inc
else:
env.fetchStorage.merge kvp
let kvp = toBeHealed[n]
if buddy.ctrl.stopped:
# Oops, peer has gone
env.fetchStorage.merge toBeHealed[n+1 ..< toBeHealed.len]
break
if buddy.ctrl.running:
if await buddy.healingIsComplete(kvp,env):
env.nSlotLists.inc
nAcceptedAsIs.inc
continue
if kvp.data.slots.isNil:
env.fetchStorageFull.merge kvp # should be the exception
else:
env.fetchStoragePart.merge kvp
when extraTraceMessages:
trace logTxt "done", peer,
nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len,
nHealerQueue, nAcceptedAsIs
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "done", peer, nSlotLists=env.nSlotLists, nStorageQueue,
nHealerQueue, nAcceptedAsIs, runState=buddy.ctrl.state
elif 0 < nAcceptedAsIs:
trace logTxt "work items", peer,
nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len,
nHealerQueue, nAcceptedAsIs
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "work items", peer, nSlotLists=env.nSlotLists,
nStorageQueue, nHealerQueue, nAcceptedAsIs, runState=buddy.ctrl.state
# ------------------------------------------------------------------------------
# End

View File

@ -37,7 +37,7 @@ import
stew/[interval_set, keyed_queue],
stint,
../../sync_desc,
".."/[range_desc, worker_desc],
".."/[constants, range_desc, worker_desc],
./com/[com_error, get_account_range],
./db/snapdb_accounts
@ -50,12 +50,6 @@ const
extraTraceMessages = false or true
## Enabled additional logging noise
numChunksMax = 2000
## Bound for `numChunks()` (some fancy number)
addToFetchLoopMax = 4
## Add some extra when calculating number of fetch/store rounds
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
@ -63,75 +57,51 @@ const
template logTxt(info: static[string]): static[string] =
"Accounts range " & info
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc numChunks(buddy: SnapBuddyRef): int =
var total = 0u64
for ivSet in buddy.data.pivotEnv.fetchAccounts.unprocessed:
total += ivSet.chunks.uint64
min(numChunksMax.uint64, total).int
proc withMaxLen(
proc dumpUnprocessed(
buddy: SnapBuddyRef;
iv: NodeTagRange;
maxlen: UInt256;
): NodeTagRange =
## Reduce accounts interval to maximal size
if 0 < iv.len and iv.len <= maxLen:
iv
else:
NodeTagRange.new(iv.minPt, iv.minPt + (maxLen - 1.u256))
env: SnapPivotRef;
): string =
## Debugging ...
let
peer = buddy.peer
pivot = "#" & $env.stateHeader.blockNumber # for logging
moan = proc(overlap: UInt256; iv: NodeTagRange) =
trace logTxt "unprocessed => overlap", peer, pivot, overlap, iv
env.fetchAccounts.unprocessed.dump(moan, 5)
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc getUnprocessed(buddy: SnapBuddyRef): Result[NodeTagRange,void] =
proc getUnprocessed(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): Result[NodeTagRange,void] =
## Fetch an interval from one of the account range lists.
let
env = buddy.data.pivotEnv
accountRangeMax = high(UInt256) div buddy.ctx.buddiesMax.u256
let accountRangeMax = high(UInt256) div buddy.ctx.buddiesMax.u256
for ivSet in env.fetchAccounts.unprocessed:
let rc = ivSet.ge()
if rc.isOk:
let iv = buddy.withMaxLen(rc.value, accountRangeMax)
discard ivSet.reduce(iv)
return ok(iv)
err()
proc putUnprocessed(buddy: SnapBuddyRef; iv: NodeTagRange) =
## Shortcut
discard buddy.data.pivotEnv.fetchAccounts.unprocessed[1].merge(iv)
proc delUnprocessed(buddy: SnapBuddyRef; iv: NodeTagRange) =
## Shortcut
discard buddy.data.pivotEnv.fetchAccounts.unprocessed[1].reduce(iv)
proc markGloballyProcessed(buddy: SnapBuddyRef; iv: NodeTagRange) =
## Shortcut
discard buddy.ctx.data.coveredAccounts.merge(iv)
env.fetchAccounts.unprocessed.fetch accountRangeMax
# ------------------------------------------------------------------------------
# Private functions: do the account fetching for one round
# ------------------------------------------------------------------------------
proc accountsRagefetchImpl(buddy: SnapBuddyRef): Future[bool] {.async.} =
proc accountsRangefetchImpl(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): Future[bool] {.async.} =
## Fetch accounts and store them in the database. Returns true while more
## data can probably be fetched.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
# Get a range of accounts to fetch from
let iv = block:
let rc = buddy.getUnprocessed()
let rc = buddy.getUnprocessed(env)
if rc.isErr:
when extraTraceMessages:
trace logTxt "currently all processed", peer, pivot
@ -142,7 +112,7 @@ proc accountsRagefetchImpl(buddy: SnapBuddyRef): Future[bool] {.async.} =
let dd = block:
let rc = await buddy.getAccountRange(stateRoot, iv, pivot)
if rc.isErr:
buddy.putUnprocessed(iv) # fail => interval back to pool
env.fetchAccounts.unprocessed.merge iv # fail => interval back to pool
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
when extraTraceMessages:
@ -161,44 +131,56 @@ proc accountsRagefetchImpl(buddy: SnapBuddyRef): Future[bool] {.async.} =
# trace logTxt "fetched", peer, gotAccounts, gotStorage,
# pivot, reqLen=iv.len, gotLen=dd.consumed.len
block:
let rc = ctx.data.snapDb.importAccounts(peer, stateRoot, iv.minPt, dd.data)
# Now, as we fully own the scheduler, the original interval can safely be
# placed back for a moment -- to be corrected below.
env.fetchAccounts.unprocessed.merge iv
# Processed accounts hashes are set up as a set of intervals which is needed
# if the data range returned from the network contains holes.
let processed = NodeTagRangeSet.init()
if 0 < dd.data.accounts.len:
discard processed.merge(iv.minPt, dd.data.accounts[^1].accKey.to(NodeTag))
else:
discard processed.merge iv
let dangling = block:
# No left boundary check needed. If there is a gap, the partial path for
# that gap is returned by the import function to be registered, below.
let rc = ctx.data.snapDb.importAccounts(
peer, stateRoot, iv.minPt, dd.data, noBaseBoundCheck = true)
if rc.isErr:
# Bad data, just try another peer
buddy.putUnprocessed(iv)
buddy.ctrl.zombie = true
when extraTraceMessages:
trace logTxt "import failed => stop", peer, gotAccounts, gotStorage,
pivot, reqLen=iv.len, gotLen=dd.consumed.len, error=rc.error
pivot, reqLen=iv.len, gotLen=processed.total, error=rc.error
return
rc.value
# Statistics
env.nAccounts.inc(gotAccounts)
# Register consumed intervals on the accumulator over all state roots
buddy.markGloballyProcessed(dd.consumed)
# Punch holes into the range reported back by the network wherever the
# importer flagged gaps (aka dangling nodes).
for w in dangling:
discard processed.reduce(
w.partialPath.min(NodeKey).to(NodeTag),
w.partialPath.max(NodeKey).to(NodeTag))
# Register consumed and bulk-imported (well, not yet) accounts range
block registerConsumed:
block:
# Both intervals `min(iv)` and `min(dd.consumed)` are equal
let rc = iv - dd.consumed
if rc.isOk:
# Now, `dd.consumed` < `iv`, return some unused range
buddy.putUnprocessed(rc.value)
break registerConsumed
block:
# The processed interval might be a bit larger
let rc = dd.consumed - iv
if rc.isOk:
# Remove from unprocessed data. If it is not unprocessed, anymore
# then it was doubly processed which is ok.
buddy.delUnprocessed(rc.value)
break registerConsumed
# End registerConsumed
# Update book keeping
for w in processed.increasing:
# Remove the processed range from the batch of unprocessed ones.
env.fetchAccounts.unprocessed.reduce w
# Register consumed intervals on the accumulator over all state roots.
discard buddy.ctx.data.coveredAccounts.merge w
# Store accounts on the storage TODO list.
env.fetchStorage.merge dd.withStorage
# Register accounts with storage slots on the storage TODO list.
env.fetchStorageFull.merge dd.withStorage
when extraTraceMessages:
trace logTxt "request done", peer, pivot,
nCheckNodes=env.fetchAccounts.checkNodes.len,
nMissingNodes=env.fetchAccounts.missingNodes.len,
imported=processed.dump(), unprocessed=buddy.dumpUnprocessed(env)
return true
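The `processed`/`dangling` book keeping above is easiest to picture with plain intervals: the range reported by the peer is first taken as fully covered, then every dangling sub-path punches a hole into it before the remainder is credited to the coverage accumulator. A rough, self-contained sketch under that simplification, with toy closed integer intervals instead of `NodeTag` ranges and `NodeTagRangeSet`:

type Iv = tuple[lo, hi: int]          # toy closed interval

proc punch(ivs: seq[Iv]; hole: Iv): seq[Iv] =
  ## Remove `hole` from a list of disjoint intervals.
  for r in ivs:
    if hole.hi < r.lo or r.hi < hole.lo:
      result.add r                    # no overlap, keep as is
    else:
      if r.lo < hole.lo:
        result.add((lo: r.lo, hi: hole.lo - 1))
      if hole.hi < r.hi:
        result.add((lo: hole.hi + 1, hi: r.hi))

when isMainModule:
  # Interval reported as covered by the peer ...
  var processed = @[(lo: 0, hi: 99)]
  # ... minus the gaps the importer flagged as dangling
  for hole in @[(lo: 40, hi: 49)]:
    processed = processed.punch hole
  doAssert processed == @[(lo: 0, hi: 39), (lo: 50, hi: 99)]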
@ -208,27 +190,32 @@ proc accountsRagefetchImpl(buddy: SnapBuddyRef): Future[bool] {.async.} =
proc rangeFetchAccounts*(buddy: SnapBuddyRef) {.async.} =
## Fetch accounts and store them in the database.
let numChunks = buddy.numChunks()
if 0 < numChunks:
let env = buddy.data.pivotEnv
if not env.fetchAccounts.unprocessed.isEmpty():
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
pivot = "#" & $env.stateHeader.blockNumber # for logging
nFetchLoopMax = max(ctx.buddiesMax + 1, numChunks) + addToFetchLoopMax
when extraTraceMessages:
trace logTxt "start", peer, pivot, nFetchLoopMax
trace logTxt "start", peer, pivot
var nFetchAccounts = 0
while nFetchAccounts < nFetchLoopMax:
if not await buddy.accountsRagefetchImpl():
break
while not env.fetchAccounts.unprocessed.isEmpty() and
buddy.ctrl.running and
env == buddy.data.pivotEnv:
nFetchAccounts.inc
if not await buddy.accountsRangefetchImpl(env):
break
# Clean up the storage slots queue first if it becomes too large
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
if snapAccountsBuddyStoragesSlotsQuPrioThresh < nStoQu:
break
when extraTraceMessages:
trace logTxt "done", peer, pivot, nFetchAccounts, nFetchLoopMax
trace logTxt "done", peer, pivot, nFetchAccounts,
runState=buddy.ctrl.state
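The loop condition above encodes the hand-over policy: a buddy keeps range-fetching accounts only while its pivot is unchanged and the combined storage queue stays below the priority threshold, after which the storage-slots fetcher takes over. A toy sketch of that switch-over with made-up numbers (the threshold constant is the one referenced in the loop):

const quPrioThresh = 30_000     # cf. snapAccountsBuddyStoragesSlotsQuPrioThresh

var
  nStoQu = 29_990               # pretend combined full+partial queue length
  nFetchAccounts = 0

while nFetchAccounts < 100:     # stand-in for "unprocessed ranges left"
  nFetchAccounts.inc
  nStoQu += 5                   # each round typically queues more storage roots
  if quPrioThresh < nStoQu:
    break                       # hand over to the storage-slots fetcher

doAssert nFetchAccounts == 3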
# ------------------------------------------------------------------------------
# End

View File

@ -34,7 +34,8 @@
##
## Legend:
## * `<..>`: some action, process, etc.
## * `{missing-storage-slots}`: list implemented as `env.fetchStorage`
## * `{missing-storage-slots}`: list implemented as pair of queues
## `env.fetchStorageFull` and `env.fetchStoragePart`
## * `{storage-slots}`: list is optimised out
## * `{completed}`: list is optimised out
## * `{partial}`: list is optimised out
@ -78,76 +79,24 @@ template logTxt(info: static[string]): static[string] =
"Storage slots range " & info
# ------------------------------------------------------------------------------
# Private functions
# Private helpers
# ------------------------------------------------------------------------------
proc getNextSlotItems(
proc getNextSlotItemsFull(
buddy: SnapBuddyRef;
noSubRange = false;
env: SnapPivotRef;
): seq[AccountSlotsHeader] =
## Get list of work item from the batch queue.
##
## * If the storage slots requested come with an explicit sub-range of slots
## (i.e. not an implied complete list), then the result has only on work
## item. An explicit list of slots is only calculated if there was a queue
## item with a partially completed slots download.
##
## * Otherwise, a list of at most `snapStoragesSlotsFetchMax` work items is
## returned. These work items were checked for that there was no trace of a
## previously installed (probably partial) storage trie on the database
## (e.g. inherited from an earlier state root pivot.)
##
## If there is an indication that the storage trie may have some data
## already it is ignored here and marked `inherit` so that it will be
## picked up by the healing process.
## Get a list of full-range work items from the batch queue.
##
## If there is an indication that the storage trie may have some data
## already it is ignored here and marked `inherit` so that it will be
## picked up by the healing process.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
# Assemble first request which might come with a sub-range.
if not noSubRange:
let (reqKey, reqData) = block:
let rc = env.fetchStorage.first # peek
if rc.isErr:
return
(rc.value.key, rc.value.data)
while not reqData.slots.isNil:
# Extract first interval and return single item request queue
for ivSet in reqData.slots.unprocessed:
let rc = ivSet.ge()
if rc.isOk:
# Extraxt this interval from the range set
discard ivSet.reduce rc.value
# Delete from batch queue if the range set becomes empty
if reqData.slots.unprocessed.isEmpty:
env.fetchStorage.del(reqKey)
when extraTraceMessages:
trace logTxt "prepare fetch partial", peer,
nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len,
nToProcess=1, subRange=rc.value, account=reqData.accKey
return @[AccountSlotsHeader(
accKey: reqData.accKey,
storageRoot: reqKey,
subRange: some rc.value)]
# Oops, empty range set? Remove range and move item to the right end
reqData.slots = nil
discard env.fetchStorage.lruFetch(reqKey)
# Done with partial slot ranges. Assemble maximal request queue.
var nInherit = 0
for kvp in env.fetchStorage.prevPairs:
if not kvp.data.slots.isNil:
# May happen when `noSubRange` is `true`. As the queue is read from the
# right end and all the partial slot ranges are on the left there will
# be no more non-partial slot ranges on the queue. So this loop is done.
break
var
nInherit = 0
for kvp in env.fetchStorageFull.nextPairs:
let it = AccountSlotsHeader(
accKey: kvp.data.accKey,
storageRoot: kvp.key)
@ -160,50 +109,91 @@ proc getNextSlotItems(
continue
result.add it
env.fetchStorage.del(kvp.key) # ok to delete this item from batch queue
env.fetchStorageFull.del(kvp.key) # ok to delete this item from batch queue
# Maximal number of items to fetch
if snapStoragesSlotsFetchMax <= result.len:
break
when extraTraceMessages:
trace logTxt "fetch", peer, nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, nToProcess=result.len, nInherit
trace logTxt "fetch full", peer, nSlotLists=env.nSlotLists,
nStorageQuFull=env.fetchStorageFull.len, nToProcess=result.len, nInherit
proc getNextSlotItemPartial(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): seq[AccountSlotsHeader] =
## Get work item from the batch queue.
let
ctx = buddy.ctx
peer = buddy.peer
for kvp in env.fetchStoragePart.nextPairs:
if not kvp.data.slots.isNil:
# Extract first interval and return single item request queue
for ivSet in kvp.data.slots.unprocessed:
let rc = ivSet.ge()
if rc.isOk:
# Extract this interval from the range set
discard ivSet.reduce rc.value
# Delete from batch queue if the range set becomes empty
if kvp.data.slots.unprocessed.isEmpty:
env.fetchStoragePart.del(kvp.key)
when extraTraceMessages:
trace logTxt "fetch partial", peer,
nSlotLists=env.nSlotLists,
nStorageQuPart=env.fetchStoragePart.len,
subRange=rc.value, account=kvp.data.accKey
return @[AccountSlotsHeader(
accKey: kvp.data.accKey,
storageRoot: kvp.key,
subRange: some rc.value)]
# Oops, empty range set? Remove the range and move the item to the full-range queue
kvp.data.slots = nil
env.fetchStorageFull.merge kvp
proc backToSlotItemsQueue(env: SnapPivotRef; req: seq[AccountSlotsHeader]) =
if 0 < req.len:
if req[^1].subRange.isSome:
env.fetchStoragePart.merge req[^1]
env.fetchStorageFull.merge req[0 ..< req.len-1]
else:
env.fetchStorageFull.merge req
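`backToSlotItemsQueue()` relies on the convention that a sub-range, if any, sits in the last entry of a request list, so that entry is re-queued on the partial queue while everything else goes back to the full-range queue. A hypothetical, self-contained sketch of the same split, with toy tuples and `std/options` instead of `AccountSlotsHeader`:

import std/options

type ReqItem = tuple[acc: string, subRange: Option[Slice[int]]]

proc backToQueues(req: seq[ReqItem]): tuple[full, part: seq[ReqItem]] =
  ## Toy split: a trailing sub-range entry goes to the partial queue,
  ## everything else back to the full-range queue.
  if 0 < req.len:
    if req[^1].subRange.isSome:
      result.part.add req[^1]
      result.full.add req[0 ..< req.len-1]
    else:
      result.full.add req

when isMainModule:
  let leftOver = @[(acc: "acc-1", subRange: none(Slice[int])),
                   (acc: "acc-2", subRange: some(3 .. 7))]
  let (full, part) = leftOver.backToQueues
  doAssert full.len == 1 and part.len == 1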
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc storeStoragesSingleBatch(
buddy: SnapBuddyRef;
noSubRange = false;
req: seq[AccountSlotsHeader];
env: SnapPivotRef;
) {.async.} =
## Fetch account storage slots and store them in the database.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
# Fetch storage data and save it on disk. Storage requests are managed by
# a request queue for handling partioal replies and re-fetch issues. For
# all practical puroses, this request queue should mostly be empty.
# Pull out the next request list from the queue
let req = buddy.getNextSlotItems()
if req.len == 0:
return # currently nothing to do
# Get storages slots data from the network
var stoRange = block:
let rc = await buddy.getStorageRanges(stateRoot, req, pivot)
if rc.isErr:
env.fetchStorage.merge req
env.backToSlotItemsQueue req
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
discard
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "fetch error => stop", peer, pivot,
nSlotLists=env.nSlotLists, nReq=req.len,
nStorageQueue=env.fetchStorage.len, error
nSlotLists=env.nSlotLists, nReq=req.len, nStorageQueue, error
return
rc.value
@ -213,25 +203,28 @@ proc storeStoragesSingleBatch(
var gotSlotLists = stoRange.data.storages.len
#when extraTraceMessages:
# trace logTxt "fetched", peer, pivot,
# nSlotLists=env.nSlotLists, nSlotLists=gotSlotLists, nReq=req.len,
# nStorageQueue=env.fetchStorage.len, nLeftOvers=stoRange.leftOver.len
# let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
# trace logTxt "fetched", peer, pivot, nSlotLists=env.nSlotLists,
# nSlotLists=gotSlotLists, nReq=req.len,
# nStorageQueue, nLeftOvers=stoRange.leftOver.len
if 0 < gotSlotLists:
# Verify/process storages data and save it to disk
let report = ctx.data.snapDb.importStorageSlots(peer, stoRange.data)
let report = ctx.data.snapDb.importStorageSlots(
peer, stoRange.data, noBaseBoundCheck = true)
if 0 < report.len:
let topStoRange = stoRange.data.storages.len - 1
if report[^1].slot.isNone:
# Failed to store on database, not much that can be done here
env.fetchStorage.merge req
env.backToSlotItemsQueue req
gotSlotLists.dec(report.len - 1) # for logging only
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
error logTxt "import failed", peer, pivot,
nSlotLists=env.nSlotLists, nSlotLists=gotSlotLists, nReq=req.len,
nStorageQueue=env.fetchStorage.len, error=report[^1].error
nStorageQueue, error=report[^1].error
return
# Push back error entries to be processed later
@ -246,33 +239,37 @@ proc storeStoragesSingleBatch(
# Reset any partial result (which would be the last entry) to
# requesting the full interval. So all the storage slots are
# re-fetched completely for this account.
env.fetchStorage.merge AccountSlotsHeader(
env.fetchStorageFull.merge AccountSlotsHeader(
accKey: stoRange.data.storages[inx].account.accKey,
storageRoot: stoRange.data.storages[inx].account.storageRoot)
# Last entry might be partial
if inx == topStoRange:
# No partial result processing anymore to consider
# Forget about partial result processing if the last partial entry
# was reported because
# * either there was an error processing it
# * or there were some gaps reported as dangling links
stoRange.data.proof = @[]
# Update local statistics counter for `nSlotLists` counter update
gotSlotLists.dec
trace logTxt "processing error", peer, pivot, nSlotLists=env.nSlotLists,
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
error logTxt "processing error", peer, pivot, nSlotLists=env.nSlotLists,
nSlotLists=gotSlotLists, nReqInx=inx, nReq=req.len,
nStorageQueue=env.fetchStorage.len, error=report[inx].error
nStorageQueue, nDangling=w.dangling.len, error=w.error
# Update statistics
if gotSlotLists == 1 and
req[0].subRange.isSome and
env.fetchStorage.hasKey req[0].storageRoot:
env.fetchStoragePart.hasKey req[0].storageRoot:
# Successful partial request, but not completely done with yet.
gotSlotLists = 0
env.nSlotLists.inc(gotSlotLists)
# Return unprocessed left overs to batch queue
env.fetchStorage.merge stoRange.leftOver
env.backToSlotItemsQueue stoRange.leftOver
# ------------------------------------------------------------------------------
# Public functions
@ -287,43 +284,46 @@ proc rangeFetchStorageSlots*(buddy: SnapBuddyRef) {.async.} =
let
env = buddy.data.pivotEnv
peer = buddy.peer
fullRangeLen = env.fetchStorageFull.len
partRangeLen = env.fetchStoragePart.len
if 0 < env.fetchStorage.len:
# Run at most the minimum number of times to get the batch queue cleaned up.
var
fullRangeLoopCount =
1 + (env.fetchStorage.len - 1) div snapStoragesSlotsFetchMax
subRangeLoopCount = 0
# Add additional counts for partial slot range items
for reqData in env.fetchStorage.nextValues:
if reqData.slots.isNil:
break
subRangeLoopCount.inc
# Fetch storage data and save it on disk. Storage requests are managed by
# request queues for handling full/partial replies and re-fetch issues. For
# all practical purposes, these request queues should mostly be empty.
if 0 < fullRangeLen or 0 < partRangeLen:
when extraTraceMessages:
trace logTxt "start", peer, nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, fullRangeLoopCount,
subRangeLoopCount
nStorageQueue=(fullRangeLen+partRangeLen)
# Processing the full range will implicitly handle inheritable storage
# slots first wich each batch item (see `getNextSlotItems()`.)
while 0 < fullRangeLoopCount and
0 < env.fetchStorage.len and
not buddy.ctrl.stopped:
fullRangeLoopCount.dec
await buddy.storeStoragesSingleBatch(noSubRange = true)
# slots first with each batch item (see `getNextSlotItemsFull()`.)
var fullRangeItemsLeft = 1 + (fullRangeLen-1) div snapStoragesSlotsFetchMax
while 0 < fullRangeItemsLeft and
buddy.ctrl.running and
env == buddy.data.pivotEnv:
# Pull out the next request list from the queue
let req = buddy.getNextSlotItemsFull(env)
if req.len == 0:
break
fullRangeItemsLeft.dec
await buddy.storeStoragesSingleBatch(req, env)
while 0 < subRangeLoopCount and
0 < env.fetchStorage.len and
not buddy.ctrl.stopped:
subRangeLoopCount.dec
await buddy.storeStoragesSingleBatch(noSubRange = false)
var partialRangeItemsLeft = env.fetchStoragePart.len
while 0 < partialRangeItemsLeft and
buddy.ctrl.running and
env == buddy.data.pivotEnv:
# Pull out the next request list from the queue
let req = buddy.getNextSlotItemPartial(env)
if req.len == 0:
break
partialRangeItemsLeft.dec
await buddy.storeStoragesSingleBatch(req, env)
when extraTraceMessages:
trace logTxt "done", peer, nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, fullRangeLoopCount,
subRangeLoopCount
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "done", peer, nSlotLists=env.nSlotLists, nStorageQueue,
fullRangeItemsLeft, partialRangeItemsLeft, runState=buddy.ctrl.state
# ------------------------------------------------------------------------------
# End

View File

@ -53,44 +53,49 @@ const
# Private functions: pretty printing
# ------------------------------------------------------------------------------
proc ppMs*(elapsed: times.Duration): string
{.gcsafe, raises: [Defect, ValueError]} =
result = $elapsed.inMilliseconds
let ns = elapsed.inNanoseconds mod 1_000_000 # fraction of a milli second
if ns != 0:
# to rounded deca milli seconds
let dm = (ns + 5_000i64) div 10_000i64
result &= &".{dm:02}"
result &= "ms"
# proc ppMs*(elapsed: times.Duration): string
# {.gcsafe, raises: [Defect, ValueError]} =
# result = $elapsed.inMilliseconds
# let ns = elapsed.inNanoseconds mod 1_000_000 # fraction of a milli second
# if ns != 0:
# # to rounded deca milli seconds
# let dm = (ns + 5_000i64) div 10_000i64
# result &= &".{dm:02}"
# result &= "ms"
#
# proc ppSecs*(elapsed: times.Duration): string
# {.gcsafe, raises: [Defect, ValueError]} =
# result = $elapsed.inSeconds
# let ns = elapsed.inNanoseconds mod 1_000_000_000 # fraction of a second
# if ns != 0:
# # round up
# let ds = (ns + 5_000_000i64) div 10_000_000i64
# result &= &".{ds:02}"
# result &= "s"
#
# proc ppMins*(elapsed: times.Duration): string
# {.gcsafe, raises: [Defect, ValueError]} =
# result = $elapsed.inMinutes
# let ns = elapsed.inNanoseconds mod 60_000_000_000 # fraction of a minute
# if ns != 0:
# # round up
# let dm = (ns + 500_000_000i64) div 1_000_000_000i64
# result &= &":{dm:02}"
# result &= "m"
#
# proc pp(d: times.Duration): string
# {.gcsafe, raises: [Defect, ValueError]} =
# if 40 < d.inSeconds:
# d.ppMins
# elif 200 < d.inMilliseconds:
# d.ppSecs
# else:
# d.ppMs
proc ppSecs*(elapsed: times.Duration): string
{.gcsafe, raises: [Defect, ValueError]} =
result = $elapsed.inSeconds
let ns = elapsed.inNanoseconds mod 1_000_000_000 # fraction of a second
if ns != 0:
# round up
let ds = (ns + 5_000_000i64) div 10_000_000i64
result &= &".{ds:02}"
result &= "s"
proc ppMins*(elapsed: times.Duration): string
{.gcsafe, raises: [Defect, ValueError]} =
result = $elapsed.inMinutes
let ns = elapsed.inNanoseconds mod 60_000_000_000 # fraction of a minute
if ns != 0:
# round up
let dm = (ns + 500_000_000i64) div 1_000_000_000i64
result &= &":{dm:02}"
result &= "m"
proc pp(d: times.Duration): string
{.gcsafe, raises: [Defect, ValueError]} =
if 40 < d.inSeconds:
d.ppMins
elif 200 < d.inMilliseconds:
d.ppSecs
else:
d.ppMs
proc pc99(val: float): string =
if 0.99 <= val and val < 1.0: "99%"
elif 0.0 < val and val <= 0.01: "1%"
else: val.toPC(0)
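The clamping in `pc99` keeps an almost-finished (or barely-started) sync from showing up as a flat "100%" or "0%" in the ticker line. Assuming it is exercised from the same module, the two clamped branches behave like this:

# Values strictly between 99% and 100% are pinned to "99%", values just
# above zero to "1%"; everything else falls through to toPC(0).
doAssert (0.995).pc99 == "99%"
doAssert (0.004).pc99 == "1%"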
# ------------------------------------------------------------------------------
# Private functions: ticking log messages
@ -117,9 +122,9 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
pivot = "n/a"
nStoQue = "n/a"
let
accCov = data.accountsFill[0].toPC(0) &
"(" & data.accountsFill[1].toPC(0) & ")" &
"/" & data.accountsFill[2].toPC(0)
accCov = data.accountsFill[0].pc99 &
"(" & data.accountsFill[1].pc99 & ")" &
"/" & data.accountsFill[2].pc99
buddies = t.nBuddies
# With `int64`, there are more than 29*10^10 years range for seconds

View File

@ -38,17 +38,14 @@ type
slots*: SnapTrieRangeBatchRef ## slots to fetch, nil => all slots
inherit*: bool ## mark this trie seen already
SnapSlotsSet* = HashSet[SnapSlotsQueueItemRef]
## Ditto but without order, to be used as veto set
SnapAccountRanges* = array[2,NodeTagRangeSet]
## Pair of account hash range lists. The first entry must be processed
## first. This allows to coordinate peers working on different state roots
## to avoid ovelapping accounts as long as they fetch from the first entry.
SnapTodoRanges* = array[2,NodeTagRangeSet]
## Pair of node range lists. The first entry must be processed first. This
## allows coordinating peers working on different state roots to avoid
## overlapping accounts as long as they fetch from the first entry.
SnapTrieRangeBatch* = object
## `NodeTag` ranges to fetch, healing support
unprocessed*: SnapAccountRanges ## Range of slots not covered, yet
unprocessed*: SnapTodoRanges ## Range of slots not covered, yet
checkNodes*: seq[Blob] ## Nodes with prob. dangling child links
missingNodes*: seq[NodeSpecs] ## Dangling links to fetch and merge
@ -73,7 +70,8 @@ type
accountsState*: SnapHealingState ## All accounts have been processed
# Storage slots download
fetchStorage*: SnapSlotsQueue ## Fetch storage for these accounts
fetchStorageFull*: SnapSlotsQueue ## Fetch storage trie for these accounts
fetchStoragePart*: SnapSlotsQueue ## Partial storage tries to complete
storageDone*: bool ## Done with storage, block sync next
# Info
@ -121,7 +119,57 @@ proc hash*(a: Hash256): Hash =
a.data.hash
# ------------------------------------------------------------------------------
# Public helpers
# Public helpers: SnapTodoRanges
# ------------------------------------------------------------------------------
proc init*(q: var SnapTodoRanges) =
## Populate node range sets with maximal range in the first range set
q[0] = NodeTagRangeSet.init()
q[1] = NodeTagRangeSet.init()
discard q[0].merge(low(NodeTag),high(NodeTag))
proc merge*(q: var SnapTodoRanges; iv: NodeTagRange) =
## Unconditionally merge the node range into the range list
discard q[0].reduce(iv)
discard q[1].merge(iv)
proc merge*(q: var SnapTodoRanges; minPt, maxPt: NodeTag) =
## Variant of `merge()`
q.merge NodeTagRange.new(minPt, maxPt)
proc reduce*(q: var SnapTodoRanges; iv: NodeTagRange) =
## Unconditionally remove the node range from the range list
discard q[0].reduce(iv)
discard q[1].reduce(iv)
proc reduce*(q: var SnapTodoRanges; minPt, maxPt: NodeTag) =
## Variant of `reduce()`
q.reduce NodeTagRange.new(minPt, maxPt)
proc fetch*(q: var SnapTodoRanges; maxLen: UInt256): Result[NodeTagRange,void] =
## Fetch interval from node ranges with maximal size `maxLen`
# Swap batch queues if the first one is empty
if q[0].isEmpty:
swap(q[0], q[1])
# Fetch from first range list
let rc = q[0].ge()
if rc.isErr:
return err()
let
val = rc.value
iv = if 0 < val.len and val.len <= maxLen: val # val.len==0 => 2^256
else: NodeTagRange.new(val.minPt, val.minPt + (maxLen - 1.u256))
discard q[0].reduce(iv)
ok(iv)
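The two-set discipline behind `SnapTodoRanges` is easiest to see with toy items: re-queued work is demoted to the second set, while `fetch` drains the first set and only swaps the pair in once the first set runs dry. A minimal, self-contained sketch with plain integer IDs and `std/sets` standing in for `NodeTag` ranges and `NodeTagRangeSet`:

import std/[options, sets]

type TodoPair = array[2, HashSet[int]]  # [0] = high prio, [1] = re-queued work

proc init(q: var TodoPair; all: openArray[int]) =
  q[0] = all.toHashSet                  # everything starts out as high prio
  q[1] = initHashSet[int]()

proc merge(q: var TodoPair; item: int) =
  q[0].excl item                        # put back, but de-prioritised
  q[1].incl item

proc fetch(q: var TodoPair): Option[int] =
  if q[0].len == 0:
    swap(q[0], q[1])                    # first set drained, promote the rest
  if q[0].len == 0:
    return none(int)
  some(q[0].pop())

when isMainModule:
  var q: TodoPair
  q.init([1, 2, 3])
  q.merge 2                             # e.g. a failed fetch, retry it later
  doAssert q.fetch.get in [1, 3]        # fresh work keeps priority over item 2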
# ------------------------------------------------------------------------------
# Public helpers: SlotsQueue
# ------------------------------------------------------------------------------
proc merge*(q: var SnapSlotsQueue; kvp: SnapSlotsQueuePair) =

View File

@ -73,7 +73,7 @@ proc running*(ctrl: BuddyCtrlRef): bool =
proc stopped*(ctrl: BuddyCtrlRef): bool =
## Getter, if `true`, if `ctrl.state()` is not `Running`
ctrl.runState in {Stopped, ZombieStop, ZombieRun}
ctrl.runState != Running
proc zombie*(ctrl: BuddyCtrlRef): bool =
## Getter, `true` if `ctrl.state()` is `Zombie` (i.e. not `running()` and

View File

@ -189,7 +189,7 @@ iterator undumpNextStorages*(gzFile: string): UndumpStorages =
if flds.len == 2:
data.data.storages[^1].data.add SnapStorage(
slotHash: Hash256.fromHex(flds[0]),
slotData: flds[1].toByteSeq)
slotData: flds[1].toByteSeq)
nSlots.dec
if 0 < nSlots:
continue
@ -211,6 +211,11 @@ iterator undumpNextStorages*(gzFile: string): UndumpStorages =
nProofs.dec
if nProofs <= 0:
state = UndumpCommit
# KLUDGE: set base (field was later added)
if 0 < data.data.storages.len:
let topList = data.data.storages[^1]
if 0 < topList.data.len:
data.data.base = topList.data[0].slotHash.to(NodeTag)
continue
state = UndumpError
say &"*** expected proof data, got {line}"

View File

@ -66,6 +66,7 @@ let
# Forces `check()` to print the error (as opposed when using `isOk()`)
OkHexDb = Result[void,HexaryDbError].ok()
OkStoDb = Result[void,seq[(int,HexaryDbError)]].ok()
OkImport = Result[seq[NodeSpecs],HexaryDbError].ok(@[])
# There was a problem with the Github/CI which results in spurious crashes
# when leaving the `runner()` if the persistent BaseChainDB initialisation
@ -300,7 +301,7 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
else: SnapDbRef.init(newMemoryDB())
dbDesc = SnapDbAccountsRef.init(dbBase, root, peer)
for n,w in accountsList:
check dbDesc.importAccounts(w.base, w.data, persistent) == OkHexDb
check dbDesc.importAccounts(w.base, w.data, persistent) == OkImport
test &"Merging {accountsList.len} proofs for state root ..{root.pp}":
let dbBase = if persistent: SnapDbRef.init(db.cdb[1])
@ -313,7 +314,9 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
packed = PackedAccountRange(
accounts: accountsList.mapIt(it.data.accounts).sortMerge,
proof: accountsList.mapIt(it.data.proof).flatten)
check desc.importAccounts(lowerBound, packed, true) == OkHexDb
# Merging intervals will produce gaps, so the result is expected OK but
# different from `OkImport`
check desc.importAccounts(lowerBound, packed, true).isOk
# check desc.merge(lowerBound, accounts) == OkHexDb
desc.assignPrettyKeys() # for debugging, make sure that state root ~ "$0"
@ -430,7 +433,7 @@ proc storagesRunner(
test &"Merging {accountsList.len} accounts for state root ..{root.pp}":
for w in accountsList:
let desc = SnapDbAccountsRef.init(dbBase, root, peer)
check desc.importAccounts(w.base, w.data, persistent) == OkHexDb
check desc.importAccounts(w.base, w.data, persistent) == OkImport
test &"Merging {storagesList.len} storages lists":
let
@ -512,7 +515,7 @@ proc inspectionRunner(
rootKey = root.to(NodeKey)
desc = SnapDbAccountsRef.init(memBase, root, peer)
for w in accList:
check desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb
check desc.importAccounts(w.base, w.data, persistent=false)==OkImport
let rc = desc.inspectAccountsTrie(persistent=false)
check rc.isOk
let
@ -537,7 +540,7 @@ proc inspectionRunner(
dbBase = SnapDbRef.init(db.cdb[2+n])
desc = SnapDbAccountsRef.init(dbBase, root, peer)
for w in accList:
check desc.importAccounts(w.base, w.data, persistent) == OkHexDb
check desc.importAccounts(w.base, w.data, persistent) == OkImport
let rc = desc.inspectAccountsTrie(persistent=false)
check rc.isOk
let
@ -557,7 +560,7 @@ proc inspectionRunner(
rootKey = root.to(NodeKey)
desc = memDesc.dup(root,Peer())
for w in accList:
check desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb
check desc.importAccounts(w.base, w.data, persistent=false)==OkImport
let rc = desc.inspectAccountsTrie(persistent=false)
check rc.isOk
let
@ -580,7 +583,7 @@ proc inspectionRunner(
rootSet = [rootKey].toHashSet
desc = perDesc.dup(root,Peer())
for w in accList:
check desc.importAccounts(w.base, w.data, persistent) == OkHexDb
check desc.importAccounts(w.base, w.data, persistent) == OkImport
let rc = desc.inspectAccountsTrie(persistent=false)
check rc.isOk
let
@ -607,7 +610,7 @@ proc inspectionRunner(
rootKey = root.to(NodeKey)
desc = cscDesc.dup(root,Peer())
for w in accList:
check desc.importAccounts(w.base,w.data,persistent=false) == OkHexDb
check desc.importAccounts(w.base,w.data,persistent=false)==OkImport
if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)):
cscStep[rootKey][0].inc
let
@ -639,7 +642,7 @@ proc inspectionRunner(
rootKey = root.to(NodeKey)
desc = cscDesc.dup(root,Peer())
for w in accList:
check desc.importAccounts(w.base,w.data,persistent) == OkHexDb
check desc.importAccounts(w.base,w.data,persistent) == OkImport
if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)):
cscStep[rootKey][0].inc
let