Snap sync refactor healing (#1397)

* Simplify accounts healing threshold management

why:
  Was over-engineered.

details:
  Previously, healing was based on recursive hexary trie perusal.

  Due to "cheap" envelope decomposition of a range complement for the
  hexary trie, the cost of running extra laps have become time-affordable
  again and a simple trigger mechanism for healing will do.

* Control number of dangling result nodes in `hexaryInspectTrie()`

also:
+ Returns number of visited nodes available for logging so the maximum
  number of nodes can be tuned accordingly.
+ Some code and docu update

* Update names of constants

why:
  Declutter, more systematic naming

* Re-implemented `worker_desc.merge()` for storage slots

why:
  Provided as proper queue management in `storage_queue_helper`.

details:
+ Several append modes (replaces `merge()`)
+ Added a third queue to record the entries currently fetched by a worker,
  so another worker running in parallel can save the complete set of storage
  slots in a checkpoint. This was previously lost.
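
  For illustration, a minimal sketch of this three-queue bookkeeping (the
  names `StorageQueues`, `fetchFull`, `fetchPart` and `parked` are stand-ins,
  not the actual `storage_queue_helper` API):

    import std/sets

    type StorageQueues = object
      fetchFull: HashSet[string]   # full storage ranges still to fetch
      fetchPart: HashSet[string]   # partially fetched storage ranges
      parked: HashSet[string]      # entries currently fetched by a worker

    proc fetchNext(q: var StorageQueues): string =
      ## Move one work item from the full-range queue to the parked set.
      for acc in q.fetchFull.items:
        result = acc
        break
      if 0 < result.len:
        q.fetchFull.excl result
        q.parked.incl result

    proc checkpointAccounts(q: StorageQueues): HashSet[string] =
      ## Complete outstanding set, including parked entries (previously lost.)
      q.fetchFull + q.fetchPart + q.parked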

* Refactor healing

why:
  Simplify and remove deep hexary trie perusal for finding completeness.

   Due to "cheap" envelope decomposition of a range complement for the
   hexary trie, the cost of running extra laps have become time-affordable
   again and a simple trigger mechanism for healing will do.

* Docu update

* Run a storage job only once in download loop

why:
  Otherwise, a download failure or rejection (i.e. missing data) leads to
  repeated fetch requests until the peer disconnects.
Jordan Hrycaj 2022-12-24 09:54:18 +00:00 committed by GitHub
parent e36d2f432a
commit 88b315bb41
20 changed files with 965 additions and 1445 deletions

View File

@ -41,26 +41,30 @@ const
# --------------
snapRequestBytesLimit* = 2 * 1024 * 1024
## Soft bytes limit to request in `snap` protocol calls.
fetchRequestBytesLimit* = 2 * 1024 * 1024
## Soft bytes limit to request in `snap/1` protocol calls.
snapRequestTrieNodesFetchMax* = 1024
## Informal maximal number of trie nodes to fetch at once in `snap`
fetchRequestTrieNodesMax* = 1024
## Informal maximal number of trie nodes to fetch at once in `snap/1`
## protocol calls. This is not an official limit but found with several
## implementations (e.g. Geth.)
##
## Restricting the fetch list length early allows to better paralellise
## Restricting the fetch list length early allows to better parallelise
## healing.
fetchRequestStorageSlotsMax* = 2 * 1024
## Maximal number of storage tries to fetch with a single request message.
snapAccountsSaveProcessedChunksMax* = 1000
# --------------
accountsSaveProcessedChunksMax* = 1000
## Recovery data are stored if the processed ranges list contains no more
## than this many range *chunks*.
##
## If the range set is too fragmented, no data will be saved and a
## restart has to be done from scratch or from an earlier checkpoint.
snapAccountsSaveStorageSlotsMax* = 20_000
accountsSaveStorageSlotsMax* = 20_000
## Recovery data are stored if the outstanding storage slots to process do
## not amount to more than this many entries.
##
@ -68,10 +72,12 @@ const
## has to perform from scratch or an earlier checkpoint.
snapStorageSlotsFetchMax* = 2 * 1024
## Maximal number of storage tries to fetch with a single message.
storageSlotsTrieInheritPerusalMax* = 30_000
## Maximal number of nodes to visit in order to find out whether this
## storage slots trie is complete. This allows to *inherit* the full trie
## for an existing root node if the trie is small enough.
snapStorageSlotsQuPrioThresh* = 5_000
storageSlotsQuPrioThresh* = 5_000
## For a new worker, prioritise processing the storage slots queue over
## processing accounts if the queue has more than this many items.
##
@ -81,68 +87,33 @@ const
# --------------
swapInAccountsCoverageTrigger* = 0.30
## Similar to `healAccountsCoverageTrigger` below only for trying to
## swap in from earlier pivots.
swapInAccountsPivotsMin* = 2
## Require at least this many pivots available before any swap in can
## take place (must be at least 2.) This value is typically combined
## with `swapInAccountsCoverageTrigger`.
# --------------
healInspectionBatch* = 10_000
## Number of nodes to inspect in a single batch. In between batches, a
## task/thread switch is allowed.
healInspectionBatchWaitNanoSecs* = 500
## Wait some time asynchronously after processing `healInspectionBatch`
## nodes to allow for a pseudo task switch.
healAccountsCoverageTrigger* = 1.3
healAccountsCoverageTrigger* = 1.01
## Apply accounts healing if the global snap download coverage factor
## exceeds this setting. The global coverage factor is derived by merging
## all account ranges retrieved for all pivot state roots (see
## `coveredAccounts` in `CtxData`.)
##
## A small value of this constant leads to early healing. This produces
## stray leaf account records so fragmenting larger intervals of missing
## account ranges. This in turn leads to smaller but more range requests
## over the network. More requests might be a disadvantage if peers only
## serve a maximum number requests (rather than data.)
## `coveredAccounts` in the object `CtxData`.) Note that a coverage factor
## greater than 100% is not exact but rather a lower bound estimate.
healAccountsPivotTriggerMinFactor* = 0.17
## Additional condition to meet before starting healing. The current
## pivot must have at least this much processed as recorded in the
## `processed` ranges set. This is the minimum value (see below.)
healAccountsPivotTriggerWeight* = 0.01
healAccountsPivotTriggerNMax* = 10
## Enable healing not before the `processed` ranges set fill factor has
## at least the following value.
## ::
## MinFactor + max(0, NMax - pivotTable.len) * Weight
healAccountsInspectionPlanBLevel* = 4
## Search this level deep for missing nodes if `hexaryEnvelopeDecompose()`
## only produces existing nodes.
##
## (the `healAccountsPivotTrigger` prefix of the constant names is omitted.)
##
## This effects in favouring late healing when more pivots have been
## downloaded.
## The maximal number of nodes visited at level 3 is *4KiB* and at level 4
## is *64KiB*.
healAccountsBatchFetchMax* = 10 * 1024
healAccountsBatchMax* = 10 * 1024
## Keep on going in the healing task until this many nodes have been
## fetched from the network or some error condition terminates the task.
##
## This constant should be larger than `snapStorageSlotsFetchMax`
## This constant should be larger than `fetchRequestStorageSlotsMax`
healSlorageSlotsTrigger* = 0.70
## Consider per-account storage slots healing if a per-account hexary
## sub-trie has reached this factor of completeness.
healStorageSlotsInspectionBatch* = 10_000
## Similar to `healAccountsInspectionBatch` but for storage slots.
healStorageSlotsInspectionPlanBLevel* = 4
## Similar to `healAccountsInspectionPlanBLevel`
healStorageSlotsBatchMax* = 32
## Maximal number of storage tries to to heal in a single batch run. Only
@ -177,9 +148,19 @@ const
## Set 0 to disable.
static:
doAssert 1 < swapInAccountsPivotsMin
doAssert snapStorageSlotsQuPrioThresh < snapAccountsSaveStorageSlotsMax
doAssert snapStorageSlotsFetchMax < healAccountsBatchFetchMax
doAssert storageSlotsQuPrioThresh < accountsSaveStorageSlotsMax
doAssert fetchRequestTrieNodesMax < healAccountsBatchMax
# Deprecated, to be expired
const
healInspectionBatch* = 10_000
## Number of nodes to inspect in a single batch. In between batches, a
## task/thread switch is allowed.
healInspectionBatchWaitNanoSecs* = 500
## Wait some time asynchronously after processing `healInspectionBatch`
## nodes to allow for a pseudo task switch.
# ------------------------------------------------------------------------------
# End
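
The removed `healAccountsPivotTrigger*` constants above encoded the healing
threshold formula `MinFactor + max(0, NMax - pivotTable.len) * Weight`. A
standalone sketch with those values (illustration only) shows why this
favoured late healing:

    const
      MinFactor = 0.17            # healAccountsPivotTriggerMinFactor
      Weight    = 0.01            # healAccountsPivotTriggerWeight
      NMax      = 10              # healAccountsPivotTriggerNMax

    proc healThresh(nPivots: int): float =
      ## Minimum `processed` fill factor before healing was enabled.
      MinFactor + float(max(0, NMax - nPivots)) * Weight

    when isMainModule:
      echo healThresh(2)          # ~0.25: few pivots, healing needs 25% coverage
      echo healThresh(10)         # 0.17: many pivots, healing starts earlier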

View File

@ -289,43 +289,35 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
nSlotLists = env.nSlotLists
processed = env.fetchAccounts.processed.fullFactor.toPC(2)
nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
accHealThresh = env.healThresh.toPC(2)
trace "Multi sync runner", peer, pivot, nAccounts, nSlotLists, processed,
nStoQu, accHealThresh
nStoQu
# This one is the syncing work horse which downloads the database
await env.execSnapSyncAction(buddy)
if env.archived:
let
peer = buddy.peer
nAccounts = env.nAccounts
nSlotLists = env.nSlotLists
when extraTraceMessages:
trace "Mothballing", peer, pivot=("#" & $env.stateHeader.blockNumber),
nAccounts=env.nAccounts, nSlotLists=env.nSlotLists
env.pivotMothball()
return # pivot has changed
# Various logging entries (after accounts and storage slots download)
let
nAccounts = env.nAccounts
nSlotLists = env.nSlotLists
processed = env.fetchAccounts.processed.fullFactor.toPC(2)
nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
block:
if env.archived:
# Archive pivot if it became stale
when extraTraceMessages:
trace "Mothballing", peer, pivot, nAccounts, nSlotLists
env.pivotMothball()
else:
# Save state so sync can be partially resumed at next start up
let
nAccounts = env.nAccounts
nSlotLists = env.nSlotLists
processed = env.fetchAccounts.processed.fullFactor.toPC(2)
nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
accHealThresh = env.healThresh.toPC(2)
rc = env.saveCheckpoint(ctx)
let rc = env.saveCheckpoint(ctx)
if rc.isErr:
error "Failed to save recovery checkpoint", peer, pivot, nAccounts,
nSlotLists, processed, nStoQu, error=rc.error
else:
when extraTraceMessages:
trace "Saved recovery checkpoint", peer, pivot, nAccounts, nSlotLists,
processed, nStoQu, blobSize=rc.value, accHealThresh
if buddy.ctrl.stopped:
return # peer worker has gone
processed, nStoQu, blobSize=rc.value
# ------------------------------------------------------------------------------
# End

View File

@ -45,7 +45,7 @@ proc getAccountRangeReq(
peer = buddy.peer
try:
let reply = await peer.getAccountRange(
root, iv.minPt.to(Hash256), iv.maxPt.to(Hash256), snapRequestBytesLimit)
root, iv.minPt.to(Hash256), iv.maxPt.to(Hash256), fetchRequestBytesLimit)
return ok(reply)
except CatchableError as e:
trace trSnapRecvError & "waiting for GetAccountRange reply", peer, pivot,
@ -66,8 +66,7 @@ proc getAccountRange*(
let
peer = buddy.peer
if trSnapTracePacketsOk:
trace trSnapSendSending & "GetAccountRange", peer, pivot,
accRange=iv, bytesLimit=snapRequestBytesLimit
trace trSnapSendSending & "GetAccountRange", peer, pivot, accRange=iv
var dd = block:
let rc = await buddy.getAccountRangeReq(stateRoot, iv, pivot)

View File

@ -61,13 +61,13 @@ proc getStorageRangesReq(
root, accounts,
# here the interval bounds are an `array[32,byte]`
iv.get.minPt.to(Hash256).data, iv.get.maxPt.to(Hash256).data,
snapRequestBytesLimit)
fetchRequestBytesLimit)
else:
reply = await peer.getStorageRanges(
root, accounts,
# here the interval bounds are of empty `Blob` type
emptyBlob, emptyBlob,
snapRequestBytesLimit)
fetchRequestBytesLimit)
return ok(reply)
except CatchableError as e:
@ -101,8 +101,7 @@ proc getStorageRanges*(
return err(ComEmptyAccountsArguments)
if trSnapTracePacketsOk:
trace trSnapSendSending & "GetStorageRanges", peer, pivot,
nAccounts, bytesLimit=snapRequestBytesLimit
trace trSnapSendSending & "GetStorageRanges", peer, pivot, nAccounts
let
iv = accounts[0].subRange

View File

@ -43,7 +43,8 @@ proc getTrieNodesReq(
let
peer = buddy.peer
try:
let reply = await peer.getTrieNodes(stateRoot, paths, snapRequestBytesLimit)
let reply = await peer.getTrieNodes(
stateRoot, paths, fetchRequestBytesLimit)
return ok(reply)
except CatchableError as e:
@ -74,8 +75,7 @@ proc getTrieNodes*(
let nTotal = paths.mapIt(it.len).foldl(a+b, 0)
if trSnapTracePacketsOk:
trace trSnapSendSending & "GetTrieNodes", peer, pivot,
nPaths, nTotal, bytesLimit=snapRequestBytesLimit
trace trSnapSendSending & "GetTrieNodes", peer, pivot, nPaths, nTotal
let trieNodes = block:
let rc = await buddy.getTrieNodesReq(stateRoot, paths, pivot)

View File

@ -134,21 +134,6 @@ type
nodeKey*: RepairKey ## Leaf hash into hexary repair table
payload*: Blob ## Data payload
TrieNodeStatCtxRef* = ref object
## Context to resume searching for dangling links
case persistent*: bool
of true:
hddCtx*: seq[(NodeKey,NibblesSeq)]
else:
memCtx*: seq[(RepairKey,NibblesSeq)]
TrieNodeStat* = object
## Trie inspection report
dangling*: seq[NodeSpecs] ## Refers to nodes with incomplete refs
level*: int ## Maximum nesting depth of dangling nodes
stopped*: bool ## Potential loop detected if `true`
resumeCtx*: TrieNodeStatCtxRef ## Context for resuming inspection
HexaryTreeDbRef* = ref object
## Hexary trie plus helper structures
tab*: Table[RepairKey,RNodeRef] ## key-value trie table, in-memory db
@ -294,14 +279,6 @@ proc ppImpl(db: HexaryTreeDbRef; root: NodeKey): seq[string] =
except Exception as e:
result &= " ! Ooops ppImpl(): name=" & $e.name & " msg=" & e.msg
proc ppDangling(a: seq[NodeSpecs]; maxItems = 30): string =
proc ppBlob(w: Blob): string =
w.mapIt(it.toHex(2)).join.toLowerAscii
let
q = a.mapIt(it.partialPath.ppBlob)[0 ..< min(maxItems,a.len)]
andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." else: ""
"{" & q.join(",") & andMore & "}"
# ------------------------------------------------------------------------------
# Public debugging helpers
# ------------------------------------------------------------------------------
@ -347,13 +324,6 @@ proc pp*(db: HexaryTreeDbRef; indent=4): string =
## variant of `pp()` above
db.ppImpl(NodeKey.default).join(indent.toPfx)
proc pp*(a: TrieNodeStat; db: HexaryTreeDbRef; maxItems = 30): string =
result = "(" & $a.level
if a.stopped:
result &= "stopped,"
result &= $a.dangling.len & "," &
a.dangling.ppDangling(maxItems) & ")"
# ------------------------------------------------------------------------------
# Public constructor (or similar)
# ------------------------------------------------------------------------------

View File

@ -9,7 +9,7 @@
# except according to those terms.
import
std/tables,
std/[sequtils, strutils, tables],
chronicles,
eth/[common, trie/nibbles],
stew/results,
@ -21,12 +21,41 @@ import
logScope:
topics = "snap-db"
type
TrieNodeStatCtxRef* = ref object
## Context to resume searching for dangling links
case persistent*: bool
of true:
hddCtx*: seq[(NodeKey,NibblesSeq)]
else:
memCtx*: seq[(RepairKey,NibblesSeq)]
TrieNodeStat* = object
## Trie inspection report
dangling*: seq[NodeSpecs] ## Refers to nodes with incomplete refs
count*: uint64 ## Number of nodes visited
level*: uint8 ## Maximum nesting depth of dangling nodes
stopped*: bool ## Potential loop detected if `true`
resumeCtx*: TrieNodeStatCtxRef ## Context for resuming inspection
const
extraTraceMessages = false # or true
when extraTraceMessages:
import stew/byteutils
# ------------------------------------------------------------------------------
# Private helpers, debugging
# ------------------------------------------------------------------------------
proc ppDangling(a: seq[NodeSpecs]; maxItems = 30): string =
proc ppBlob(w: Blob): string =
w.mapIt(it.toHex(2)).join.toLowerAscii
let
q = a.mapIt(it.partialPath.ppBlob)[0 ..< min(maxItems,a.len)]
andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." else: ""
"{" & q.join(",") & andMore & "}"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@ -114,34 +143,45 @@ proc to*(resumeCtx: TrieNodeStatCtxRef; T: type seq[NodeSpecs]): T =
proc hexaryInspectTrie*(
db: HexaryTreeDbRef; ## Database
root: NodeKey; ## State root
paths: seq[Blob] = @[]; ## Starting paths for search
resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection
suspendAfter = high(uint64); ## To be resumed
stopAtLevel = 64; ## Instead of loop detector
db: HexaryTreeDbRef; # Database
root: NodeKey; # State root
partialPaths: seq[Blob] = @[]; # Starting paths for search
resumeCtx: TrieNodeStatCtxRef = nil; # Context for resuming inspection
suspendAfter = high(uint64); # To be resumed
stopAtLevel = 64u8; # Width-first depth level
maxDangling = high(int); # Maximal number of dangling results
): TrieNodeStat
{.gcsafe, raises: [Defect,KeyError]} =
## Starting with the argument list `paths`, find all the non-leaf nodes in
## the hexary trie which have at least one node key reference missing in
## the trie database. The references for these nodes are collected and
## returned.
## * Search list `paths` argument entries that do not refer to a hexary node
## are ignored.
## * For any search list `paths` argument entry, this function stops if
## the search depth exceeds `stopAtLevel` levels of linked sub-nodes.
## * Argument `paths` list entries and partial paths on the way that do not
## refer to a valid extension or branch type node are silently ignored.
##
## Trie inspection can be automatically suspended after having visited
## `suspendAfter` nodes to be resumed at the last state. An application of
## this feature would look like
## ::
## var ctx = TrieNodeStatCtxRef()
## while not ctx.isNil:
## let state = hexaryInspectTrie(db, root, paths, resumeCtx=ctx, 1024)
## ...
## ctx = state.resumeCtx
## * Argument `partialPaths` list entries that do not refer to an existing
## and allocated hexary trie node are silently ignored. So are entries
## that do not refer to either a valid extension or a branch type node.
##
## * This function traverses the hexary trie in *width-first* mode
## simultaneously for any entry of the argument `partialPaths` list. Apart
## from completing the search there are three conditions when the search
## pauses to return the current state (via `resumeCtx`, see next bullet
## point):
## + The depth level of the running algorithm exceeds `stopAtLevel`.
## + The number of visited nodes exceeds `suspendAfter`.
## + The number of currently collected dangling nodes exceeds `maxDangling`.
## If the function pauses because the current depth exceeds `stopAtLevel`
## then the `stopped` flag of the result object will be set, as well.
##
## * When paused for some of the reasons listed above, the `resumeCtx` field
## of the result object contains the current state so that the function
## can resume searching from where it paused. An application using this
## feature could look like:
## ::
## var ctx = TrieNodeStatCtxRef()
## while not ctx.isNil:
## let state = hexaryInspectTrie(db, root, paths, resumeCtx=ctx, 1024)
## ...
## ctx = state.resumeCtx
##
let rootKey = root.to(RepairKey)
if not db.tab.hasKey(rootKey):
@ -150,7 +190,6 @@ proc hexaryInspectTrie*(
var
reVisit: seq[(RepairKey,NibblesSeq)]
again: seq[(RepairKey,NibblesSeq)]
numActions = 0u64
resumeOk = false
# Initialise lists from previous session
@ -160,30 +199,32 @@ proc hexaryInspectTrie*(
resumeOk = true
reVisit = resumeCtx.memCtx
if paths.len == 0 and not resumeOk:
if partialPaths.len == 0 and not resumeOk:
reVisit.add (rootKey,EmptyNibbleRange)
else:
# Add argument paths
for w in paths:
for w in partialPaths:
let (isLeaf,nibbles) = hexPrefixDecode w
if not isLeaf:
let rc = nibbles.hexaryPathNodeKey(rootKey, db, missingOk=false)
if rc.isOk:
reVisit.add (rc.value.to(RepairKey), nibbles)
while 0 < reVisit.len and numActions <= suspendAfter:
# Stopping on `suspendAfter` has precedence over `stopAtLevel`
while 0 < reVisit.len and result.count <= suspendAfter:
if stopAtLevel < result.level:
result.stopped = true
break
for n in 0 ..< reVisit.len:
let (rKey,parentTrail) = reVisit[n]
if suspendAfter < numActions:
if suspendAfter < result.count or
maxDangling <= result.dangling.len:
# Swallow rest
again = again & reVisit[n ..< reVisit.len]
again &= reVisit[n ..< reVisit.len]
break
let
(rKey, parentTrail) = reVisit[n]
node = db.tab[rKey]
parent = rKey.convertTo(NodeKey)
@ -203,7 +244,7 @@ proc hexaryInspectTrie*(
# Ooops, forget node and key
discard
numActions.inc
result.count.inc
# End `for`
result.level.inc
@ -218,12 +259,13 @@ proc hexaryInspectTrie*(
proc hexaryInspectTrie*(
getFn: HexaryGetFn; ## Database abstraction
rootKey: NodeKey; ## State root
paths: seq[Blob] = @[]; ## Starting paths for search
resumeCtx: TrieNodeStatCtxRef = nil; ## Context for resuming inspection
suspendAfter = high(uint64); ## To be resumed
stopAtLevel = 64; ## Instead of loop detector
getFn: HexaryGetFn; # Database abstraction
rootKey: NodeKey; # State root
partialPaths: seq[Blob] = @[]; # Starting paths for search
resumeCtx: TrieNodeStatCtxRef = nil; # Context for resuming inspection
suspendAfter = high(uint64); # To be resumed
stopAtLevel = 64u8; # Width-first depth level
maxDangling = high(int); # Maximal number of dangling results
): TrieNodeStat
{.gcsafe, raises: [Defect,RlpError]} =
## Variant of `hexaryInspectTrie()` for persistent database.
@ -240,7 +282,6 @@ proc hexaryInspectTrie*(
var
reVisit: seq[(NodeKey,NibblesSeq)]
again: seq[(NodeKey,NibblesSeq)]
numActions = 0u64
resumeOk = false
# Initialise lists from previous session
@ -250,18 +291,19 @@ proc hexaryInspectTrie*(
resumeOk = true
reVisit = resumeCtx.hddCtx
if paths.len == 0 and not resumeOk:
if partialPaths.len == 0 and not resumeOk:
reVisit.add (rootKey,EmptyNibbleRange)
else:
# Add argument paths
for w in paths:
for w in partialPaths:
let (isLeaf,nibbles) = hexPrefixDecode w
if not isLeaf:
let rc = nibbles.hexaryPathNodeKey(rootKey, getFn, missingOk=false)
if rc.isOk:
reVisit.add (rc.value, nibbles)
while 0 < reVisit.len and numActions <= suspendAfter:
# Stopping on `suspendAfter` has precedence over `stopAtLevel`
while 0 < reVisit.len and result.count <= suspendAfter:
when extraTraceMessages:
trace "Hexary inspect processing", nPaths, maxLeafPaths,
level=result.level, nReVisit=reVisit.len, nDangling=result.dangling.len
@ -271,13 +313,15 @@ proc hexaryInspectTrie*(
break
for n in 0 ..< reVisit.len:
let (parent,parentTrail) = reVisit[n]
if suspendAfter < numActions:
if suspendAfter < result.count or
maxDangling <= result.dangling.len:
# Swallow rest
again = again & reVisit[n ..< reVisit.len]
break
let parentBlob = parent.to(Blob).getFn()
let
(parent, parentTrail) = reVisit[n]
parentBlob = parent.to(Blob).getFn()
if parentBlob.len == 0:
# Ooops, forget node and key
continue
@ -301,7 +345,7 @@ proc hexaryInspectTrie*(
# Ooops, forget node and key
discard
numActions.inc
result.count.inc
# End `for`
result.level.inc
@ -319,6 +363,17 @@ proc hexaryInspectTrie*(
level=result.level, nResumeCtx=reVisit.len, nDangling=result.dangling.len,
maxLevel=stopAtLevel, stopped=result.stopped
# ------------------------------------------------------------------------------
# Public functions, debugging
# ------------------------------------------------------------------------------
proc pp*(a: TrieNodeStat; db: HexaryTreeDbRef; maxItems = 30): string =
result = "(" & $a.level
if a.stopped:
result &= "stopped,"
result &= $a.dangling.len & "," &
a.dangling.ppDangling(maxItems) & ")"
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
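
The suspend/resume pattern documented for `hexaryInspectTrie()` above can be
sketched standalone. The `inspect()` function below is only a stand-in for
the real trie perusal; the driver loop mirrors the documented usage:

    type
      CtxRef = ref object
        nextStart: int            # stand-in for the real resume context
      Stat = object
        dangling: seq[int]        # stand-in for `dangling: seq[NodeSpecs]`
        count: int                # number of nodes visited in this pass
        resumeCtx: CtxRef         # nil once the search is exhausted

    proc inspect(ctx: CtxRef; suspendAfter: int): Stat =
      ## Visit at most `suspendAfter` nodes per call, then hand back a context.
      const nNodes = 10_000
      let start = if ctx.isNil: 0 else: ctx.nextStart
      for n in start ..< min(start + suspendAfter, nNodes):
        result.count.inc
        if n mod 1_000 == 0:
          result.dangling.add n   # pretend every 1000th node is dangling
      if start + suspendAfter < nNodes:
        result.resumeCtx = CtxRef(nextStart: start + suspendAfter)

    when isMainModule:
      var ctx = CtxRef()          # start as in the doc comment example
      while not ctx.isNil:
        let state = inspect(ctx, suspendAfter = 1024)
        # ... merge `state.dangling` into a work list here ...
        ctx = state.resumeCtx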

View File

@ -42,9 +42,6 @@ proc pp(w: seq[RPathXStep]; db: HexaryTreeDbRef; indent = 4): string =
let pfx = "\n" & " ".repeat(indent)
w.mapIt(it.pp(db)).join(pfx)
proc pp(rc: Result[TrieNodeStat, HexaryError]; db: HexaryTreeDbRef): string =
if rc.isErr: $rc.error else: rc.value.pp(db)
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------

View File

@ -355,10 +355,10 @@ proc hexaryNearbyRight*(
## with non-decreasing path value). This is similar to the
## `hexary_path.next()` function, only that this algorithm does not
## backtrack if there are dangling links in between and rather returns
## a error.
## an error.
##
## This code is intended be used for verifying a left-bound proof to verify
## that there is no leaf node.
## This code is intended to be used for verifying a left-bound proof to
## verify that there is no leaf node *right* of a boundary path value.
# Some easy cases
if path.path.len == 0:
@ -543,8 +543,8 @@ proc hexaryNearbyLeft*(
{.gcsafe, raises: [Defect,KeyError]} =
## Similar to `hexaryNearbyRight()`.
##
## This code is intended be used for verifying a right-bound proof to verify
## that there is no leaf node.
## This code is intended to be used for verifying a right-bound proof to
## verify that there is no leaf node *left* to a boundary path value.
# Some easy cases
if path.path.len == 0:

View File

@ -1,283 +0,0 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Check/analyse DB completeness
## =============================
import
chronicles,
eth/[common, p2p, trie/trie_defs],
stew/keyed_queue,
../../../../utils/prettify,
../../../sync_desc,
"../.."/[range_desc, worker_desc],
"."/[hexary_desc, hexary_error, hexary_inspect,
snapdb_accounts, snapdb_storage_slots]
{.push raises: [Defect].}
logScope:
topics = "snap-db"
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Check DB " & info
proc accountsCtx(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): string =
let
ctx = buddy.ctx
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"nAccounts=" & $env.nAccounts & "," &
("covered=" & env.fetchAccounts.processed.fullFactor.toPC(0) & "/" &
ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," &
"nNodesCheck=" & $env.fetchAccounts.nodes.check.len & "," &
"nNodesMissing=" & $env.fetchAccounts.nodes.missing.len & "}"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc storageSlotsCtx(
buddy: SnapBuddyRef;
storageRoot: Hash256;
env: SnapPivotRef;
): string =
let
ctx = buddy.ctx
rc = block:
let rcx = env.fetchStorageFull.eq(storageRoot)
if rcx.isOk: rcx
else: env.fetchStoragePart.eq(storageRoot)
if rc.isErr:
return "n/a"
let
data = rc.value
slots = data.slots
result = "{" &
"inherit=" & (if data.inherit: "t" else: "f") & ","
if not slots.isNil:
result &= "" &
"covered=" & slots.processed.fullFactor.toPC(0) &
"nNodesCheck=" & $slots.nodes.check.len & "," &
"nNodesMissing=" & $slots.nodes.missing.len
result &= "}"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc checkStorageSlotsTrie(
buddy: SnapBuddyRef;
accKey: NodeKey;
storageRoot: Hash256;
env: SnapPivotRef;
): Result[bool,HexaryError] =
## Check whether a storage slots hexary trie is complete.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
rc = db.inspectStorageSlotsTrie(peer, accKey, storageRoot)
if rc.isErr:
return err(rc.error)
ok(rc.value.dangling.len == 0)
iterator accountsWalk(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): (NodeKey,Account,HexaryError) =
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
stateRoot = env.stateHeader.stateRoot
walk = SnapDbAccountsRef.init(db, stateRoot, peer)
var
accKey = NodeKey.default
count = 0
runOk = true
while runOk:
count.inc
let nextKey = block:
let rc = walk.nextAccountsChainDbKey(accKey)
if rc.isErr:
if rc.error != AccountNotFound:
error logTxt "accounts walk stopped", peer,
account=accKey.to(NodeTag),
ctx=buddy.accountsCtx(env), count, reason=rc.error
runOk = false
continue
rc.value
accKey = nextKey
let accData = block:
let rc = walk.getAccountsData(accKey, persistent = true)
if rc.isErr:
error logTxt "accounts walk error", peer, account=accKey,
ctx=buddy.accountsCtx(env), count, error=rc.error
runOk = false
continue
rc.value
yield (accKey, accData, NothingSerious)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc checkAccountsTrieIsComplete*(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): bool =
## Check whether accounts hexary trie is complete
let
ctx = buddy.ctx
peer = buddy.peer
db = ctx.data.snapDb
rootKey = env.stateHeader.stateRoot.to(NodeKey)
var
error: HexaryError
try:
let stats = db.getAccountFn.hexaryInspectTrie(rootKey)
if not stats.stopped:
return stats.dangling.len == 0
error = TrieLoopAlert
except RlpError:
error = RlpEncoding
except KeyError as e:
raiseAssert "Not possible @ importRawAccountNodes: " & e.msg
except Exception as e:
raiseAssert "Ooops checkAccountsTrieIsComplete(): name=" &
$e.name & " msg=" & e.msg
error logTxt "accounts health check failed", peer,
ctx=buddy.accountsCtx(env), error
return false
proc checkAccountsListOk*(
buddy: SnapBuddyRef;
env: SnapPivotRef;
noisy = false;
): bool =
## Loop over accounts, returns `false` for some error.
let
ctx = buddy.ctx
peer = buddy.peer
var
accounts = 0
storage = 0
nextMsgThresh = 1
for (key,accData,error) in buddy.accountsWalk(env):
if error != NothingSerious:
error logTxt "accounts loop stopped", peer, ctx=buddy.accountsCtx(env),
accounts, storage, error
return false
accounts.inc
if accData.storageRoot != emptyRlpHash:
storage.inc
when extraTraceMessages:
if noisy and nextMsgThresh <= accounts:
debug logTxt "accounts loop check point", peer,
ctx=buddy.accountsCtx(env), accounts, storage
nextMsgThresh *= 2
when extraTraceMessages:
let isComplete = buddy.checkAccountsTrieIsComplete(env)
debug logTxt "accounts list report", peer, ctx=buddy.accountsCtx(env),
accounts, storage, isComplete
true
proc checkStorageSlotsTrieIsComplete*(
buddy: SnapBuddyRef;
accKey: NodeKey;
storageRoot: Hash256;
env: SnapPivotRef;
): bool =
## Check whether a storage slots hexary trie is complete.
let
peer = buddy.peer
rc = buddy.checkStorageSlotsTrie(accKey, storageRoot, env)
if rc.isOk:
return rc.value
when extraTraceMessages:
let nStoQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
debug logTxt "atorage slots health check failed", peer, nStoQueue,
ctx=buddy.storageSlotsCtx(storageRoot, env), error=rc.error
proc checkStorageSlotsTrieIsComplete*(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): bool =
## Check for all accounts thye whether storage slot hexary tries are complete.
let
ctx = buddy.ctx
peer = buddy.peer
var
accounts = 0
incomplete = 0
complete = 0
for (accKey,accData,error) in buddy.accountsWalk(env):
if error != NothingSerious:
let nStoQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
error logTxt "atorage slots accounts loop stopped", peer, nStoQueue,
accounts, incomplete, complete, error
return false
accounts.inc
let storageRoot = accData.storageRoot
if storageRoot == emptyRlpHash:
continue
let rc = buddy.checkStorageSlotsTrie(accKey, storageRoot, env)
if rc.isOk and rc.value:
complete.inc
else:
incomplete.inc
when extraTraceMessages:
let nStoQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
debug logTxt "storage slots report", peer, ctx=buddy.accountsCtx(env),
nStoQueue, accounts, incomplete, complete
0 < accounts and incomplete == 0
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -490,34 +490,6 @@ proc inspectStorageSlotsTrie*(
pathList, resumeCtx, suspendAfter, persistent=true, ignoreError)
proc getStorageSlotsNodeKey*(
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
path: Blob; ## Partial node path
persistent = false; ## Read data from disk
): Result[NodeKey,HexaryError] =
## For a partial node path argument `path`, return the raw node key.
var rc: Result[NodeKey,void]
noRlpExceptionOops("getStorageSlotsNodeKey()"):
if persistent:
rc = path.hexarypathNodeKey(ps.root, ps.getStorageSlotsFn)
else:
rc = path.hexarypathNodeKey(ps.root, ps.hexaDb)
if rc.isOk:
return ok(rc.value)
err(NodeNotFound)
proc getStorageSlotsNodeKey*(
pv: SnapDbRef; ## Base descriptor on `ChainDBRef`
peer: Peer; ## For log messages, only
accKey: NodeKey; ## Account key
root: Hash256; ## state root
path: Blob; ## Partial node path
): Result[NodeKey,HexaryError] =
## Variant of `getStorageSlotsNodeKey()` for persistent storage.
SnapDbStorageSlotsRef.init(
pv, accKey, root, peer).getStorageSlotsNodeKey(path, persistent=true)
proc getStorageSlotsData*(
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
path: NodeKey; ## Account to visit
@ -553,31 +525,6 @@ proc getStorageSlotsData*(
SnapDbStorageSlotsRef.init(
pv, accKey, root, peer).getStorageSlotsData(path, persistent=true)
proc haveStorageSlotsData*(
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
persistent = false; ## Read data from disk
): bool =
## Return `true` if there is at least one intermediate hexary node for this
## accounts storage slots trie.
##
## Caveat: There is no unit test yet
noGenericExOrKeyError("haveStorageSlotsData()"):
if persistent:
return 0 < ps.getStorageSlotsFn()(ps.root.ByteArray32).len
else:
return ps.hexaDb.tab.hasKey(ps.root.to(RepairKey))
proc haveStorageSlotsData*(
pv: SnapDbRef; ## Base descriptor on `ChainDBRef`
peer: Peer, ## For log messages, only
accKey: NodeKey; ## Account key
root: Hash256; ## state root
): bool =
## Variant of `haveStorageSlotsData()` for persistent storage.
SnapDbStorageSlotsRef.init(
pv, accKey, root, peer).haveStorageSlotsData(persistent=true)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -9,8 +9,7 @@
# except according to those terms.
import
std/[math, sequtils],
bearssl/rand,
std/[math, sets, sequtils],
chronos,
eth/[common, trie/trie_defs],
stew/[interval_set, keyed_queue, sorted_set],
@ -18,7 +17,8 @@ import
".."/[constants, range_desc, worker_desc],
./db/[hexary_error, snapdb_accounts, snapdb_pivot],
./pivot/[heal_accounts, heal_storage_slots,
range_fetch_accounts, range_fetch_storage_slots],
range_fetch_accounts, range_fetch_storage_slots,
storage_queue_helper],
./ticker
{.push raises: [Defect].}
@ -27,13 +27,32 @@ const
extraAsserts = false or true
## Enable some asserts
proc pivotAccountsHealingOk*(env: SnapPivotRef;ctx: SnapCtxRef): bool {.gcsafe.}
proc pivotMothball*(env: SnapPivotRef) {.gcsafe.}
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc accountsHealingOk(
env: SnapPivotRef; # Current pivot environment
ctx: SnapCtxRef; # Some global context
): bool =
## Returns `true` if accounts healing is enabled for this pivot.
not env.fetchAccounts.processed.isEmpty and
healAccountsCoverageTrigger <= ctx.pivotAccountsCoverage()
proc coveredAccounts100PcRollOver(
ctx: SnapCtxRef;
) =
## Roll over `coveredAccounts` registry when it reaches 100%.
if ctx.data.coveredAccounts.isFull:
# All of accounts hashes are covered by completed range fetch processes
# for all pivot environments. So reset covering and record full-ness level.
ctx.data.covAccTimesFull.inc
ctx.data.coveredAccounts.clear()
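
A minimal standalone sketch of this roll-over bookkeeping, with a plain
fill-factor float standing in for the `coveredAccounts` interval set and
assuming (not shown in this hunk) that `pivotAccountsCoverage()` adds
`covAccTimesFull` to the current fill factor:

    type Coverage = object
      covTimesFull: int   # how often the registry reached 100%
      covered: float      # current fill factor, 0.0 .. 1.0

    proc rollOverIfFull(c: var Coverage) =
      ## Reset the registry once complete and record the full-ness level.
      if 1.0 <= c.covered:
        c.covTimesFull.inc
        c.covered = 0.0

    proc accountsCoverage(c: Coverage): float =
      ## Aggregate coverage; exceeds 1.0 after a roll-over, which is what
      ## the new `healAccountsCoverageTrigger = 1.01` is compared against.
      c.covTimesFull.float + c.covered

    when isMainModule:
      var c = Coverage(covered: 1.0)
      c.rollOverIfFull()
      c.covered = 0.02
      doAssert 1.01 < c.accountsCoverage() # healing trigger met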
proc init(
batch: SnapRangeBatchRef;
stateRoot: Hash256;
@ -46,11 +65,7 @@ proc init(
# Initialise accounts range fetch batch, the pair of `fetchAccounts[]`
# range sets.
if ctx.data.coveredAccounts.isFull:
# All of accounts hashes are covered by completed range fetch processes
# for all pivot environments. So reset covering and record full-ness level.
ctx.data.covAccTimesFull.inc
ctx.data.coveredAccounts.clear()
ctx.coveredAccounts100PcRollOver()
# Deprioritise already processed ranges by moving it to the second set.
for iv in ctx.data.coveredAccounts.increasing:
@ -96,7 +111,7 @@ proc update*(
# Calculate minimum block distance.
let minBlockDistance = block:
let rc = pivotTable.lastValue
if rc.isOk and rc.value.pivotAccountsHealingOk(ctx):
if rc.isOk and rc.value.accountsHealingOk(ctx):
pivotBlockDistanceThrottledPivotChangeMin
else:
pivotBlockDistanceMin
@ -112,7 +127,6 @@ proc update*(
fetchAccounts: SnapRangeBatchRef())
env.fetchAccounts.init(header.stateRoot, ctx)
env.storageAccounts.init()
var topEnv = env
# Append per-state root environment to LRU queue
if reverse:
@ -124,20 +138,10 @@ proc update*(
let rc = pivotTable.secondKey
if rc.isOk:
pivotTable.del rc.value
# Update healing threshold for top pivot entry
topEnv = pivotTable.lastValue.value
else:
discard pivotTable.lruAppend(
header.stateRoot, env, pivotTableLruEntriesMax)
# Update healing threshold
let
slots = max(0, healAccountsPivotTriggerNMax - pivotTable.len)
delta = slots.float * healAccountsPivotTriggerWeight
topEnv.healThresh = healAccountsPivotTriggerMinFactor + delta
proc tickerStats*(
pivotTable: var SnapPivotTable; # Pivot table
@ -183,8 +187,8 @@ proc tickerStats*(
procChunks = 0
if not env.isNil:
pivotBlock = some(env.stateHeader.blockNumber)
stoQuLen = some(env.fetchStorageFull.len + env.fetchStoragePart.len)
procChunks = env.fetchAccounts.processed.chunks
stoQuLen = some(env.storageQueueTotal())
TickerStats(
pivotBlock: pivotBlock,
@ -223,25 +227,6 @@ proc pivotMothball*(env: SnapPivotRef) =
env.archived = true
proc pivotAccountsHealingOk*(
env: SnapPivotRef; # Current pivot environment
ctx: SnapCtxRef; # Some global context
): bool =
## Returns `true` if accounts healing is enabled for this pivot.
##
# Only start accounts healing if there is some completion level, already.
#
# We check against the global coverage factor, i.e. a measure for how much
# of the total of all accounts have been processed. Even if the hexary trie
# database for the current pivot state root is sparsely filled, there is a
# good chance that it can inherit some unchanged sub-trie from an earlier
# pivot state root download. The healing process then works like sort of
# glue.
if healAccountsCoverageTrigger <= ctx.pivotAccountsCoverage():
if env.healThresh <= env.fetchAccounts.processed.fullFactor:
return true
proc execSnapSyncAction*(
env: SnapPivotRef; # Current pivot environment
buddy: SnapBuddyRef; # Worker peer
@ -253,7 +238,7 @@ proc execSnapSyncAction*(
block:
# Clean up storage slots queue first if it becomes too large
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
if snapStorageSlotsQuPrioThresh < nStoQu:
if storageSlotsQuPrioThresh < nStoQu:
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.archived:
return
@ -261,6 +246,9 @@ proc execSnapSyncAction*(
if not env.fetchAccounts.processed.isFull:
await buddy.rangeFetchAccounts(env)
# Update 100% accounting
ctx.coveredAccounts100PcRollOver()
# Run at least one round fetching storage slots even if the `archived`
# flag is set in order to keep the batch queue small.
if not buddy.ctrl.stopped:
@ -269,7 +257,7 @@ proc execSnapSyncAction*(
if buddy.ctrl.stopped or env.archived:
return
if env.pivotAccountsHealingOk(ctx):
if env.accountsHealingOk(ctx):
await buddy.healAccounts(env)
if buddy.ctrl.stopped or env.archived:
return
@ -282,7 +270,7 @@ proc execSnapSyncAction*(
# Don't bother with storage slots healing before accounts healing takes
# place. This saves communication bandwidth. The pivot might change soon,
# anyway.
if env.pivotAccountsHealingOk(ctx):
if env.accountsHealingOk(ctx):
await buddy.healStorageSlots(env)
@ -295,12 +283,12 @@ proc saveCheckpoint*(
##
let
fa = env.fetchAccounts
nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
nStoQu = env.storageQueueTotal()
if snapAccountsSaveProcessedChunksMax < fa.processed.chunks:
if accountsSaveProcessedChunksMax < fa.processed.chunks:
return err(TooManyProcessedChunks)
if snapAccountsSaveStorageSlotsMax < nStoQu:
if accountsSaveStorageSlotsMax < nStoQu:
return err(TooManySlotAccounts)
ctx.data.snapDb.savePivot SnapDbPivotRegistry(
@ -310,7 +298,8 @@ proc saveCheckpoint*(
processed: toSeq(env.fetchAccounts.processed.increasing)
.mapIt((it.minPt,it.maxPt)),
slotAccounts: (toSeq(env.fetchStorageFull.nextKeys) &
toSeq(env.fetchStoragePart.nextKeys)).mapIt(it.to(NodeKey)))
toSeq(env.fetchStoragePart.nextKeys)).mapIt(it.to(NodeKey)) &
toSeq(env.parkedStorage.items))
proc recoverPivotFromCheckpoint*(
@ -350,9 +339,7 @@ proc recoverPivotFromCheckpoint*(
discard env.fetchAccounts.processed.reduce pt
env.fetchAccounts.unprocessed.merge pt
elif rc.value.storageRoot != emptyRlpHash:
env.fetchStorageFull.merge AccountSlotsHeader(
accKey: w,
storageRoot: rc.value.storageRoot)
env.storageQueueAppendFull(rc.value.storageRoot, w)
# Handle mothballed pivots for swapping in (see `pivotMothball()`)
if not topLevel:

View File

@ -8,8 +8,8 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Heal accounts DB:
## =================
## Heal accounts DB
## ================
##
## This module is a variation of the `swap-in` module in the sense that it
## searches for missing nodes in the database (which means that links to
@ -22,32 +22,34 @@
## * Run `swapInAccounts()` so that inheritable sub-tries are imported from
## previous pivots.
##
## * Find nodes with envelopes that have no account in common with any range
## interval of the `processed` set of the current pivot. Stop if there are
## no such nodes.
## * Find dangling nodes in the current account trie by trying plan A, and
## continuing with plan B only if A fails.
##
## * Extract the missing nodes from the previous step, i.e. the nodes that
## are known to exist but are not allocated. If all nodes are allocated,
## employ the `hexaryInspect()` function in a limited mode do find dangling
## (i.e. missing) sub-nodes of these allocated nodes. Stop if this function
## fails to find any such nodes.
## A. Try to find nodes with envelopes that have no account in common with
## any range interval of the `processed` set of the accounts trie. This
## action will
## + either determine that there are no such envelopes implying that the
## accounts trie is complete (then stop here)
## + or result in envelopes related to nodes that are all allocated on the
## accounts trie (fail, use *plan B* below)
## + or result in some envelopes related to dangling nodes.
##
## * From the nodes of the previous step, extract non-allocated nodes and
## fetch them from the network.
## B. Employ the `hexaryInspect()` trie perusal function in a limited mode
## for finding dangling (i.e. missing) sub-nodes below the allocated nodes.
##
## * Install that nodes from the network.
##
## * Rinse and repeat
##
## Discussion:
## -----------
## Discussion
## ----------
##
## The worst case scenario in the third step might also be solved by allocating
## more accounts and running this healing algorith again.
## A worst case scenario of a failing *plan B* must be solved by fetching and
## storing more accounts and running this healing algorithm again.
##
## Due to its potentially poor performance there is no way to recursively
## search the whole database hexary trie for more dangling nodes using the
## `hexaryInspect()` function.
## Due to the potentially poor performance using `hexaryInspect()`, there is no
## general solution for *plan B* by recursively searching the whole accounts
## hexary trie database for more dangling nodes.
##
import
std/[math, sequtils, tables],
@ -61,7 +63,7 @@ import
../com/[com_error, get_trie_nodes],
../db/[hexary_desc, hexary_envelope, hexary_error, hexary_inspect,
snapdb_accounts],
./swap_in
"."/[storage_queue_helper, swap_in]
{.push raises: [Defect].}
@ -129,42 +131,55 @@ proc compileMissingNodesList(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): seq[NodeSpecs] =
## Find some missing glue nodes in current database to be fetched
## individually.
## Find some missing glue nodes in accounts database to be fetched.
let
ctx = buddy.ctx
peer = buddy.peer
rootKey = env.stateHeader.stateRoot.to(NodeKey)
getFn = ctx.data.snapDb.getAccountFn
fa = env.fetchAccounts
# Import from earlier run
while buddy.swapInAccounts(env) != 0:
discard
while buddy.ctx.swapInAccounts(env) != 0:
if buddy.ctrl.stopped:
return
var nodes: seq[NodeSpecs]
noExceptionOops("getMissingNodesList"):
noExceptionOops("compileMissingNodesList"):
# Get unallocated nodes to be fetched
let rc = fa.processed.hexaryEnvelopeDecompose(rootKey, getFn)
if rc.isOk:
nodes = rc.value
# Check whether the hexary trie is complete
if nodes.len == 0:
# Fill gaps
discard fa.processed.merge(low(NodeTag),high(NodeTag))
fa.unprocessed.clear()
return
# Remove allocated nodes
let missingNodes = nodes.filterIt(it.nodeKey.ByteArray32.getFn().len == 0)
if 0 < missingNodes.len:
let missing = nodes.filterIt(it.nodeKey.ByteArray32.getFn().len == 0)
if 0 < missing.len:
when extraTraceMessages:
trace logTxt "missing nodes", ctx=buddy.healingCtx(env),
nResult=missingNodes.len, result=missingNodes.toPC
return missingNodes
trace logTxt "missing nodes", peer, ctx=buddy.healingCtx(env),
nResult=missing.len, result=missing.toPC
return missing
# Plan B, carefully employ `hexaryInspect()`
if 0 < nodes.len:
try:
let stats = getFn.hexaryInspectTrie(
rootKey, nodes.mapIt(it.partialPath), suspendAfter=healInspectionBatch)
if 0 < stats.dangling.len:
trace logTxt "missing nodes (plan B)", ctx=buddy.healingCtx(env),
let
paths = nodes.mapIt it.partialPath
stats = getFn.hexaryInspectTrie(rootKey, paths,
stopAtLevel = healAccountsInspectionPlanBLevel,
maxDangling = fetchRequestTrieNodesMax)
result = stats.dangling
when extraTraceMessages:
trace logTxt "missing nodes (plan B)", peer, ctx=buddy.healingCtx(env),
nLevel=stats.level, nVisited=stats.count,
nResult=stats.dangling.len, result=stats.dangling.toPC
return stats.dangling
except:
discard
@ -184,7 +199,7 @@ proc fetchMissingNodes(
pivot = "#" & $env.stateHeader.blockNumber # for logging
nMissingNodes= missingNodes.len
nFetchNodes = max(0, nMissingNodes - snapRequestTrieNodesFetchMax)
nFetchNodes = max(0, nMissingNodes - fetchRequestTrieNodesMax)
# There is no point in fetching too many nodes as it will be rejected. So
# rest of the `missingNodes` list is ignored to be picked up later.
@ -276,9 +291,7 @@ proc registerAccountLeaf(
# Update storage slots batch
if acc.storageRoot != emptyRlpHash:
env.fetchStorageFull.merge AccountSlotsHeader(
acckey: accKey,
storageRoot: acc.storageRoot)
env.storageQueueAppendFull(acc.storageRoot, accKey)
# ------------------------------------------------------------------------------
# Private functions: do the healing for one round
@ -297,9 +310,12 @@ proc accountsHealingImpl(
peer = buddy.peer
fa = env.fetchAccounts
# Update for changes since last visit
missingNodes = buddy.compileMissingNodesList(env)
# Import from earlier runs (if any)
while ctx.swapInAccounts(env) != 0:
discard
# Update for changes since last visit
let missingNodes = buddy.compileMissingNodesList(env)
if missingNodes.len == 0:
# Nothing to do
trace logTxt "nothing to do", peer, ctx=buddy.healingCtx(env)
@ -370,8 +386,8 @@ proc healAccounts*(
var
nNodesFetched = 0
nFetchLoop = 0
# Stop after `healAccountsBatchFetchMax` nodes have been fetched
while nNodesFetched < healAccountsBatchFetchMax:
# Stop after `healAccountsBatchMax` nodes have been fetched
while nNodesFetched < healAccountsBatchMax:
var nNodes = await buddy.accountsHealingImpl(env)
if nNodes <= 0:
break

View File

@ -8,34 +8,63 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Heal storage DB:
## ================
## Heal storage slots DB
## =====================
##
## This module works similar to `heal_accounts` applied to each per-account
## storage slots hexary trie. These per-account trie work items are stored in
## the pair of queues `env.fetchStorageFull` and `env.fetchStoragePart`.
## the queue `env.fetchStoragePart`.
##
## There is one additional short cut for speeding up processing. If a
## per-account storage slots hexary trie is marked inheritable, it will be
## checked whether it is complete and can be used wholesale.
## There is another such queue, `env.fetchStorageFull`, which is not used here.
##
## Inheritable tries appear after a pivot state root change. Typically, not all
## account data have changed and so the same per-account storage slots are
## valid.
## In order to be able to checkpoint the current list of storage accounts (by
## a parallel running process), unfinished storage accounts are temporarily
## held in the set `env.parkedStorage`.
##
## Algorithm applied to each entry of `env.fetchStoragePart`
## --------------------------------------------------------
##
## * Find dangling nodes in the current slot trie by trying plan A, and
## continuing with plan B only if A fails.
##
## A. Try to find nodes with envelopes that have no slot in common with
## any range interval of the `processed` set of the current slot trie. This
## action will
## + either determine that there are no such envelopes implying that the
## current slot trie is complete (then stop here)
## + or result in envelopes related to nodes that are all allocated on the
## current slot trie (fail, use *plan B* below)
## + or result in some envelopes related to dangling nodes.
##
## B. Employ the `hexaryInspect()` trie perusal function in a limited mode
## for finding dangling (i.e. missing) sub-nodes below the allocated nodes.
##
## * Install that nodes from the network.
##
## * Rinse and repeat
##
## Discussion
## ----------
##
## A worst case scenario of a failing *plan B* must be solved by fetching
## and storing more slots and running this healing algorithm again.
##
## Due to the potentially poor performance using `hexaryInspect()`, there is
## no general solution for *plan B* by recursively searching the whole slot
## hexary trie database for more dangling nodes.
##
import
std/[sequtils, tables],
std/[math, sequtils, tables],
chronicles,
chronos,
eth/[common, p2p, trie/nibbles, trie/trie_defs, rlp],
stew/[interval_set, keyed_queue],
stew/[byteutils, interval_set, keyed_queue],
../../../../utils/prettify,
../../../sync_desc,
"../../.."/[sync_desc, types],
"../.."/[constants, range_desc, worker_desc],
../com/[com_error, get_trie_nodes],
../db/[hexary_desc, hexary_error, snapdb_storage_slots],
./sub_tries_helper
../db/[hexary_desc, hexary_envelope, hexary_inspect, snapdb_storage_slots],
./storage_queue_helper
{.push raises: [Defect].}
@ -53,136 +82,132 @@ const
template logTxt(info: static[string]): static[string] =
"Storage slots healing " & info
proc `$`(node: NodeSpecs): string =
node.partialPath.toHex
proc `$`(rs: NodeTagRangeSet): string =
rs.fullFactor.toPC(0)
proc `$`(iv: NodeTagRange): string =
iv.fullFactor.toPC(3)
proc toPC(w: openArray[NodeSpecs]; n: static[int] = 3): string =
let sumUp = w.mapIt(it.hexaryEnvelope.len).foldl(a+b, 0.u256)
(sumUp.to(float) / (2.0^256)).toPC(n)
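
The `toPC()` helper above reports how much of the 2^256 key space the node
envelopes cover. For orientation, a standalone sketch assuming the usual
hexary envelope semantics, where each nibble of a partial path narrows the
range by a factor of 16:

    import std/math

    proc envelopeFraction(nibbles: int): float =
      ## Fraction of the 2^256 key space spanned by one partial path,
      ## i.e. 2^(256 - 4*nibbles) keys out of 2^256.
      1.0 / pow(16.0, nibbles.float)

    when isMainModule:
      echo envelopeFraction(1)  # 0.0625     (1/16 of the key space)
      echo envelopeFraction(2)  # 0.00390625 (1/256 of the key space)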
proc healingCtx(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): string =
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"runState=" & $buddy.ctrl.state & "," &
"nStoQu=" & $env.storageQueueTotal() & "," &
"nSlotLists=" & $env.nSlotLists & "}"
proc healingCtx(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
env: SnapPivotRef;
): string =
let slots = kvp.data.slots
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"covered=" & slots.unprocessed.emptyFactor.toPC(0) & "," &
"nNodesCheck=" & $slots.nodes.check.len & "," &
"nNodesMissing=" & $slots.nodes.missing.len & "}"
"runState=" & $buddy.ctrl.state & "," &
"covered=" & $kvp.data.slots.processed & "," &
"nStoQu=" & $env.storageQueueTotal() & "," &
"nSlotLists=" & $env.nSlotLists & "}"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
#template discardRlpError(info: static[string]; code: untyped) =
# try:
# code
# except RlpError as e:
# discard
template noExceptionOops(info: static[string]; code: untyped) =
try:
code
except Defect as e:
raise e
except Exception as e:
raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc acceptWorkItemAsIs(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
): Result[bool,HexaryError] =
## Check whether this work item is done and the corresponding storage trie
## can be completely inherited.
if kvp.data.inherit:
let
ctx = buddy.ctx
peer = buddy.peer
db = ctx.data.snapDb
accKey = kvp.data.accKey
storageRoot = kvp.key
rc = db.inspectStorageSlotsTrie(peer, accKey, storageRoot)
# Check whether the hexary trie is complete
if rc.isOk:
return ok(rc.value.dangling.len == 0)
return err(rc.error)
ok(false)
proc verifyStillMissingNodes(
proc compileMissingNodesList(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
env: SnapPivotRef;
) =
## Check whether previously missing nodes from the `nodes.missing` list
## have been magically added to the database since it was checked last
## time. These nodes will me moved to `nodes.check` for further processing.
): seq[NodeSpecs] =
## Find some missing glue nodes in storage slots database to be fetched.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
accKey = kvp.data.accKey
storageRoot = kvp.key
slots = kvp.data.slots
rootKey = kvp.key.to(NodeKey)
getFn = ctx.data.snapDb.getStorageSlotsFn(kvp.data.accKey)
var delayed: seq[NodeSpecs]
for w in slots.nodes.missing:
let rc = db.getStorageSlotsNodeKey(peer, accKey, storageRoot, w.partialPath)
if rc.isOk:
# Check nodes for dangling links
slots.nodes.check.add w
else:
# Node is still missing
delayed.add w
var nodes: seq[NodeSpecs]
noExceptionOops("compileMissingNodesList"):
if not slots.processed.isEmpty:
# Get unallocated nodes to be fetched
let rc = slots.processed.hexaryEnvelopeDecompose(rootKey, getFn)
if rc.isOk:
nodes = rc.value
# Must not modify sequence while looping over it
slots.nodes.missing = slots.nodes.missing & delayed
# Check whether the hexary trie is complete
if nodes.len == 0:
# Fill gaps
discard slots.processed.merge(low(NodeTag),high(NodeTag))
slots.unprocessed.clear()
return
# Remove allocated nodes
let missing = nodes.filterIt(it.nodeKey.ByteArray32.getFn().len == 0)
if 0 < missing.len:
when extraTraceMessages:
trace logTxt "missing nodes", peer, ctx=buddy.healingCtx(kvp,env),
nResult=missing.len, result=missing.toPC
return missing
# Plan B, carefully employ `hexaryInspect()`
if 0 < nodes.len:
try:
let
paths = nodes.mapIt it.partialPath
stats = getFn.hexaryInspectTrie(rootKey, paths,
stopAtLevel = healStorageSlotsInspectionPlanBLevel,
maxDangling = fetchRequestTrieNodesMax)
result = stats.dangling
when extraTraceMessages:
trace logTxt "missing nodes (plan B)", peer,
ctx=buddy.healingCtx(kvp,env), nLevel=stats.level,
nVisited=stats.count, nResult=stats.dangling.len
except:
discard
proc updateMissingNodesList(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
env: SnapPivotRef;
): Future[bool]
{.async.} =
## Starting with a given set of potentially dangling intermediate trie nodes
## `nodes.check`, this set is filtered and processed. The outcome is fed back
## to the very same list `nodes.check`.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
accKey = kvp.data.accKey
storageRoot = kvp.key
slots = kvp.data.slots
let rc = await db.getStorageSlotsFn(accKey).subTriesFromPartialPaths(
storageRoot, # State root related to storage slots
slots, # Storage slots download specs
snapRequestTrieNodesFetchMax) # Maximal datagram request size
if rc.isErr:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
if rc.error == TrieIsLockedForPerusal:
trace logTxt "failed", peer, itCtx=buddy.healingCtx(kvp,env),
nSlotLists=env.nSlotLists, nStorageQueue, error=rc.error
else:
error logTxt "failed => stop", peer, itCtx=buddy.healingCtx(kvp,env),
nSlotLists=env.nSlotLists, nStorageQueue, error=rc.error
# Attempt to switch pivot, there is not much else one can do here
buddy.ctrl.zombie = true
return false
return true
proc getMissingNodesFromNetwork(
proc getNodesFromNetwork(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
missing: seq[NodeSpecs];
env: SnapPivotRef;
): Future[seq[NodeSpecs]]
{.async.} =
## Extract from `nodes.missing` the next batch of nodes that need
## Extract from `missing` the next batch of nodes that need
## to be merged into the database
let
ctx = buddy.ctx
peer = buddy.peer
accKey = kvp.data.accKey
accPath = kvp.data.accKey.to(Blob)
storageRoot = kvp.key
pivot = "#" & $env.stateHeader.blockNumber # for logging
slots = kvp.data.slots
nSickSubTries = slots.nodes.missing.len
inxLeft = max(0, nSickSubTries - snapRequestTrieNodesFetchMax)
# There is no point in processing too many nodes at the same time. So leave
# the rest on the `nodes.missing` queue to be handled later.
let fetchNodes = slots.nodes.missing[inxLeft ..< nSickSubTries]
slots.nodes.missing.setLen(inxLeft)
fetchNodes = missing[0 ..< fetchRequestTrieNodesMax]
# Initialise `getTrieNodes()` for fetching nodes from the network
var
@ -192,148 +217,39 @@ proc getMissingNodesFromNetwork(
pathList.add @[w.partialPath]
nodeKey[w.partialPath] = w.nodeKey
# Fetch nodes from the network. Note that the remainder of the `nodes.missing`
# list might be used by another process that runs semi-parallel.
# Fetch nodes from the network.
let
req = @[accKey.to(Blob)] & fetchNodes.mapIt(it.partialPath)
pivot = "#" & $env.stateHeader.blockNumber # for logging
req = @[accPath] & fetchNodes.mapIt(it.partialPath)
rc = await buddy.getTrieNodes(storageRoot, @[req], pivot)
if rc.isOk:
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.data.errors.resetComError()
# Register unfetched missing nodes for the next pass
for w in rc.value.leftOver:
for n in 1 ..< w.len:
slots.nodes.missing.add NodeSpecs(
partialPath: w[n],
nodeKey: nodeKey[w[n]])
return rc.value.nodes.mapIt(NodeSpecs(
partialPath: it.partialPath,
nodeKey: nodeKey[it.partialPath],
data: it.data))
# Restore missing nodes list now so that a task switch in the error checker
# allows other processes to access the full `nodes.missing` list.
slots.nodes.missing = slots.nodes.missing & fetchNodes
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
discard
when extraTraceMessages:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "fetch nodes error => stop", peer,
itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists,
nStorageQueue, error
else:
discard
when extraTraceMessages:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "fetch nodes error", peer,
itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists,
nStorageQueue, error
return @[]
ctx=buddy.healingCtx(kvp,env), error
proc kvStorageSlotsLeaf(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
node: NodeSpecs;
env: SnapPivotRef;
): (bool,NodeKey)
{.gcsafe, raises: [Defect,RlpError]} =
proc slotKey(node: NodeSpecs): (bool,NodeKey) =
## Read leaf node from persistent database (if any)
let
peer = buddy.peer
nodeRlp = rlpFromBytes node.data
(_,prefix) = hexPrefixDecode node.partialPath
(_,segment) = hexPrefixDecode nodeRlp.listElem(0).toBytes
nibbles = prefix & segment
if nibbles.len == 64:
return (true, nibbles.getBytes.convertTo(NodeKey))
proc registerStorageSlotsLeaf(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
slotKey: NodeKey;
env: SnapPivotRef;
) =
## Process single trie node as would be done with an interval by
## the `storeStorageSlots()` function
let
peer = buddy.peer
slots = kvp.data.slots
pt = slotKey.to(NodeTag)
# Find range set (from list) containing `pt`
var ivSet: NodeTagRangeSet
block foundCoveringRange:
for w in slots.unprocessed:
if 0 < w.covered(pt,pt):
ivSet = w
break foundCoveringRange
return # already processed, forget this account leaf
# Register this isolated leaf node that was added
discard ivSet.reduce(pt,pt)
when extraTraceMessages:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "single node", peer,
itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists,
nStorageQueue, slotKey=pt
proc assembleWorkItemsQueue(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): (seq[SnapSlotsQueuePair],int) =
## ..
var
toBeHealed: seq[SnapSlotsQueuePair]
nAcceptedAsIs = 0
# Search the current slot item batch list for items to complete via healing
for kvp in env.fetchStoragePart.nextPairs:
# Marked items indicate that a partial sub-trie exists which might have
# been inherited from an earlier storage root.
if kvp.data.inherit:
# Remove `kvp` work item from the queue object (which is allowed within a
# `for` loop over a `KeyedQueue` object type.)
env.fetchStorageFull.del(kvp.key)
# With some luck, the `kvp` work item refers to a complete storage trie
# that can be accepted as-is, in which case `kvp` can just be dropped.
let rc = buddy.acceptWorkItemAsIs(kvp)
if rc.isOk and rc.value:
env.nSlotLists.inc
nAcceptedAsIs.inc # for logging
continue # dropping `kvp`
toBeHealed.add kvp
if healStorageSlotsBatchMax <= toBeHealed.len:
return (toBeHealed, nAcceptedAsIs)
# Ditto for partial items queue
for kvp in env.fetchStoragePart.nextPairs:
if healSlorageSlotsTrigger <= kvp.data.slots.unprocessed.emptyFactor:
env.fetchStoragePart.del(kvp.key)
let rc = buddy.acceptWorkItemAsIs(kvp)
if rc.isOk and rc.value:
env.nSlotLists.inc
nAcceptedAsIs.inc # for logging
continue # dropping `kvp`
# Add to local batch to be processed, below
toBeHealed.add kvp
if healStorageSlotsBatchMax <= toBeHealed.len:
break
(toBeHealed, nAcceptedAsIs)
try:
let
nodeRlp = rlpFromBytes node.data
(_,prefix) = hexPrefixDecode node.partialPath
(_,segment) = hexPrefixDecode nodeRlp.listElem(0).toBytes
nibbles = prefix & segment
if nibbles.len == 64:
return (true, nibbles.getBytes.convertTo(NodeKey))
except:
discard
# ------------------------------------------------------------------------------
# Private functions: do the healing for one work item (sub-trie)
@ -343,131 +259,62 @@ proc storageSlotsHealing(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
env: SnapPivotRef;
): Future[bool]
{.async.} =
) {.async.} =
## Returns `true` if the sub-trie is complete (probably inherited), and
## `false` if there are nodes left to be completed.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
accKey = kvp.data.accKey
slots = kvp.data.slots
missing = buddy.compileMissingNodesList(kvp, env)
if missing.len == 0:
trace logTxt "nothing to do", peer, ctx=buddy.healingCtx(kvp,env)
return
when extraTraceMessages:
block:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "started", peer, itCtx=buddy.healingCtx(kvp,env),
nSlotLists=env.nSlotLists, nStorageQueue
# Update for changes since last visit
buddy.verifyStillMissingNodes(kvp, env)
# ???
if slots.nodes.check.len != 0:
if not await buddy.updateMissingNodesList(kvp,env):
return false
# Check whether the trie is complete.
if slots.nodes.missing.len == 0:
trace logTxt "complete", peer, itCtx=buddy.healingCtx(kvp,env)
return true
trace logTxt "started", peer, ctx=buddy.healingCtx(kvp,env)
# Get next batch of nodes that need to be merged it into the database
let nodeSpecs = await buddy.getMissingNodesFromNetwork(kvp,env)
let nodeSpecs = await buddy.getNodesFromNetwork(kvp, missing, env)
if nodeSpecs.len == 0:
return
# Store nodes onto disk
let report = db.importRawStorageSlotsNodes(peer, accKey, nodeSpecs)
let report = db.importRawStorageSlotsNodes(peer, kvp.data.accKey, nodeSpecs)
if 0 < report.len and report[^1].slot.isNone:
# Storage error, just run the next lap (not much else that can be done)
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
error logTxt "error updating persistent database", peer,
itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists,
nStorageQueue, nNodes=nodeSpecs.len, error=report[^1].error
slots.nodes.missing = slots.nodes.missing & nodeSpecs
return false
error logTxt "database error", peer, ctx=buddy.healingCtx(kvp,env),
nNodes=nodeSpecs.len, error=report[^1].error
return
when extraTraceMessages:
block:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "nodes merged into database", peer,
itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists,
nStorageQueue, nNodes=nodeSpecs.len
trace logTxt "nodes merged into database", peer,
ctx=buddy.healingCtx(kvp,env), nNodes=nodeSpecs.len
# Filter out error and leaf nodes
# Filter out leaf nodes
var nLeafNodes = 0 # for logging
for w in report:
if w.slot.isSome: # non-indexed entries appear typically at the end, though
let inx = w.slot.unsafeGet
if w.slot.isSome and w.kind.get(otherwise = Branch) == Leaf:
if w.error != NothingSerious or w.kind.isNone:
# error, try downloading again
slots.nodes.missing.add nodeSpecs[inx]
# Leaf Node has been stored, so register it
let
inx = w.slot.unsafeGet
(isLeaf, slotKey) = nodeSpecs[inx].slotKey
if isLeaf:
let
slotTag = slotKey.to(NodeTag)
iv = NodeTagRange.new(slotTag,slotTag)
kvp.data.slots.unprocessed.reduce iv
discard kvp.data.slots.processed.merge iv
nLeafNodes.inc
elif w.kind.unsafeGet != Leaf:
# re-check this node
slots.nodes.check.add nodeSpecs[inx]
else:
# Node has been stored, double check
let (isLeaf, slotKey) =
buddy.kvStorageSlotsLeaf(kvp, nodeSpecs[inx], env)
if isLeaf:
# Update `unprocessed` registry, collect storage roots (if any)
buddy.registerStorageSlotsLeaf(kvp, slotKey, env)
nLeafNodes.inc
else:
slots.nodes.check.add nodeSpecs[inx]
when extraTraceMessages:
trace logTxt "stored slot", peer,
ctx=buddy.healingCtx(kvp,env), slotKey=slotTag
when extraTraceMessages:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "job done", peer, itCtx=buddy.healingCtx(kvp,env),
nSlotLists=env.nSlotLists, nStorageQueue, nLeafNodes
proc healingIsComplete(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
env: SnapPivotRef;
): Future[bool]
{.async.} =
## Check whether the storage trie can be completely inherited and prepare for
## healing if not.
##
## Returns `true` if the sub-trie is complete (probably inherited), and
## `false` if there are nodes left to be completed.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
accKey = kvp.data.accKey
storageRoot = kvp.key
# Check whether this work item can be completely inherited
if kvp.data.inherit:
let rc = db.inspectStorageSlotsTrie(peer, accKey, storageRoot)
if rc.isErr:
# Oops, not much we can do here (looping trie?)
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
error logTxt "problem inspecting storage trie", peer,
nSlotLists=env.nSlotLists, nStorageQueue, storageRoot, error=rc.error
return false
# Check whether the hexary trie can be inherited as-is.
if rc.value.dangling.len == 0:
return true # done
# Full range covered by unprocessed items
kvp.data.slots = SnapRangeBatchRef(
nodes: SnapTodoNodes(
missing: rc.value.dangling))
kvp.data.slots.unprocessed.init()
# Proceed with healing
return await buddy.storageSlotsHealing(kvp, env)
trace logTxt "job done", peer, ctx=buddy.healingCtx(kvp,env), nLeafNodes
# ------------------------------------------------------------------------------
# Public functions
@ -480,43 +327,46 @@ proc healStorageSlots*(
## Fetching and merging missing storage slots trie database nodes.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
var
(toBeHealed, nAcceptedAsIs) = buddy.assembleWorkItemsQueue(env)
# Extract healing slot items from partial slots list
var toBeHealed: seq[SnapSlotsQueuePair]
for kvp in env.fetchStoragePart.nextPairs:
# Delete from queue and process this entry
env.fetchStoragePart.del kvp.key
# Move to returned list unless duplicated in full slots list
if env.fetchStorageFull.eq(kvp.key).isErr:
toBeHealed.add kvp
env.parkedStorage.incl kvp.data.accKey # temporarily parked
if healStorageSlotsBatchMax <= toBeHealed.len:
break
# Run against local batch
let nHealerQueue = toBeHealed.len
if 0 < nHealerQueue:
when extraTraceMessages:
block:
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "processing", peer,
nSlotLists=env.nSlotLists, nStoQu, nHealerQueue, nAcceptedAsIs
trace logTxt "processing", peer, ctx=buddy.healingCtx(env), nHealerQueue
for n in 0 ..< toBeHealed.len:
# Stop processing, hand back the rest
if buddy.ctrl.stopped:
for m in n ..< toBeHealed.len:
let kvp = toBeHealed[n]
discard env.fetchStoragePart.append(kvp.key, kvp.data)
env.parkedStorage.excl kvp.data.accKey
break
let kvp = toBeHealed[n]
await buddy.storageSlotsHealing(kvp, env)
if buddy.ctrl.running:
if await buddy.healingIsComplete(kvp,env):
env.nSlotLists.inc
nAcceptedAsIs.inc
continue
# Re-queue again unless ready
env.parkedStorage.excl kvp.data.accKey # un-register
if not kvp.data.slots.processed.isFull:
discard env.fetchStoragePart.append(kvp.key, kvp.data)
if kvp.data.slots.isNil:
env.fetchStorageFull.merge kvp # should be the exception
else:
env.fetchStoragePart.merge kvp
when extraTraceMessages:
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "done", peer, nSlotLists=env.nSlotLists, nStoQu,
nHealerQueue, nAcceptedAsIs, runState=buddy.ctrl.state
elif 0 < nAcceptedAsIs:
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "work items", peer, nSlotLists=env.nSlotLists,
nStoQu, nHealerQueue, nAcceptedAsIs, runState=buddy.ctrl.state
when extraTraceMessages:
trace logTxt "done", peer, ctx=buddy.healingCtx(env), nHealerQueue
# ------------------------------------------------------------------------------
# End

View File

@ -8,26 +8,38 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Fetch account ranges
## ====================
## Fetch accounts DB ranges
## ========================
##
## Account ranges not on the database yet are organised in the set
## `env.fetchAccounts.unprocessed` of intervals (of account hashes.)
## Account ranges already allocated on the database are organised in the set
## `env.fetchAccounts.processed`, and the ranges that can still be fetched are
## kept in the pair of range sets `env.fetchAccounts.unprocessed`. The ranges
## of these sets are mutually disjoint, yet their union need not cover the
## complete `[0,2^256]` range. The missing parts are the ranges currently
## being processed by worker peers.
##
## When processing, the following happens.
## Algorithm
## ---------
##
## * Some interval `iv` is removed from the `env.fetchAccounts.unprocessed`
## set. This interval set might then be safely accessed and manipulated by
## other worker instances.
## pair of sets (so the interval `iv` is protected from other worker
## instances and might be safely accessed and manipulated by this function.)
## Stop if there are no more intervals.
##
## * The data points in the interval `iv` (aka account hashes) are fetched from
## another peer over the network.
## * The account data points in the interval `iv` (aka account hashes) are
## fetched from the network. This results in *key-value* pairs for accounts.
##
## * The received data points of the interval `iv` are verified and merged
## into the persistent database.
## * The received *key-value* pairs from the previous step are verified and
## merged into the accounts hexary trie persistent database.
##
## * Data points in `iv` that were invalid or not received from the network
## are merged back into the set `env.fetchAccounts.unprocessed`.
## * *Key-value* pairs that were invalid or were not received from the network
## are merged back into the range set `env.fetchAccounts.unprocessed`. The
## remainder of successfully added ranges (and verified key gaps) are merged
## into `env.fetchAccounts.processed`.
##
## * For *Key-value* pairs that have an active account storage slot sub-trie,
## the account, including administrative data, is queued in
## `env.fetchStorageFull`.
##
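##
## A minimal sketch of the book-keeping described above, using plain integer
## sets over a toy key space 0..15 instead of `NodeTag` range sets (names and
## types below are hypothetical, for illustration only): an interval is taken
## from `unprocessed`, the received part goes to `processed`, and the rest is
## merged back for another lap.
##
## ::
##   import std/[sets, sequtils]
##
##   type ToyBatch = object
##     unprocessed: HashSet[int]          # keys still to be fetched
##     processed: HashSet[int]            # keys verified and stored
##
##   proc fetchSome(batch: var ToyBatch; n: int): seq[int] =
##     ## Remove up to `n` keys from `unprocessed` (the interval `iv` above.)
##     result = toSeq(batch.unprocessed)[0 ..< min(n, batch.unprocessed.len)]
##     for k in result:
##       batch.unprocessed.excl k
##
##   when isMainModule:
##     var batch = ToyBatch(
##       unprocessed: toHashSet(toSeq(0 .. 15)),
##       processed: initHashSet[int]())
##     let iv = batch.fetchSome 8         # protected from other workers now
##     for k in iv[0 ..< 5]:              # pretend the peer delivered 5 of 8
##       batch.processed.incl k           # verified and stored on the database
##     for k in iv[5 .. ^1]:
##       batch.unprocessed.incl k         # not received => back to the pool
##     doAssert batch.processed.len + batch.unprocessed.len == 16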
import
chronicles,
@ -40,7 +52,7 @@ import
"../.."/[constants, range_desc, worker_desc],
../com/[com_error, get_account_range],
../db/[hexary_envelope, snapdb_accounts],
./swap_in
"."/[storage_queue_helper, swap_in]
{.push raises: [Defect].}
@ -58,6 +70,22 @@ const
template logTxt(info: static[string]): static[string] =
"Accounts range " & info
proc `$`(rs: NodeTagRangeSet): string =
rs.fullFactor.toPC(0)
proc `$`(iv: NodeTagRange): string =
iv.fullFactor.toPC(3)
proc fetchCtx(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): string =
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"runState=" & $buddy.ctrl.state & "," &
"nStoQu=" & $env.storageQueueTotal() & "," &
"nSlotLists=" & $env.nSlotLists & "}"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@ -88,26 +116,28 @@ proc accountsRangefetchImpl(
db = ctx.data.snapDb
fa = env.fetchAccounts
stateRoot = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
# Get a range of accounts to fetch from
let iv = block:
let rc = buddy.getUnprocessed(env)
if rc.isErr:
when extraTraceMessages:
trace logTxt "currently all processed", peer, pivot
trace logTxt "currently all processed", peer, ctx=buddy.fetchCtx(env)
return
rc.value
# Process received accounts and stash storage slots to fetch later
let dd = block:
let rc = await buddy.getAccountRange(stateRoot, iv, pivot)
let
pivot = "#" & $env.stateHeader.blockNumber
rc = await buddy.getAccountRange(stateRoot, iv, pivot)
if rc.isErr:
fa.unprocessed.merge iv # fail => interval back to pool
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
when extraTraceMessages:
trace logTxt "fetch error => stop", peer, pivot, reqLen=iv.len, error
trace logTxt "fetch error", peer, ctx=buddy.fetchCtx(env),
reqLen=iv.len, error
return
rc.value
@ -118,10 +148,6 @@ proc accountsRangefetchImpl(
gotAccounts = dd.data.accounts.len # comprises `gotStorage`
gotStorage = dd.withStorage.len
#when extraTraceMessages:
# trace logTxt "fetched", peer, gotAccounts, gotStorage,
# pivot, reqLen=iv.len, gotLen=dd.consumed.len
# Now, we fully own the scheduler. The original interval will safely be placed
# back for a moment (the `unprocessed` range set to be corrected below.)
fa.unprocessed.merge iv
@ -143,8 +169,8 @@ proc accountsRangefetchImpl(
# Bad data, just try another peer
buddy.ctrl.zombie = true
when extraTraceMessages:
trace logTxt "import failed => stop", peer, gotAccounts, gotStorage,
pivot, reqLen=iv.len, gotLen=covered.total, error=rc.error
trace logTxt "import failed", peer, ctx=buddy.fetchCtx(env),
gotAccounts, gotStorage, reqLen=iv.len, covered, error=rc.error
return
rc.value
@ -165,25 +191,21 @@ proc accountsRangefetchImpl(
discard fa.processed.merge w
# Register accounts with storage slots on the storage TODO list.
env.fetchStorageFull.merge dd.withStorage
env.storageQueueAppend dd.withStorage
# Swap in from other pivots unless mothballed, already
var nSwapInLaps = 0
if not env.archived and
swapInAccountsCoverageTrigger <= ctx.pivotAccountsCoverage():
# Swap in from other pivots
if not env.archived:
when extraTraceMessages:
trace logTxt "before swap in", peer, pivot, gotAccounts, gotStorage,
coveredHere=covered.fullFactor.toPC(2),
processed=fa.processed.fullFactor.toPC(2),
trace logTxt "before swap in", peer, ctx=buddy.fetchCtx(env), covered,
gotAccounts, gotStorage, processed=fa.processed,
nProcessedChunks=fa.processed.chunks.uint.toSI
if swapInAccountsPivotsMin <= ctx.data.pivotTable.len:
nSwapInLaps = buddy.swapInAccounts(env)
nSwapInLaps = ctx.swapInAccounts env
when extraTraceMessages:
trace logTxt "request done", peer, pivot, gotAccounts, gotStorage,
nSwapInLaps, coveredHere=covered.fullFactor.toPC(2),
processed=fa.processed.fullFactor.toPC(2),
trace logTxt "request done", peer, ctx=buddy.fetchCtx(env), gotAccounts,
gotStorage, nSwapInLaps, covered, processed=fa.processed,
nProcessedChunks=fa.processed.chunks.uint.toSI
return true
@ -204,10 +226,9 @@ proc rangeFetchAccounts*(
let
ctx = buddy.ctx
peer = buddy.peer
pivot = "#" & $env.stateHeader.blockNumber # for logging
when extraTraceMessages:
trace logTxt "start", peer, pivot
trace logTxt "start", peer, ctx=buddy.fetchCtx(env)
var nFetchAccounts = 0 # for logging
while not fa.processed.isFull() and
@ -219,12 +240,11 @@ proc rangeFetchAccounts*(
# Clean up storage slots queue first if it becomes too large
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
if snapStorageSlotsQuPrioThresh < nStoQu:
if storageSlotsQuPrioThresh < nStoQu:
break
when extraTraceMessages:
trace logTxt "done", peer, pivot, nFetchAccounts,
runState=buddy.ctrl.state
trace logTxt "done", peer, ctx=buddy.fetchCtx(env), nFetchAccounts
# ------------------------------------------------------------------------------
# End

View File

@ -8,50 +8,59 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Fetch storage slots
## ===================
## Fetch storage slots DB ranges
## =============================
##
## Flow chart for storage slots download
## -------------------------------------
## ::
## {missing-storage-slots} <-----------------+
## | |
## v |
## <fetch-storage-slots-from-network> |
## | |
## v |
## {storage-slots} |
## | |
## v |
## <merge-to-persistent-database> |
## | | |
## v v |
## {completed} {partial} |
## | | |
## | +------------------------+
## v
## <done-for-this-account>
## In principle, this algorithm is a generalised version of the one for
## installing the accounts hexary trie database. The difference is that there
## are many storage slots hexary trie databases, which are typically quite
## small. So the normal action is to download and install a full hexary trie
## rather than to merge a partial one.
##
## Legend:
## * `<..>`: some action, process, etc.
## * `{missing-storage-slots}`: list implemented as pair of queues
## `env.fetchStorageFull` and `env.fetchStoragePart`
## * `(storage-slots}`: list is optimised out
## * `{completed}`: list is optimised out
## * `{partial}`: list is optimised out
## Algorithm
## ---------
##
## * Handle full storage slot hexary trie entries
##
## + Remove a list of full storage slot hexary trie entries from the queue of
## full requests `env.fetchStorageFull`.
##
## The *full* adjective indicates that a complete trie will be installed
## rather than a partial one merged. Stop if there are no more full entries
## and proceed with handling partial entries.
##
## + Fetch and install the full trie entries of that list from the network.
##
## + For a list entry that was partially received (there is only one per
## reply message), store the remaining parts to install on the queue of
## partial storage slot hexary trie entries `env.fetchStoragePart`.
##
## + Rinse and repeat
##
## * Handle partial storage slot hexary trie entries
##
## + Remove a single partial storage slot hexary trie entry from the queue
## of partial requests `env.fetchStoragePart`.
##
## The detailed handling of this entry resembles the algorithm described
## for fetching accounts regarding sets of ranges `processed` and
## `unprocessed`. Stop if there are no more entries.
##
## + Fetch and install the partial trie entry from the network.
##
## + Rinse and repeat
##
## Discussion
## ----------
## Handling storage slots can be seen as a generalisation of handling account
## ranges (see the `range_fetch_accounts` module.) Contrary to the situation
## with accounts, storage slots are typically downloaded as a complete list
## that can be expanded to a full hexary trie for the given storage root.
##
## Only in rare cases is a storage slots list incomplete, i.e. a partial
## hexary trie. In that case, the list of storage slots is processed as
## described for accounts (see the `range_fetch_accounts` module.)
## If there is a hexary trie integrity problem when storing a response to a
## full or partial entry request, the entry is re-queued on the queue of
## partial requests `env.fetchStoragePart`, with the next partial range to
## fetch set to half of the current request range.
##
## In general, if an error occurs, the entry that caused the error is moved
## or re-stored onto the queue of partial requests `env.fetchStoragePart`.
##
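##
## A toy sketch of the queue hand-over described above, using a slot range
## 0..255 of plain integers instead of `NodeTag` intervals (names below are
## hypothetical, for illustration only): a complete reply leaves no queue
## entry, a partial reply keeps the uncovered tail on the partial queue, and
## a failed reply is retried as a full request.
##
## ::
##   import std/[options, tables]
##
##   type
##     Root = string                        # stand-in for a storage root hash
##     SlotRange = tuple[lo, hi: int]       # stand-in for a NodeTag range
##
##   var
##     fullQueue: seq[Root]                     # download the whole trie
##     partQueue = initTable[Root, SlotRange]() # remainder to be fetched
##
##   proc onReply(root: Root; covered: Option[SlotRange]) =
##     if covered.isNone:
##       fullQueue.add root                     # nothing arrived, retry full
##     elif covered.get.hi < 255:
##       partQueue[root] = (lo: covered.get.hi + 1, hi: 255)
##
##   when isMainModule:
##     onReply("0xabc..", some((lo: 0, hi: 120)))  # partial reply
##     doAssert partQueue["0xabc.."] == (lo: 121, hi: 255)
##     onReply("0xdef..", none(SlotRange))         # failed reply
##     doAssert fullQueue == @["0xdef.."]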
import
chronicles,
chronos,
@ -59,9 +68,10 @@ import
stew/[interval_set, keyed_queue],
stint,
../../../sync_desc,
"../.."/[constants, range_desc, worker_desc],
"../.."/[range_desc, worker_desc],
../com/[com_error, get_storage_ranges],
../db/[hexary_error, snapdb_storage_slots]
../db/[hexary_error, snapdb_storage_slots],
./storage_queue_helper
{.push raises: [Defect].}
@ -78,90 +88,20 @@ const
template logTxt(info: static[string]): static[string] =
"Storage slots range " & info
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc getNextSlotItemsFull(
proc fetchCtx(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): seq[AccountSlotsHeader] =
## Get list of full work item from the batch queue.
##
## If there is an indication that the storage trie may have some data
## already it is ignored here and marked `inherit` so that it will be
## picked up by the healing process.
): string =
let
ctx = buddy.ctx
peer = buddy.peer
var
nInherit = 0
for kvp in env.fetchStorageFull.nextPairs:
let it = AccountSlotsHeader(
accKey: kvp.data.accKey,
storageRoot: kvp.key)
# Verify whether a storage sub-trie exists, already
if kvp.data.inherit or
ctx.data.snapDb.haveStorageSlotsData(peer, it.accKey, it.storageRoot):
kvp.data.inherit = true
nInherit.inc # update for logging
continue
result.add it
env.fetchStorageFull.del(kvp.key) # ok to delete this item from batch queue
# Maximal number of items to fetch
if snapStorageSlotsFetchMax <= result.len:
break
when extraTraceMessages:
trace logTxt "fetch full", peer, nSlotLists=env.nSlotLists,
nStorageQuFull=env.fetchStorageFull.len, nToProcess=result.len, nInherit
proc getNextSlotItemPartial(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): seq[AccountSlotsHeader] =
## Get work item from the batch queue.
let
ctx = buddy.ctx
peer = buddy.peer
for kvp in env.fetchStoragePart.nextPairs:
if not kvp.data.slots.isNil:
# Extract range and return single item request queue
let rc = kvp.data.slots.unprocessed.fetch(maxLen = high(UInt256))
if rc.isOk:
# Delete from batch queue if the range set becomes empty
if kvp.data.slots.unprocessed.isEmpty:
env.fetchStoragePart.del(kvp.key)
when extraTraceMessages:
trace logTxt "fetch partial", peer,
nSlotLists=env.nSlotLists,
nStorageQuPart=env.fetchStoragePart.len,
subRange=rc.value, account=kvp.data.accKey
return @[AccountSlotsHeader(
accKey: kvp.data.accKey,
storageRoot: kvp.key,
subRange: some rc.value)]
# Oops, empty range set? Remove range and move item to the full requests
kvp.data.slots = nil
env.fetchStorageFull.merge kvp
proc backToSlotItemsQueue(env: SnapPivotRef; req: seq[AccountSlotsHeader]) =
if 0 < req.len:
if req[^1].subRange.isSome:
env.fetchStoragePart.merge req[^1]
env.fetchStorageFull.merge req[0 ..< req.len-1]
else:
env.fetchStorageFull.merge req
nStoQu = (env.fetchStorageFull.len +
env.fetchStoragePart.len +
env.parkedStorage.len)
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"runState=" & $buddy.ctrl.state & "," &
"nStoQu=" & $nStoQu & "," &
"nSlotLists=" & $env.nSlotLists & "}"
# ------------------------------------------------------------------------------
# Private functions
@ -171,7 +111,8 @@ proc storeStoragesSingleBatch(
buddy: SnapBuddyRef;
req: seq[AccountSlotsHeader];
env: SnapPivotRef;
) {.async.} =
): Future[bool]
{.async.} =
## Fetch account storage slots and store them in the database.
let
ctx = buddy.ctx
@ -183,28 +124,19 @@ proc storeStoragesSingleBatch(
var stoRange = block:
let rc = await buddy.getStorageRanges(stateRoot, req, pivot)
if rc.isErr:
env.backToSlotItemsQueue req
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "fetch error => stop", peer, pivot,
nSlotLists=env.nSlotLists, nReq=req.len, nStorageQueue, error
return
trace logTxt "fetch error => stop", peer, ctx=buddy.fetchCtx(env),
nReq=req.len, error
return false # all of `req` failed
rc.value
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.data.errors.resetComError()
var gotSlotLists = stoRange.data.storages.len
#when extraTraceMessages:
# let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
# trace logTxt "fetched", peer, pivot, nSlotLists=env.nSlotLists,
# nSlotLists=gotSlotLists, nReq=req.len,
# nStorageQueue, nLeftOvers=stoRange.leftOver.len
if 0 < gotSlotLists:
# Verify/process storages data and save it to disk
let report = ctx.data.snapDb.importStorageSlots(
peer, stoRange.data, noBaseBoundCheck = true)
@ -214,14 +146,11 @@ proc storeStoragesSingleBatch(
if report[^1].slot.isNone:
# Failed to store on database, not much that can be done here
env.backToSlotItemsQueue req
gotSlotLists.dec(report.len - 1) # for logging only
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
error logTxt "import failed", peer, pivot,
nSlotLists=env.nSlotLists, nSlotLists=gotSlotLists, nReq=req.len,
nStorageQueue, error=report[^1].error
return
error logTxt "import failed", peer, ctx=buddy.fetchCtx(env),
nSlotLists=gotSlotLists, nReq=req.len, error=report[^1].error
return false # all of `req` failed
# Push back error entries to be processed later
for w in report:
@ -234,44 +163,19 @@ proc storeStoragesSingleBatch(
if w.error == RootNodeMismatch:
# Some pathological case, needs further investigation. For the
# moment, provide partial fetches.
const
halfTag = (high(UInt256) div 2).NodeTag
halfTag1 = halfTag + 1.u256
env.fetchStoragePart.merge [
AccountSlotsHeader(
accKey: acc.accKey,
storageRoot: acc.storageRoot,
subRange: some NodeTagRange.new(low(NodeTag), halfTag)),
AccountSlotsHeader(
accKey: acc.accKey,
storageRoot: acc.storageRoot,
subRange: some NodeTagRange.new(halfTag1, high(NodeTag)))]
env.storageQueueAppendPartialBisect acc
elif w.error == RightBoundaryProofFailed and
acc.subRange.isSome and 1 < acc.subRange.unsafeGet.len:
# Some pathological case, needs further investigation. For the
# moment, provide a partial fetches.
let
iv = acc.subRange.unsafeGet
halfTag = iv.minPt + (iv.len div 2)
halfTag1 = halfTag + 1.u256
env.fetchStoragePart.merge [
AccountSlotsHeader(
accKey: acc.accKey,
storageRoot: acc.storageRoot,
subRange: some NodeTagRange.new(iv.minPt, halfTag)),
AccountSlotsHeader(
accKey: acc.accKey,
storageRoot: acc.storageRoot,
subRange: some NodeTagRange.new(halfTag1, iv.maxPt))]
env.storageQueueAppendPartialBisect acc
else:
# Reset any partial result (which would be the last entry) to
# requesting the full interval. So all the storage slots are
# re-fetched completely for this account.
env.fetchStorageFull.merge AccountSlotsHeader(
accKey: acc.accKey,
storageRoot: acc.storageRoot)
env.storageQueueAppendFull acc
# Last entry might be partial (if any)
#
@ -284,10 +188,9 @@ proc storeStoragesSingleBatch(
# Update local statistics counter for `nSlotLists` counter update
gotSlotLists.dec
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
error logTxt "processing error", peer, pivot, nSlotLists=env.nSlotLists,
error logTxt "processing error", peer, ctx=buddy.fetchCtx(env),
nSlotLists=gotSlotLists, nReqInx=inx, nReq=req.len,
nStorageQueue, nDangling=w.dangling.len, error=w.error
nDangling=w.dangling.len, error=w.error
# Update statistics
if gotSlotLists == 1 and
@ -299,7 +202,8 @@ proc storeStoragesSingleBatch(
env.nSlotLists.inc(gotSlotLists)
# Return unprocessed left overs to batch queue
env.backToSlotItemsQueue stoRange.leftOver
env.storageQueueAppend(stoRange.leftOver, req[^1].subRange)
return true
# ------------------------------------------------------------------------------
# Public functions
@ -314,50 +218,62 @@ proc rangeFetchStorageSlots*(
## each work item on the queue at least once. For partial slot range items
## this means, in case of success, that the outstanding range has at least
## become smaller.
let
peer = buddy.peer
fullRangeLen = env.fetchStorageFull.len
partRangeLen = env.fetchStoragePart.len
# Fetch storage data and save it on disk. Storage requests are managed by
# request queues for handling full/partial replies and re-fetch issues. For
# all practical purposes, this request queue should mostly be empty.
if 0 < fullRangeLen or 0 < partRangeLen:
if 0 < env.fetchStorageFull.len or 0 < env.fetchStoragePart.len:
let
ctx = buddy.ctx
peer = buddy.peer
when extraTraceMessages:
trace logTxt "start", peer, nSlotLists=env.nSlotLists,
nStorageQueue=(fullRangeLen+partRangeLen)
trace logTxt "start", peer, ctx=buddy.fetchCtx(env)
# Processing the full range will implicitly handle inheritable storage
# slots first with each batch item (see `getNextSlotItemsFull()`.)
#
# Run this batch even if `archived` flag is set in order to shrink the
# batch queue.
var fullRangeItemsleft = 1 + (fullRangeLen-1) div snapStorageSlotsFetchMax
while 0 < fullRangeItemsleft and
buddy.ctrl.running:
# Run batch even if `archived` flag is set in order to shrink the queues.
var delayed: seq[AccountSlotsHeader]
while buddy.ctrl.running:
# Pull out the next request list from the queue
let req = buddy.getNextSlotItemsFull(env)
let (req, nComplete, nPartial) = ctx.storageQueueFetchFull(env)
if req.len == 0:
break
fullRangeItemsleft.dec
await buddy.storeStoragesSingleBatch(req, env)
when extraTraceMessages:
trace logTxt "fetch full", peer, ctx=buddy.fetchCtx(env),
nStorageQuFull=env.fetchStorageFull.len, nReq=req.len,
nPartial, nComplete
var partialRangeItemsLeft = env.fetchStoragePart.len
while 0 < partialRangeItemsLeft and
buddy.ctrl.running:
# Pull out the next request list from the queue
let req = buddy.getNextSlotItemPartial(env)
if req.len == 0:
if await buddy.storeStoragesSingleBatch(req, env):
for w in req:
env.parkedStorage.excl w.accKey # Done with these items
else:
delayed &= req
env.storageQueueAppend delayed
# Ditto for partial queue
delayed.setLen(0)
while buddy.ctrl.running:
# Pull out the next request item from the queue
let rc = env.storageQueueFetchPartial()
if rc.isErr:
break
partialRangeItemsLeft.dec
await buddy.storeStoragesSingleBatch(req, env)
when extraTraceMessages:
let
subRange = rc.value.subRange.get
account = rc.value.accKey
trace logTxt "fetch partial", peer, ctx=buddy.fetchCtx(env),
nStorageQuPart=env.fetchStoragePart.len, subRange, account
if await buddy.storeStoragesSingleBatch(@[rc.value], env):
env.parkedStorage.excl rc.value.accKey # Done with this item
else:
delayed.add rc.value
env.storageQueueAppend delayed
when extraTraceMessages:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
trace logTxt "done", peer, nSlotLists=env.nSlotLists, nStorageQueue,
fullRangeItemsleft, partialRangeItemsLeft, runState=buddy.ctrl.state
trace logTxt "done", peer, ctx=buddy.fetchCtx(env)
# ------------------------------------------------------------------------------
# End

View File

@ -0,0 +1,292 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
eth/[common, p2p],
stew/[interval_set, keyed_queue],
../../../sync_desc,
"../.."/[constants, range_desc, worker_desc],
../db/[hexary_inspect, snapdb_storage_slots]
{.push raises: [Defect].}
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template noExceptionOops(info: static[string]; code: untyped) =
try:
code
except Defect as e:
raise e
except Exception as e:
raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc getOrMakePartial(
env: SnapPivotRef;
stoRoot: Hash256;
accKey: NodeKey;
): (SnapSlotsQueueItemRef, bool) =
## Create record on `fetchStoragePart` or return existing one
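## The second tuple item is `true` when the record was already on the partial
## queue and `false` when it was newly created (the `hasItem` flag used by
## `storageQueueAppend()` to decide whether the book-keeping sets need to be
## re-initialised.)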
let rc = env.fetchStoragePart.lruFetch stoRoot
if rc.isOk:
result = (rc.value, true) # Value exists
else:
result = (SnapSlotsQueueItemRef(accKey: accKey), false) # New value
env.parkedStorage.excl accKey # Un-park
discard env.fetchStoragePart.append(stoRoot, result[0])
if result[0].slots.isNil:
result[0].slots = SnapRangeBatchRef(processed: NodeTagRangeSet.init())
result[0].slots.unprocessed.init()
# ------------------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------------------
proc storageQueueTotal*(env: SnapPivotRef): int =
## Total number of entries on the storage queues
env.fetchStorageFull.len + env.fetchStoragePart.len + env.parkedStorage.len
# ------------------------------------------------------------------------------
# Public functions, append queue items
# ------------------------------------------------------------------------------
proc storageQueueAppendFull*(
env: SnapPivotRef;
stoRoot: Hash256;
accKey: NodeKey;
) =
## Append item to `fetchStorageFull` queue
env.fetchStoragePart.del stoRoot # Not a partial item anymore (if any)
env.parkedStorage.excl accKey # Un-park
discard env.fetchStorageFull.append(
stoRoot, SnapSlotsQueueItemRef(accKey: accKey))
proc storageQueueAppendFull*(
env: SnapPivotRef;
acc: AccountSlotsHeader;
) =
## variant of `storageQueueAppendFull()`
env.storageQueueAppendFull(acc.storageRoot, acc.accKey)
proc storageQueueAppendFull*(
env: SnapPivotRef;
kvp: SnapSlotsQueuePair;
) =
## variant of `storageQueueAppendFull()`
env.storageQueueAppendFull(kvp.key, kvp.data.accKey)
proc storageQueueAppendPartialBisect*(
env: SnapPivotRef;
acc: AccountSlotsHeader;
) =
## Append to the partial queue so that the next range to fetch is half the
## size of the first unprocessed range.
# Fetch/rotate queue item
let data = env.getOrMakePartial(acc.storageRoot, acc.accKey)[0]
# Derive unprocessed ranges => into lower priority queue
data.slots.unprocessed.clear()
discard data.slots.unprocessed[1].merge(low(NodeTag),high(NodeTag))
for iv in data.slots.processed.increasing:
discard data.slots.unprocessed[1].reduce iv # complements processed ranges
# Prioritise half of first unprocessed range
let rc = data.slots.unprocessed[1].ge()
if rc.isErr:
env.fetchStoragePart.del acc.storageRoot # Oops, nothing to do
return # Done
let halfTag = rc.value.minPt + ((rc.value.maxPt - rc.value.minPt) div 2)
data.slots.unprocessed.merge(rc.value.minPt, halfTag)
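# Toy-number check of the bisection above, with plain integers instead of
# `NodeTag` values (illustration only): if the first unprocessed interval is
# [100,999], the range promoted for the next fetch is its lower half.
when isMainModule:
  let
    (minPt, maxPt) = (100, 999)            # first unprocessed interval
    halfTag = minPt + (maxPt - minPt) div 2
  doAssert halfTag == 549                  # next fetch covers [100,549]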
proc storageQueueAppend*(
env: SnapPivotRef;
reqList: openArray[AccountSlotsHeader];
subRange = none(NodeTagRange); # For a partially fetched slot
) =
for n,w in reqList:
env.parkedStorage.excl w.accKey # Un-park
# Only last item (when `n+1 == reqList.len`) may be registered partial
if w.subRange.isNone or n + 1 < reqList.len:
env.storageQueueAppendFull w
else:
env.fetchStorageFull.del w.storageRoot
let
(data, hasItem) = env.getOrMakePartial(w.storageRoot, w.accKey)
iv = w.subRange.unsafeGet
# Register partial range
if subRange.isSome:
# The `subRange` is the original request, `iv` the uncompleted part
let reqRange = subRange.unsafeGet
if not hasItem:
# Re-initialise book keeping
discard data.slots.processed.merge(low(NodeTag),high(NodeTag))
discard data.slots.processed.reduce reqRange
data.slots.unprocessed.clear()
# Calculate `reqRange - iv` which are the completed ranges
let temp = NodeTagRangeSet.init()
discard temp.merge reqRange
discard temp.reduce iv
# Update `processed` ranges by adding `reqRange - iv`
for w in temp.increasing:
discard data.slots.processed.merge w
# Update `unprocessed` ranges
data.slots.unprocessed.merge reqRange
data.slots.unprocessed.reduce iv
elif hasItem:
# Restore unfetched request
data.slots.unprocessed.merge iv
else:
# Makes no sense with a `leftOver` item
env.storageQueueAppendFull w
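# Toy-number sketch of the `reqRange - iv` book-keeping above, with plain
# integers instead of `NodeTag` values (illustration only): if the original
# request covered [0,199] and the uncompleted left-over `iv` is [120,199],
# then [0,119] is added to `processed` and [120,199] back to `unprocessed`.
when isMainModule:
  let
    reqRange = (lo: 0, hi: 199)            # original partial request
    iv = (lo: 120, hi: 199)                # uncompleted part handed back
    completed = (lo: reqRange.lo, hi: iv.lo - 1)
  doAssert completed == (lo: 0, hi: 119)   # merged into `processed`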
# ------------------------------------------------------------------------------
# Public functions, make/create queue items
# ------------------------------------------------------------------------------
proc storageQueueGetOrMakePartial*(
env: SnapPivotRef;
stoRoot: Hash256;
accKey: NodeKey;
): SnapSlotsQueueItemRef =
## Create record on `fetchStoragePart` or return existing one
env.getOrMakePartial(stoRoot, accKey)[0]
proc storageQueueGetOrMakePartial*(
env: SnapPivotRef;
acc: AccountSlotsHeader;
): SnapSlotsQueueItemRef =
## Variant of `storageQueueGetOrMakePartial()`
env.getOrMakePartial(acc.storageRoot, acc.accKey)[0]
# ------------------------------------------------------------------------------
# Public functions, fetch and remove queue items
# ------------------------------------------------------------------------------
proc storageQueueFetchFull*(
ctx: SnapCtxRef; # Global context
env: SnapPivotRef; # Current pivot environment
): (seq[AccountSlotsHeader],int,int) =
## Fetch a list of at most `fetchRequestStorageSlotsMax` full work items
## from the batch queue.
##
## This function walks through the items queue and collects work items for
## which the hexary trie has not been fully or partially allocated on the
## database already. These collected items are returned as the first item of
## the return code tuple.
##
## There is a quick check (sufficient, but not necessary) whether a partially
## allocated work item is complete already, in which case it is removed from
## the queue. The number of removed items is returned as the second item of
## the return code tuple.
##
## Otherwise, a partially allocated item is moved to the partial queue. The
## number of items moved to the partial queue is returned as the third item
## of the return code tuple.
##
var
rcList: seq[AccountSlotsHeader]
nComplete = 0
nPartial = 0
noExceptionOops("getNextSlotItemsFull"):
for kvp in env.fetchStorageFull.nextPairs:
let
getFn = ctx.data.snapDb.getStorageSlotsFn kvp.data.accKey
rootKey = kvp.key.to(NodeKey)
accItem = AccountSlotsHeader(
accKey: kvp.data.accKey,
storageRoot: kvp.key)
# This item will either be returned, discarded, or moved to the partial
# queue subject for healing. So it will be removed from this queue.
env.fetchStorageFull.del kvp.key # OK to delete current link
# Check whether the tree is fully empty
if rootKey.ByteArray32.getFn.len == 0:
# Collect for return
rcList.add accItem
env.parkedStorage.incl accItem.accKey # Registered as absent
# Maximal number of items to fetch
if fetchRequestStorageSlotsMax <= rcList.len:
break
else:
# Check how much there is below the top level storage slots node. For
# a small storage trie, this check will be exhaustive.
let stats = getFn.hexaryInspectTrie(rootKey,
suspendAfter = storageSlotsTrieInheritPerusalMax,
maxDangling = 1)
if stats.dangling.len == 0 and stats.resumeCtx.isNil:
# This storage trie could be fully searched and there was no dangling
# node. So it is complete and can be fully removed from the batch.
nComplete.inc # Update for logging
else:
# This item becomes a partially available slot
let data = env.storageQueueGetOrMakePartial accItem
nPartial.inc # Update for logging
(rcList, nComplete, nPartial)
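# Typical call pattern (see `rangeFetchStorageSlots()` in the module
# `range_fetch_storage_slots`): the first tuple item is the network request
# list, the other two are used for logging only.
#
#   let (req, nComplete, nPartial) = ctx.storageQueueFetchFull(env)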
proc storageQueueFetchPartial*(
env: SnapPivotRef;
): Result[AccountSlotsHeader,void] =
## Get work item from the batch queue. This will typically return the full
## work item and remove it from the queue unless the partially completed
## range is fragmented.
block findItem:
for kvp in env.fetchStoragePart.nextPairs:
# Extract range and return single item request queue
let rc = kvp.data.slots.unprocessed.fetch(maxLen = high(UInt256))
if rc.isOk:
result = ok(AccountSlotsHeader(
accKey: kvp.data.accKey,
storageRoot: kvp.key,
subRange: some rc.value))
# Delete from batch queue if the `unprocessed` range set becomes empty
# and the `processed` set is the complement of `rc.value`.
if kvp.data.slots.unprocessed.isEmpty and
high(UInt256) - rc.value.len <= kvp.data.slots.processed.total:
env.fetchStoragePart.del kvp.key
env.parkedStorage.incl kvp.data.accKey # Temporarily parked
return
else:
# Otherwise rotate queue
break findItem
# End for()
return err()
# Rotate queue item
discard env.fetchStoragePart.lruFetch result.value.storageRoot
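# Typical call pattern (mirrors the partial queue loop in
# `rangeFetchStorageSlots()`): fetch one partial item at a time until the
# queue is exhausted.
#
#   let rc = env.storageQueueFetchPartial()
#   if rc.isOk:
#     # fetch and store rc.value, then un-park rc.value.accKey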
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -1,131 +0,0 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/sequtils,
chronicles,
chronos,
eth/[common, p2p],
stew/interval_set,
"../.."/[constants, range_desc, worker_desc],
../db/[hexary_desc, hexary_error, hexary_inspect]
{.push raises: [Defect].}
logScope:
topics = "snap-subtrie"
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Sub-trie helper " & info
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc doInspect(
getFn: HexaryGetFn; ## Abstract database access
rootKey: NodeKey; ## Start of hexary trie
nodes: seq[NodeSpecs]; ## Nodes with prob. dangling child links
resumeCtx: TrieNodeStatCtxRef; ## Resume previous inspection
): Result[TrieNodeStat,HexaryError]
{.gcsafe, raises: [Defect,RlpError].} =
## ..
let stats = getFn.hexaryInspectTrie(
rootKey, nodes.mapIt(it.partialPath), resumeCtx, healInspectionBatch)
if stats.stopped:
return err(TrieLoopAlert)
ok(stats)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc subTriesFromPartialPaths*(
getFn: HexaryGetFn; ## Abstract database access
stateRoot: Hash256; ## Start of hexary trie
batch: SnapRangeBatchRef; ## Healing data support
nodesMissingMaxLen = high(int); ## Max length of `nodes.missing`
): Future[Result[void,HexaryError]]
{.async.} =
## Starting with a given set of potentially dangling account nodes
## `checkNodes`, this set is filtered and processed. The outcome is
## fed back to the very same list `checkNodes`
# Process might be expensive, so only a single instance is allowed to run it
if batch.lockTriePerusal:
return err(TrieIsLockedForPerusal)
batch.lockTriePerusal = true
let
rootKey = stateRoot.to(NodeKey)
var
error: HexaryError
count = 0 # for logging
start = Moment.now() # for logging
block errorWhenOutside:
try:
while batch.nodes.missing.len < nodesMissingMaxLen:
# Inspect hexary trie for dangling nodes
let rc = getFn.doInspect(rootKey, batch.nodes.check, batch.resumeCtx)
if rc.isErr:
error = rc.error
break errorWhenOutside
count.inc
# Update context for async threading environment
batch.resumeCtx = rc.value.resumeCtx
batch.nodes.check.setLen(0)
# Collect result
batch.nodes.missing = batch.nodes.missing & rc.value.dangling
# Done unless there is some resumption context
if rc.value.resumeCtx.isNil:
break
when extraTraceMessages:
trace logTxt "inspection wait", count,
elapsed=(Moment.now()-start),
sleep=healInspectionBatchWaitNanoSecs,
nodesMissingLen=batch.nodes.missing.len, nodesMissingMaxLen,
resumeCtxLen = batch.resumeCtx.hddCtx.len
# Allow async task switch and continue. Note that some other task might
# steal some of the `nodes.missing` var argument.
await sleepAsync healInspectionBatchWaitNanoSecs.nanoseconds
batch.lockTriePerusal = false
return ok()
except RlpError:
error = RlpEncoding
batch.nodes.missing = batch.nodes.missing & batch.resumeCtx.to(seq[NodeSpecs])
batch.resumeCtx = nil
batch.lockTriePerusal = false
return err(error)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -140,15 +140,16 @@ proc uncoveredEnvelopes(
var decomposed = "n/a"
let rc = processed.hexaryEnvelopeDecompose(rootKey, getFn)
if rc.isOk:
# Remove non-allocated nodes
# Return allocated nodes only
result = rc.value.filterIt(0 < it.nodeKey.ByteArray32.getFn().len)
when extraTraceMessages:
decomposed = rc.value.toPC
when extraTraceMessages:
trace logTxt "uncovered envelopes", processed, nProcessed=processed.chunks,
decomposed, nResult=result.len, result=result.toPC
trace logTxt "unprocessed envelopes", processed,
nProcessed=processed.chunks, decomposed,
nResult=result.len, result=result.toPC
proc otherProcessedRanges(
@ -233,17 +234,17 @@ proc swapIn(
merged += processed.merge iv # Import range as processed
unprocessed.reduce iv # No need to re-fetch
when extraTraceMessages:
trace logTxt "inherited ranges", lapCount, nCheckNodes=checkNodes.len,
merged=((merged.to(float) / (2.0^256)).toPC(3)),
allMerged=((allMerged.to(float) / (2.0^256)).toPC(3))
if merged == 0: # Loop control
break
lapCount.inc
allMerged += merged # Statistics, logging
when extraTraceMessages:
trace logTxt "inherited ranges", lapCount, nCheckNodes=checkNodes.len,
merged=((merged.to(float) / (2.0^256)).toPC(3)),
allMerged=((allMerged.to(float) / (2.0^256)).toPC(3))
# End while()
(swappedIn,lapCount)
@ -326,15 +327,6 @@ proc swapInAccounts*(
nLaps
proc swapInAccounts*(
buddy: SnapBuddyRef; # Worker peer
env: SnapPivotRef; # Current pivot environment
loopMax = 100; # Prevent from looping too often
): int =
## Variant of `swapInAccounts()`
buddy.ctx.swapInAccounts(env, loopMax)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -15,7 +15,7 @@ import
../../db/select_backend,
../sync_desc,
./worker/com/com_error,
./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot],
./worker/db/[snapdb_desc, snapdb_pivot],
./worker/ticker,
./range_desc
@ -41,7 +41,6 @@ type
## range + healing support.
accKey*: NodeKey ## Owner account
slots*: SnapRangeBatchRef ## slots to fetch, nil => all slots
inherit*: bool ## mark this trie seen already
SnapTodoRanges* = array[2,NodeTagRangeSet]
## Pair of sets of ``unprocessed`` node ranges that need to be fetched and
@ -50,30 +49,22 @@ type
## This data structure is used for coordinating peers that run quasi
## parallel.
SnapTodoNodes* = object
## Pair of node lists subject to swap-in and healing
check*: seq[NodeSpecs] ## Existing nodes, sub-trie unknown
missing*: seq[NodeSpecs] ## Top ref for sub-tries to be healed
SnapRangeBatchRef* = ref object
## `NodeTag` ranges to fetch, healing support
unprocessed*: SnapTodoRanges ## Range of slots to be fetched
processed*: NodeTagRangeSet ## Node ranges definitely processed
nodes*: SnapTodoNodes ## Single nodes to double check
resumeCtx*: TrieNodeStatCtxRef ## State for resuming trie inspection
lockTriePerusal*: bool ## Only one process at a time
SnapPivotRef* = ref object
## Per-state root cache for particular snap data environment
stateHeader*: BlockHeader ## Pivot state, containg state root
# Accounts download
# Accounts download coverage
fetchAccounts*: SnapRangeBatchRef ## Set of accounts ranges to fetch
healThresh*: float ## Start healing when fill factor reached
# Storage slots download
fetchStorageFull*: SnapSlotsQueue ## Fetch storage trie for these accounts
fetchStoragePart*: SnapSlotsQueue ## Partial storage tries to complete
parkedStorage*: HashSet[NodeKey] ## Storage batch items in use
storageDone*: bool ## Done with storage, block sync next
# Info
@ -147,7 +138,7 @@ proc pivotAccountsCoverage*(ctx: SnapCtxRef): float =
proc init*(q: var SnapTodoRanges) =
## Populate node range sets with maximal range in the first range set. This
## kind of pair or interval sets is manages as follows:
## kind of pair of interval sets is managed as follows:
## * As long as possible, fetch and merge back intervals on the first set.
## * If the first set is empty and some intervals are to be fetched, swap
## first and second interval lists.
@ -157,6 +148,11 @@ proc init*(q: var SnapTodoRanges) =
q[1] = NodeTagRangeSet.init()
discard q[0].merge(low(NodeTag),high(NodeTag))
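# A minimal sketch of the two-set discipline described above, using plain
# integer sets instead of `NodeTag` range sets (illustration only): work is
# taken from the first set while it lasts, then the parked second set is
# promoted by swapping.
when isMainModule:
  import std/sets
  var q: array[2, HashSet[int]]
  q[0] = initHashSet[int]()                # first set ran empty
  q[1] = toHashSet([5, 6])                 # lower priority, parked work
  if q[0].len == 0 and 0 < q[1].len:
    swap(q[0], q[1])                       # promote the parked set
  doAssert q[0] == toHashSet([5, 6]) and q[1].len == 0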
proc clear*(q: var SnapTodoRanges) =
## Reset argument range sets empty.
q[0].clear()
q[1].clear()
proc merge*(q: var SnapTodoRanges; iv: NodeTagRange) =
## Unconditionally merge the node range into the account ranges list.
@ -221,81 +217,6 @@ proc verify*(q: var SnapTodoRanges): bool =
return false
true
# ------------------------------------------------------------------------------
# Public helpers: SlotsQueue
# ------------------------------------------------------------------------------
proc merge*(q: var SnapSlotsQueue; kvp: SnapSlotsQueuePair) =
## Append/prepend a queue item record into the batch queue.
let
reqKey = kvp.key
rc = q.eq(reqKey)
if rc.isErr:
# Append to list
discard q.append(reqKey, kvp.data)
else:
# Entry exists already
let qData = rc.value
if not qData.slots.isNil:
# So this entry is not maximal and can be extended
if kvp.data.slots.isNil:
# Remove restriction for this entry and move it to the right end
qData.slots = nil
discard q.lruFetch reqKey
else:
# Merge argument intervals into target set
for ivSet in kvp.data.slots.unprocessed:
for iv in ivSet.increasing:
qData.slots.unprocessed.reduce iv
proc merge*(q: var SnapSlotsQueue; fetchReq: AccountSlotsHeader) =
## Append/prepend a slot header record into the batch queue. If there is
## a range merger, the argument range will be sorted in a way so that it
## is processed separately with highest priority.
let
reqKey = fetchReq.storageRoot
rc = q.eq(reqKey)
if rc.isOk:
# Entry exists already
let qData = rc.value
if not qData.slots.isNil:
# So this entry is not maximal and can be extended
if fetchReq.subRange.isNone:
# Remove restriction for this entry and move it to the right end
qData.slots = nil
discard q.lruFetch reqKey
else:
# Merge argument interval into target separated from the already
# existing sets (note that this works only for the last set)
for iv in qData.slots.unprocessed[0].increasing:
# Move all to second set
discard qData.slots.unprocessed[1].merge iv
# Clear first set and add argument range
qData.slots.unprocessed[0].clear()
qData.slots.unprocessed.merge fetchReq.subRange.unsafeGet
elif fetchReq.subRange.isNone:
# Append full range to the list
discard q.append(reqKey, SnapSlotsQueueItemRef(
accKey: fetchReq.accKey))
else:
# Partial range, add healing support and interval
var unprocessed = [NodeTagRangeSet.init(), NodeTagRangeSet.init()]
discard unprocessed[0].merge(fetchReq.subRange.unsafeGet)
discard q.append(reqKey, SnapSlotsQueueItemRef(
accKey: fetchReq.accKey,
slots: SnapRangeBatchRef(
unprocessed: unprocessed,
processed: NodeTagRangeSet.init())))
proc merge*(
q: var SnapSlotsQueue;
reqList: openArray[SnapSlotsQueuePair|AccountSlotsHeader]) =
## Variant of `merge()` for a list argument
for w in reqList:
q.merge w
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------