Snap sync refactor accounts healing (#1392)

* Relocated mothballing (i.e. swap-in preparation) logic

details:
  Mothballing was previously tested & started after downloading
  account ranges in `range_fetch_accounts`.

  Whenever current download or healing stops because of a pivot change,
  swap-in preparation is needed (otherwise some storage slots may get
  lost when swap-in takes place.)

  Also, `execSnapSyncAction()` has been moved back to `pivot_helper`.

* Reorganised source file directories

details:
  Grouped pivot focused modules into `pivot` directory

* Renamed `checkNodes`, `sickSubTries` as `nodes.check`, `nodes.missing`

why:
  Both lists are typically used together as pair. Renaming `sickSubTries`
  reflects moving away from a healing centric view towards a swap-in
  attitude.

* Multi times coverage recording

details:
  Per pivot account ranges are accumulated into coverage range set. This
  set fill eventually contain a singe range of account hashes [0..2^256]
  which amounts to 100% capacity.

  A counter has been added that is incremented whenever max capacity is
  reached. The accumulated range is then reset to empty.

  The effect of this setting is that the coverage can be evenly duplicated.
  So 200% would not accumulate on a particular region.

* Update range length comparisons (mod 2^256)

why:
  A range interval can have sizes 1..2^256 as it cannot be empty by
  definition. The number of points in a range intervals set can have
  0..2^256 points. As the scalar range is a residue class modulo 2^256,
  the residue class 0 means length 2^256 for a range interval, but can
  be 0 or 2^256 for the number of points in a range intervals set.

* Generalised `hexaryEnvelopeDecompose()`

details:
  Compile the complement of the union of some (processed) intervals and
  express this complement as a list of envelopes of sub-tries.

  This facility is directly applicable to swap-in book-keeping.

* Re-factor `swapIn()`

why:
  Good idea but baloney implementation. The main algorithm is based on
  the generalised version of `hexaryEnvelopeDecompose()` which has been
  derived from this implementation.

* Refactor `healAccounts()` using `hexaryEnvelopeDecompose()` as main driver

why:
  Previously, the hexary trie was searched recursively for dangling nodes
  which has a poor worst case performance already when the trie  is
  reasonably populated.

  The function `hexaryEnvelopeDecompose()` is a magnitude faster because
  it does not peruse existing sub-tries in order to find missing nodes
  although result is not fully compatible with the previous function.

  So recursive search is used in a limited mode only when the decomposer
  will not deliver a useful result.

* Logging & maintenance fixes

details:
  Preparation for abandoning buddy-global healing variables `node`,
  `resumeCtx`, and `lockTriePerusal`. These variable are trie-perusal
  centric which will be run on the back burner in favour of
  `hexaryEnvelopeDecompose()` which is used for accounts healing already.
This commit is contained in:
Jordan Hrycaj 2022-12-19 21:22:09 +00:00 committed by GitHub
parent 05ac755e84
commit bd42ebb193
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1123 additions and 1097 deletions

View File

@ -60,7 +60,7 @@ const
## If the range set is too much fragmented, no data will be saved and ## If the range set is too much fragmented, no data will be saved and
## restart has to perform from scratch or an earlier checkpoint. ## restart has to perform from scratch or an earlier checkpoint.
snapAccountsSaveStorageSlotsMax* = 10_000 snapAccountsSaveStorageSlotsMax* = 20_000
## Recovery data are stored if the oustanding storage slots to process do ## Recovery data are stored if the oustanding storage slots to process do
## not amount to more than this many entries. ## not amount to more than this many entries.
## ##
@ -101,7 +101,7 @@ const
## nodes to allow for a pseudo -task switch. ## nodes to allow for a pseudo -task switch.
healAccountsCoverageTrigger* = 0.999 healAccountsCoverageTrigger* = 1.3
## Apply accounts healing if the global snap download coverage factor ## Apply accounts healing if the global snap download coverage factor
## exceeds this setting. The global coverage factor is derived by merging ## exceeds this setting. The global coverage factor is derived by merging
## all account ranges retrieved for all pivot state roots (see ## all account ranges retrieved for all pivot state roots (see
@ -114,7 +114,7 @@ const
## serve a maximum number requests (rather than data.) ## serve a maximum number requests (rather than data.)
healAccountsPivotTriggerMinFactor* = 0.17 healAccountsPivotTriggerMinFactor* = 0.17
## Additional condition to meed before starting healing. The current ## Additional condition to meet before starting healing. The current
## pivot must have at least this much processed as recorded in the ## pivot must have at least this much processed as recorded in the
## `processed` ranges set. This is the minimim value (see below.) ## `processed` ranges set. This is the minimim value (see below.)
@ -178,7 +178,6 @@ const
static: static:
doAssert 1 < swapInAccountsPivotsMin doAssert 1 < swapInAccountsPivotsMin
doAssert healAccountsCoverageTrigger < 1.0 # larger values make no sense
doAssert snapStorageSlotsQuPrioThresh < snapAccountsSaveStorageSlotsMax doAssert snapStorageSlotsQuPrioThresh < snapAccountsSaveStorageSlotsMax
doAssert snapStorageSlotsFetchMax < healAccountsBatchFetchMax doAssert snapStorageSlotsFetchMax < healAccountsBatchFetchMax

View File

@ -280,32 +280,18 @@ proc leafRangePp*(a, b: NodeTag): string =
result &= ',' & $b result &= ',' & $b
result &= "]" result &= "]"
proc leafRangePp*(iv: NodeTagRange): string =
## Variant of `leafRangePp()`
leafRangePp(iv.minPt, iv.maxPt)
proc `$`*(a, b: NodeTag): string = proc `$`*(a, b: NodeTag): string =
## Prettyfied prototype ## Prettyfied prototype
leafRangePp(a,b) leafRangePp(a,b)
proc `$`*(iv: NodeTagRange): string = proc `$`*(iv: NodeTagRange): string =
leafRangePp(iv.minPt, iv.maxPt) leafRangePp iv
proc `$`*(n: NodeSpecs): string =
## Prints `(path,key,node-hash)`
let nHash = if n.data.len == 0: NodeKey.default
else: n.data.digestTo(NodeKey)
result = "("
if n.partialPath.len != 0:
result &= n.partialPath.toHex
result &= ","
if n.nodeKey != NodeKey.default:
result &= $n.nodeKey
if n.nodeKey != nHash:
result &= "(!)"
result &= ","
if nHash != NodeKey.default:
if n.nodeKey != nHash:
result &= $nHash
else:
result &= "ditto"
result &= ")"
proc dump*( proc dump*(
ranges: openArray[NodeTagRangeSet]; ranges: openArray[NodeTagRangeSet];

View File

@ -18,8 +18,7 @@ import
../../utils/prettify, ../../utils/prettify,
../misc/best_pivot, ../misc/best_pivot,
".."/[protocol, sync_desc], ".."/[protocol, sync_desc],
./worker/[heal_accounts, heal_storage_slots, pivot_helper, ./worker/[pivot, ticker],
range_fetch_accounts, range_fetch_storage_slots, ticker],
./worker/com/com_error, ./worker/com/com_error,
./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot], ./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot],
"."/[constants, range_desc, worker_desc] "."/[constants, range_desc, worker_desc]
@ -127,7 +126,7 @@ proc updateSinglePivot(buddy: SnapBuddyRef): Future[bool] {.async.} =
let rc = ctx.data.pivotTable.lastValue let rc = ctx.data.pivotTable.lastValue
if rc.isOk and rc.value.storageDone: if rc.isOk and rc.value.storageDone:
# No neede to change # No neede to change
if extraTraceMessages: when extraTraceMessages:
trace "No need to change snap pivot", peer, trace "No need to change snap pivot", peer,
pivot=("#" & $rc.value.stateHeader.blockNumber), pivot=("#" & $rc.value.stateHeader.blockNumber),
stateRoot=rc.value.stateHeader.stateRoot, stateRoot=rc.value.stateHeader.stateRoot,
@ -141,47 +140,6 @@ proc updateSinglePivot(buddy: SnapBuddyRef): Future[bool] {.async.} =
return true return true
proc execSnapSyncAction(
env: SnapPivotRef; # Current pivot environment
buddy: SnapBuddyRef; # Worker peer
) {.async.} =
## Execute a synchronisation run.
let
ctx = buddy.ctx
block:
# Clean up storage slots queue first it it becomes too large
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
if snapStorageSlotsQuPrioThresh < nStoQu:
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.archived:
return
if not env.pivotAccountsComplete():
await buddy.rangeFetchAccounts(env)
if buddy.ctrl.stopped or env.archived:
return
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.archived:
return
if env.pivotAccountsHealingOk(ctx):
await buddy.healAccounts(env)
if buddy.ctrl.stopped or env.archived:
return
# Some additional storage slots might have been popped up
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.archived:
return
# Don't bother with storage slots healing before accounts healing takes
# place. This saves communication bandwidth. The pivot might change soon,
# anyway.
if env.pivotAccountsHealingOk(ctx):
await buddy.healStorageSlots(env)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public start/stop and admin functions # Public start/stop and admin functions
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -327,20 +285,26 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
when extraTraceMessages: when extraTraceMessages:
block: block:
let let
nCheckNodes = env.fetchAccounts.checkNodes.len
nSickSubTries = env.fetchAccounts.sickSubTries.len
nAccounts = env.nAccounts nAccounts = env.nAccounts
nSlotLists = env.nSlotLists nSlotLists = env.nSlotLists
processed = env.fetchAccounts.processed.fullFactor.toPC(2) processed = env.fetchAccounts.processed.fullFactor.toPC(2)
nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
accHealThresh = env.healThresh.toPC(2) accHealThresh = env.healThresh.toPC(2)
trace "Multi sync runner", peer, pivot, nAccounts, nSlotLists, processed, trace "Multi sync runner", peer, pivot, nAccounts, nSlotLists, processed,
nStoQu, accHealThresh, nCheckNodes, nSickSubTries nStoQu, accHealThresh
# This one is the syncing work horse which downloads the database # This one is the syncing work horse which downloads the database
await env.execSnapSyncAction(buddy) await env.execSnapSyncAction(buddy)
if env.archived: if env.archived:
let
peer = buddy.peer
nAccounts = env.nAccounts
nSlotLists = env.nSlotLists
when extraTraceMessages:
trace "Mothballing", peer, pivot=("#" & $env.stateHeader.blockNumber),
nAccounts=env.nAccounts, nSlotLists=env.nSlotLists
env.pivotMothball()
return # pivot has changed return # pivot has changed
block: block:

View File

@ -43,8 +43,12 @@
## ##
## * ``iv`` is a range of leaf node paths (of type ``NodeTagRange``) ## * ``iv`` is a range of leaf node paths (of type ``NodeTagRange``)
## ##
## and assume further that for `iv` there are left and right *boundary proofs* ## and assume further that
## in the database (e.g. as downloaded via the `snap/1` protocol.) ##
## * ``partialPath`` points to an allocated node
##
## * for `iv` there are left and right *boundary proofs in the database
## (e.g. as downloaded via the `snap/1` protocol.)
## ##
## The decomposition ## The decomposition
## ^^^^^^^^^^^^^^^^^ ## ^^^^^^^^^^^^^^^^^
@ -155,7 +159,7 @@ proc padPartialPath(pfx: NibblesSeq; dblNibble: byte): NodeKey =
(addr result.ByteArray32[0]).copyMem(unsafeAddr bytes[0], bytes.len) (addr result.ByteArray32[0]).copyMem(unsafeAddr bytes[0], bytes.len)
proc decomposeLeft( proc doDecomposeLeft(
envPt: RPath|XPath; envPt: RPath|XPath;
ivPt: RPath|XPath; ivPt: RPath|XPath;
): Result[seq[NodeSpecs],HexaryError] = ): Result[seq[NodeSpecs],HexaryError] =
@ -205,7 +209,7 @@ proc decomposeLeft(
ok(collect) ok(collect)
proc decomposeRight( proc doDecomposeRight(
envPt: RPath|XPath; envPt: RPath|XPath;
ivPt: RPath|XPath; ivPt: RPath|XPath;
): Result[seq[NodeSpecs],HexaryError] = ): Result[seq[NodeSpecs],HexaryError] =
@ -239,23 +243,19 @@ proc decomposeRight(
ok(collect) ok(collect)
proc decomposeImpl( proc decomposeLeftImpl(
partialPath: Blob; # Hex encoded partial path env: NodeTagRange; # Envelope for some partial path
rootKey: NodeKey; # State root rootKey: NodeKey; # State root
iv: NodeTagRange; # Proofed range of leaf paths iv: NodeTagRange; # Proofed range of leaf paths
db: HexaryGetFn|HexaryTreeDbRef; # Database abstraction db: HexaryGetFn|HexaryTreeDbRef; # Database abstraction
): Result[seq[NodeSpecs],HexaryError] ): Result[seq[NodeSpecs],HexaryError]
{.gcsafe, raises: [Defect,RlpError,KeyError].} = {.gcsafe, raises: [Defect,RlpError,KeyError].} =
## Database agnostic implementation of `hexaryEnvelopeDecompose()`. ## Database agnostic implementation of `hexaryEnvelopeDecompose()`.
let env = partialPath.hexaryEnvelope
if iv.maxPt < env.minPt or env.maxPt < iv.minPt:
return err(DecomposeDisjuct) # empty result
var nodeSpex: seq[NodeSpecs] var nodeSpex: seq[NodeSpecs]
# So ranges do overlap. The case that the `partialPath` envelope is fully # So ranges do overlap. The case that the `partialPath` envelope is fully
# contained in `iv` results in `@[]` which is implicitely handled by # contained in `iv` results in `@[]` which is implicitely handled by
# non-matching any of the cases, below. # non-matching of the below if clause.
if env.minPt < iv.minPt: if env.minPt < iv.minPt:
let let
envPt = env.minPt.hexaryPath(rootKey, db) envPt = env.minPt.hexaryPath(rootKey, db)
@ -266,11 +266,23 @@ proc decomposeImpl(
return err(rc.error) return err(rc.error)
rc.value rc.value
block: block:
let rc = envPt.decomposeLeft ivPt let rc = envPt.doDecomposeLeft ivPt
if rc.isErr: if rc.isErr:
return err(rc.error) return err(rc.error)
nodeSpex &= rc.value nodeSpex &= rc.value
ok(nodeSpex)
proc decomposeRightImpl(
env: NodeTagRange; # Envelope for some partial path
rootKey: NodeKey; # State root
iv: NodeTagRange; # Proofed range of leaf paths
db: HexaryGetFn|HexaryTreeDbRef; # Database abstraction
): Result[seq[NodeSpecs],HexaryError]
{.gcsafe, raises: [Defect,RlpError,KeyError].} =
## Database agnostic implementation of `hexaryEnvelopeDecompose()`.
var nodeSpex: seq[NodeSpecs]
if iv.maxPt < env.maxPt: if iv.maxPt < env.maxPt:
let let
envPt = env.maxPt.hexaryPath(rootKey, db) envPt = env.maxPt.hexaryPath(rootKey, db)
@ -280,7 +292,7 @@ proc decomposeImpl(
return err(rc.error) return err(rc.error)
rc.value rc.value
block: block:
let rc = envPt.decomposeRight ivPt let rc = envPt.doDecomposeRight ivPt
if rc.isErr: if rc.isErr:
return err(rc.error) return err(rc.error)
nodeSpex &= rc.value nodeSpex &= rc.value
@ -313,8 +325,10 @@ proc hexaryEnvelopeUniq*(
{.gcsafe, raises: [Defect,KeyError].} = {.gcsafe, raises: [Defect,KeyError].} =
## Sort and simplify a list of partial paths by sorting envelopes while ## Sort and simplify a list of partial paths by sorting envelopes while
## removing nested entries. ## removing nested entries.
var tab: Table[NodeTag,(Blob,bool)] if partialPaths.len < 2:
return partialPaths.toSeq
var tab: Table[NodeTag,(Blob,bool)]
for w in partialPaths: for w in partialPaths:
let iv = w.hexaryEnvelope let iv = w.hexaryEnvelope
tab[iv.minPt] = (w,true) # begin entry tab[iv.minPt] = (w,true) # begin entry
@ -323,8 +337,8 @@ proc hexaryEnvelopeUniq*(
# When sorted, nested entries look like # When sorted, nested entries look like
# #
# 123000000.. (w0, true) # 123000000.. (w0, true)
# 123400000.. (w1, true) # 123400000.. (w1, true) <--- nested
# 1234fffff.. (, false) # 1234fffff.. (, false) <--- nested
# 123ffffff.. (, false) # 123ffffff.. (, false)
# ... # ...
# 777000000.. (w2, true) # 777000000.. (w2, true)
@ -345,8 +359,10 @@ proc hexaryEnvelopeUniq*(
{.gcsafe, raises: [Defect,KeyError].} = {.gcsafe, raises: [Defect,KeyError].} =
## Variant of `hexaryEnvelopeUniq` for sorting a `NodeSpecs` list by ## Variant of `hexaryEnvelopeUniq` for sorting a `NodeSpecs` list by
## partial paths. ## partial paths.
var tab: Table[NodeTag,(NodeSpecs,bool)] if nodes.len < 2:
return nodes.toSeq
var tab: Table[NodeTag,(NodeSpecs,bool)]
for w in nodes: for w in nodes:
let iv = w.partialPath.hexaryEnvelope let iv = w.partialPath.hexaryEnvelope
tab[iv.minPt] = (w,true) # begin entry tab[iv.minPt] = (w,true) # begin entry
@ -371,9 +387,17 @@ proc hexaryEnvelopeTouchedBy*(
## returns the complete set of intervals from the argument set `rangeSet` ## returns the complete set of intervals from the argument set `rangeSet`
## that have a common point with the envelope (i.e. they are non-disjunct to ## that have a common point with the envelope (i.e. they are non-disjunct to
## the envelope.) ## the envelope.)
result = NodeTagRangeSet.init() ##
## Note that this function always returns a new set (which might be equal to
## the argument set `rangeSet`.)
let probe = partialPath.hexaryEnvelope let probe = partialPath.hexaryEnvelope
# `probe.len==0`(mod 2^256) => `probe==[0,high]` as `probe` cannot be empty
if probe.len == 0:
return rangeSet.clone
result = NodeTagRangeSet.init() # return empty set unless coverage
if 0 < rangeSet.covered probe: if 0 < rangeSet.covered probe:
# Find an interval `start` that starts before the `probe` interval. # Find an interval `start` that starts before the `probe` interval.
# Preferably, this interval is the rightmost one starting before `probe`. # Preferably, this interval is the rightmost one starting before `probe`.
@ -424,6 +448,8 @@ proc hexaryEnvelopeTouchedBy*(
discard result.merge w discard result.merge w
elif probe.maxPt < w.minPt: elif probe.maxPt < w.minPt:
break # all the `w` following will be disjuct, too break # all the `w` following will be disjuct, too
# End if
proc hexaryEnvelopeTouchedBy*( proc hexaryEnvelopeTouchedBy*(
rangeSet: NodeTagRangeSet; # Set of intervals (aka ranges) rangeSet: NodeTagRangeSet; # Set of intervals (aka ranges)
@ -454,7 +480,9 @@ proc hexaryEnvelopeDecompose*(
## search may be amended to ignore nodes the envelope of is fully contained ## search may be amended to ignore nodes the envelope of is fully contained
## in some range `iv`. For a fully allocated hexary trie, there will be at ## in some range `iv`. For a fully allocated hexary trie, there will be at
## least one sub-trie of length *N* with leafs not in `iv`. So the number ## least one sub-trie of length *N* with leafs not in `iv`. So the number
## of nodes visited is *O(16^N)* for some *N* at most 63. ## of nodes visited is *O(16^N)* for some *N* at most 63 (note that *N*
## itself is *O(log M)* where M is the size of the leaf elements *M*, and
## *O(16^N)* = *O(M)*.)
## ##
## The function `hexaryEnvelopeDecompose()` take the left or rightmost leaf ## The function `hexaryEnvelopeDecompose()` take the left or rightmost leaf
## path from `iv`, calculates a chain length *N* of nodes from the state ## path from `iv`, calculates a chain length *N* of nodes from the state
@ -478,8 +506,24 @@ proc hexaryEnvelopeDecompose*(
## `hexaryEnvelopeDecompose()` is a fast one and `hexaryInspect()` the ## `hexaryEnvelopeDecompose()` is a fast one and `hexaryInspect()` the
## thorough one of last resort. ## thorough one of last resort.
## ##
let env = partialPath.hexaryEnvelope
if iv.maxPt < env.minPt or env.maxPt < iv.minPt:
return err(DecomposeDisjunct) # empty result
noRlpErrorOops("in-memory hexaryEnvelopeDecompose"): noRlpErrorOops("in-memory hexaryEnvelopeDecompose"):
return partialPath.decomposeImpl(rootKey, iv, db) let left = block:
let rc = env.decomposeLeftImpl(rootKey, iv, db)
if rc.isErr:
return rc
rc.value
let right = block:
let rc = env.decomposeRightImpl(rootKey, iv, db)
if rc.isErr:
return rc
rc.value
return ok(left & right)
# Notreached
proc hexaryEnvelopeDecompose*( proc hexaryEnvelopeDecompose*(
partialPath: Blob; # Hex encoded partial path partialPath: Blob; # Hex encoded partial path
@ -488,9 +532,155 @@ proc hexaryEnvelopeDecompose*(
getFn: HexaryGetFn; # Database abstraction getFn: HexaryGetFn; # Database abstraction
): Result[seq[NodeSpecs],HexaryError] ): Result[seq[NodeSpecs],HexaryError]
{.gcsafe, raises: [Defect,RlpError].} = {.gcsafe, raises: [Defect,RlpError].} =
## Variant of `decompose()` for persistent database. ## Variant of `hexaryEnvelopeDecompose()` for persistent database.
let env = partialPath.hexaryEnvelope
if iv.maxPt < env.minPt or env.maxPt < iv.minPt:
return err(DecomposeDisjunct) # empty result
noKeyErrorOops("persistent hexaryEnvelopeDecompose"): noKeyErrorOops("persistent hexaryEnvelopeDecompose"):
return partialPath.decomposeImpl(rootKey, iv, getFn) let left = block:
let rc = env.decomposeLeftImpl(rootKey, iv, getFn)
if rc.isErr:
return rc
rc.value
let right = block:
let rc = env.decomposeRightImpl(rootKey, iv, getFn)
if rc.isErr:
return rc
rc.value
return ok(left & right)
# Notreached
proc hexaryEnvelopeDecompose*(
partialPath: Blob; # Hex encoded partial path
ranges: NodeTagRangeSet; # To be complemented
rootKey: NodeKey; # State root
db: HexaryGetFn|HexaryTreeDbRef; # Database abstraction
): Result[seq[NodeSpecs],HexaryError] =
## Variant of `hexaryEnvelopeDecompose()` for an argument set `ranges` of
## intervals rather than a single one.
##
## Given that for the arguement `partialPath` there is an allocated node,
## and all intervals in the `ranges` argument are boundary proofed, then
## this function compiles the complement of the union of the interval
## elements `ranges` relative to the envelope of the argument `partialPath`.
## The function expresses this complement as a list of envelopes of
## sub-tries. In other words, it finds a list `L` with
##
## * ``L`` is a list of (existing but not necessarily allocated) nodes.
##
## * The union ``U(L)`` of envelopes of elements of ``L`` is a subset of the
## envelope ``E(partialPath)`` of ``partialPath``.
##
## * ``U(L)`` has no common point with any interval of the set ``ranges``.
##
## * ``L`` is maximal in the sense that any node ``w`` which is prefixed by
## a node from ``E(partialPath)`` and with an envelope ``E(w)`` without
## common node for any interval of ``ranges`` is also prefixed by a node
## from ``L``.
##
## * The envelopes of the nodes in ``L`` are disjunct (i.e. the size of `L`
## is minimal.)
##
## The function fails if `E(partialPath)` is disjunct from any interval of
## `ranges`. The function returns an empty list if `E(partialPath)` overlaps
## with some interval from `ranges` but there exists no common nodes. Nodes
## that cause *RLP* decoding errors are ignored and will get lost.
##
## Note: Two intervals over the set of nodes might not be disjunct but
## nevertheless have no node in common simply fot the fact that thre
## are no such nodes in the database (with a path in the intersection
## of the two intervals.)
##
# Find all intervals from the set of `ranges` ranges that have a point
# in common with `partialPath`.
let touched = ranges.hexaryEnvelopeTouchedBy partialPath
if touched.chunks == 0:
return err(DecomposeDisjunct)
# Decompose the the complement of the `node` envelope off `iv` into
# envelopes/sub-tries.
let
startNode = NodeSpecs(partialPath: partialPath)
var
leftQueue: seq[NodeSpecs] # To be appended only in loop below
rightQueue = @[startNode] # To be replaced/modified in loop below
for iv in touched.increasing:
#
# For the interval `iv` and the list `rightQueue`, the following holds:
# * `iv` is larger (to the right) of its predecessor `iu` (if any)
# * all nodes `w` of the list `rightQueue` are larger than `iu` (if any)
#
# So collect all intervals to the left `iv` and keep going with the
# remainder to the right:
# ::
# before decomposing:
# v---------v v---------v v--------v -- right queue envelopes
# |-----------| -- iv
#
# after decomposing the right queue:
# v---v -- left queue envelopes
# v----v v--------v -- right queue envelopes
# |-----------| -- iv
#
var delayed: seq[NodeSpecs]
for n,w in rightQueue:
let env = w.hexaryEnvelope
if env.maxPt < iv.minPt:
leftQueue.add w # Envelope fully to the left of `iv`
continue
if iv.maxPt < env.minPt:
# All remaining entries are fullly to the right of `iv`.
delayed &= rightQueue[n ..< rightQueue.len]
# Node that `w` != `startNode` because otherwise `touched` would
# have been empty.
break
try:
block:
let rc = env.decomposeLeftImpl(rootKey, iv, db)
if rc.isOk:
leftQueue &= rc.value # Queue left side smaller than `iv`
block:
let rc = env.decomposeRightImpl(rootKey, iv, db)
if rc.isOk:
delayed &= rc.value # Queue right side for next lap
except RlpError, KeyError:
# Cannot decompose `w`, so just drop it
discard
# At this location in code, `delayed` can never contain `startNode` as it
# is decomosed in the algorithm above.
rightQueue = delayed
# End for() loop over `touched`
ok(leftQueue & rightQueue)
proc hexaryEnvelopeDecompose*(
node: NodeSpecs; # The envelope of which to be complemented
ranges: NodeTagRangeSet; # To be complemented
rootKey: NodeKey; # State root
db: HexaryGetFn|HexaryTreeDbRef; # Database abstraction
): Result[seq[NodeSpecs],HexaryError] =
## Variant of `hexaryEnvelopeDecompose()` for ranges and a `NodeSpecs`
## argument rather than a partial path.
node.partialPath.hexaryEnvelopeDecompose(ranges, rootKey, db)
proc hexaryEnvelopeDecompose*(
ranges: NodeTagRangeSet; # To be complemented
rootKey: NodeKey; # State root
db: HexaryGetFn|HexaryTreeDbRef; # Database abstraction
): Result[seq[NodeSpecs],HexaryError] =
## Variant of `hexaryEnvelopeDecompose()` for ranges and an implicit maximal
## partial path envelope.
## argument rather than a partial path.
@[0.byte].hexaryEnvelopeDecompose(ranges, rootKey, db)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# End # End

View File

@ -39,7 +39,7 @@ type
# envelope # envelope
DecomposeDegenerated DecomposeDegenerated
DecomposeDisjuct DecomposeDisjunct
# import # import
DifferentNodeValueExists DifferentNodeValueExists

View File

@ -48,8 +48,8 @@ proc accountsCtx(
"nAccounts=" & $env.nAccounts & "," & "nAccounts=" & $env.nAccounts & "," &
("covered=" & env.fetchAccounts.processed.fullFactor.toPC(0) & "/" & ("covered=" & env.fetchAccounts.processed.fullFactor.toPC(0) & "/" &
ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," & ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," &
"nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," & "nNodesCheck=" & $env.fetchAccounts.nodes.check.len & "," &
"nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}" "nNodesMissing=" & $env.fetchAccounts.nodes.missing.len & "}"
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Private helpers # Private helpers
@ -76,8 +76,8 @@ proc storageSlotsCtx(
if not slots.isNil: if not slots.isNil:
result &= "" & result &= "" &
"covered=" & slots.processed.fullFactor.toPC(0) & "covered=" & slots.processed.fullFactor.toPC(0) &
"nCheckNodes=" & $slots.checkNodes.len & "," & "nNodesCheck=" & $slots.nodes.check.len & "," &
"nSickSubTries=" & $slots.sickSubTries.len "nNodesMissing=" & $slots.nodes.missing.len
result &= "}" result &= "}"
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------

View File

@ -1,461 +0,0 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Heal accounts DB:
## =================
##
## Flow chart for healing algorithm
## --------------------------------
## ::
## START
## |
## | +--------------------------------+
## | | |
## | v |
## | <inspect-trie> |
## | | |
## | | +-----------------------+ |
## | | | +------------------+ | |
## | | | | | | |
## v v v v | | |
## {missing-nodes} | | |
## | | | |
## v | | |
## <get-trie-nodes-via-snap/1> ---+ | |
## | | |
## v | |
## <merge-nodes-into-database> -----+ |
## | | |
## v v |
## {leaf-nodes} {check-nodes} -------+
## |
## v \
## <update-accounts-batch> |
## | | similar actions for single leaf
## v \ nodes that otherwise would be
## {storage-roots} / done for account hash ranges in
## | | the function storeAccounts()
## v |
## <update-storage-processor-batch> /
##
## Legend:
## * `<..>`: some action, process, etc.
## * `{missing-nodes}`: list implemented as `env.fetchAccounts.sickSubTries`
## * `{leaf-nodes}`: list is optimised out
## * `{check-nodes}`: list implemented as `env.fetchAccounts.checkNodes`
## * `{storage-roots}`: list implemented as pair of queues
## `env.fetchStorageFull` and `env.fetchStoragePart`
##
## Discussion of flow chart
## ------------------------
## * If there are no missing nodes, START proceeds with the `<inspect-trie>`
## process.
##
## * Nodes of the `{missing-nodes}` list are fetched from the network and
## merged into the persistent accounts trie database.
## + Successfully merged non-leaf nodes are collected in the `{check-nodes}`
## list which is fed back into the `<inspect-trie>` process.
## + Successfully merged leaf nodes are processed as single entry accounts
## node ranges.
##
## * Input nodes for `<inspect-trie>` are checked for dangling child node
## links which in turn are collected as output.
##
## * If there is a problem with a node travelling from the source list
## `{missing-nodes}` towards either target list `{leaf-nodes}` or
## `{check-nodes}`, this problem node will fed back to the `{missing-nodes}`
## source list.
##
## * In order to avoid double processing, the `{missing-nodes}` list is
## regularly checked for whether nodes are still missing or some other
## process has done the magic work of merging some of then into the
## trie database.
##
## Competing with other trie algorithms
## ------------------------------------
## * Healing runs (semi-)parallel to processing *GetAccountRange* network
## messages from the `snap/1` protocol (see `storeAccounts()`). Considering
## network bandwidth, the *GetAccountRange* message processing is way more
## efficient in comparison with the healing algorithm as there are no
## intermediate trie nodes involved.
##
## * The healing algorithm visits all nodes of a complete trie unless it is
## stopped in between.
##
## * If a trie node is missing, it can be fetched directly by the healing
## algorithm or one can wait for another process to do the job. Waiting for
## other processes to do the job also applies to problem nodes (and vice
## versa.)
##
## * Network bandwidth can be saved if nodes are fetched by the more efficient
## *GetAccountRange* message processing (if that is available.) This suggests
## that fetching missing trie nodes by the healing algorithm should kick in
## very late when the trie database is nearly complete.
##
## * Healing applies to a hexary trie database associated with the currently
## latest *state root*, where tha latter may change occasionally. This
## suggests to start the healing algorithm very late at a time when most of
## the accounts have been updated by any *state root*, already. There is a
## good chance that the healing algorithm detects and activates account data
## from previous *state roots* that have not changed.
import
std/[sequtils, tables],
chronicles,
chronos,
eth/[common, p2p, trie/nibbles, trie/trie_defs, rlp],
stew/[interval_set, keyed_queue],
../../../utils/prettify,
../../sync_desc,
".."/[constants, range_desc, worker_desc],
./com/[com_error, get_trie_nodes],
./db/[hexary_desc, hexary_error, snapdb_accounts],
"."/[sub_tries_helper, swap_in]
{.push raises: [Defect].}
logScope:
topics = "snap-heal"
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Accounts healing " & info
proc healingCtx(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): string =
let ctx = buddy.ctx
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"nAccounts=" & $env.nAccounts & "," &
("covered=" & env.fetchAccounts.processed.fullFactor.toPC(0) & "/" &
ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," &
"nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," &
"nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template discardRlpError(info: static[string]; code: untyped) =
try:
code
except RlpError as e:
discard
template noExceptionOops(info: static[string]; code: untyped) =
try:
code
except Defect as e:
raise e
except Exception as e:
raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc reorgHealingState(
buddy: SnapBuddyRef;
env: SnapPivotRef;
) =
let
ctx = buddy.ctx
rootKey = env.stateHeader.stateRoot.to(NodeKey)
getFn = ctx.data.snapDb.getAccountFn
nCheckNodes0 = env.fetchAccounts.checkNodes.len
nSickSubTries0 = env.fetchAccounts.sickSubTries.len
nProcessed0 = env.fetchAccounts.processed.fullfactor.toPC(3)
# Reclassify nodes into existing/allocated and dangling ones
if buddy.swapInAccounts(env) == 0:
# Nothing to swap in, so force reclassification
noExceptionOops("reorgHealingState"):
var delayed: seq[NodeSpecs]
for node in env.fetchAccounts.sickSubTries:
if node.nodeKey.ByteArray32.getFn().len == 0:
delayed.add node # still subject to healing
else:
env.fetchAccounts.checkNodes.add node
env.fetchAccounts.sickSubTries = delayed
when extraTraceMessages:
let
nCheckNodes1 = env.fetchAccounts.checkNodes.len
nSickSubTries1 = env.fetchAccounts.sickSubTries.len
trace logTxt "sick nodes reclassified", nCheckNodes0, nSickSubTries0,
nCheckNodes1, nSickSubTries1, nProcessed0
proc updateMissingNodesList(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): Future[bool]
{.async.} =
## Starting with a given set of potentially dangling account nodes
## `checkNodes`, this set is filtered and processed. The outcome is
## fed back to the vey same list `checkNodes`
let
ctx = buddy.ctx
peer = buddy.peer
db = ctx.data.snapDb
let rc = await db.getAccountFn.subTriesFromPartialPaths(
env.stateHeader.stateRoot, # State root related to pivot
env.fetchAccounts, # Account download specs
snapRequestTrieNodesFetchMax) # Maxinmal datagram request size
if rc.isErr:
if rc.error == TrieIsLockedForPerusal:
trace logTxt "failed", peer,
ctx=buddy.healingCtx(env), error=rc.error
else:
error logTxt "failed => stop", peer,
ctx=buddy.healingCtx(env), error=rc.error
# Attempt to switch pivot, there is not much else one can do here
buddy.ctrl.zombie = true
return false
return true
proc getMissingNodesFromNetwork(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): Future[seq[NodeSpecs]]
{.async.} =
## Extract from `sickSubTries` the next batch of nodes that need
## to be merged it into the database
let
ctx = buddy.ctx
peer = buddy.peer
stateRoot = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
nSickSubTries = env.fetchAccounts.sickSubTries.len
inxLeft = max(0, nSickSubTries - snapRequestTrieNodesFetchMax)
# There is no point in processing too many nodes at the same time. So leave
# the rest on the `sickSubTries` queue to be handled later.
let fetchNodes = env.fetchAccounts.sickSubTries[inxLeft ..< nSickSubTries]
env.fetchAccounts.sickSubTries.setLen(inxLeft)
# Initalise for `getTrieNodes()` for fetching nodes from the network
var
nodeKey: Table[Blob,NodeKey] # Temporary `path -> key` mapping
pathList: seq[seq[Blob]] # Function argument for `getTrieNodes()`
for w in fetchNodes:
pathList.add @[w.partialPath]
nodeKey[w.partialPath] = w.nodeKey
# Fetch nodes from the network. Note that the remainder of the
# `sickSubTries` list might be used by another process that runs
# semi-parallel.
let rc = await buddy.getTrieNodes(stateRoot, pathList, pivot)
if rc.isOk:
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.data.errors.resetComError()
# Register unfetched missing nodes for the next pass
for w in rc.value.leftOver:
env.fetchAccounts.sickSubTries.add NodeSpecs(
partialPath: w[0],
nodeKey: nodeKey[w[0]])
return rc.value.nodes.mapIt(NodeSpecs(
partialPath: it.partialPath,
nodeKey: nodeKey[it.partialPath],
data: it.data))
# Restore missing nodes list now so that a task switch in the error checker
# allows other processes to access the full `sickSubTries` list.
env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & fetchNodes
let
error = rc.error
ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors)
when extraTraceMessages:
if ok:
trace logTxt "fetch nodes error => stop", peer,
ctx=buddy.healingCtx(env), error
else:
trace logTxt "fetch nodes error", peer,
ctx=buddy.healingCtx(env), error
return @[]
proc kvAccountLeaf(
buddy: SnapBuddyRef;
node: NodeSpecs;
env: SnapPivotRef;
): (bool,NodeKey,Account) =
## Re-read leaf node from persistent database (if any)
let
peer = buddy.peer
var
nNibbles = -1
discardRlpError("kvAccountLeaf"):
let
nodeRlp = rlpFromBytes node.data
prefix = (hexPrefixDecode node.partialPath)[1]
segment = (hexPrefixDecode nodeRlp.listElem(0).toBytes)[1]
nibbles = prefix & segment
nNibbles = nibbles.len
if nNibbles == 64:
let
data = nodeRlp.listElem(1).toBytes
nodeKey = nibbles.getBytes.convertTo(NodeKey)
accData = rlp.decode(data,Account)
return (true, nodeKey, accData)
when extraTraceMessages:
trace logTxt "non-leaf node path or corrupt data", peer,
ctx=buddy.healingCtx(env), nNibbles
proc registerAccountLeaf(
buddy: SnapBuddyRef;
accKey: NodeKey;
acc: Account;
env: SnapPivotRef;
) =
## Process single account node as would be done with an interval by
## the `storeAccounts()` function
let
peer = buddy.peer
pt = accKey.to(NodeTag)
# Register isolated leaf node
if 0 < env.fetchAccounts.processed.merge(pt,pt) :
env.nAccounts.inc
env.fetchAccounts.unprocessed.reduce(pt,pt)
discard buddy.ctx.data.coveredAccounts.merge(pt,pt)
# Update storage slots batch
if acc.storageRoot != emptyRlpHash:
env.fetchStorageFull.merge AccountSlotsHeader(
acckey: accKey,
storageRoot: acc.storageRoot)
# ------------------------------------------------------------------------------
# Private functions: do the healing for one round
# ------------------------------------------------------------------------------
proc accountsHealingImpl(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): Future[int]
{.async.} =
## Fetching and merging missing account trie database nodes. It returns the
## number of nodes fetched from the network, and -1 upon error.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
# Update for changes since last visit
buddy.reorgHealingState(env)
if env.fetchAccounts.sickSubTries.len == 0:
# Traverse the hexary trie for more missing nodes. This call is expensive.
if await buddy.updateMissingNodesList(env):
# Check whether the trie is complete.
if env.fetchAccounts.sickSubTries.len == 0:
trace logTxt "complete", peer, ctx=buddy.healingCtx(env)
return 0 # nothing to do
# Get next batch of nodes that need to be merged it into the database
let nodeSpecs = await buddy.getMissingNodesFromNetwork(env)
if nodeSpecs.len == 0:
return 0
# Store nodes onto disk
let report = db.importRawAccountsNodes(peer, nodeSpecs)
if 0 < report.len and report[^1].slot.isNone:
# Storage error, just run the next lap (not much else that can be done)
error logTxt "error updating persistent database", peer,
ctx=buddy.healingCtx(env), nNodes=nodeSpecs.len, error=report[^1].error
env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & nodeSpecs
return -1
# Filter out error and leaf nodes
var nLeafNodes = 0 # for logging
for w in report:
if w.slot.isSome: # non-indexed entries appear typically at the end, though
let inx = w.slot.unsafeGet
if w.error != NothingSerious or w.kind.isNone:
# error, try downloading again
env.fetchAccounts.sickSubTries.add nodeSpecs[inx]
elif w.kind.unsafeGet != Leaf:
# re-check this node
env.fetchAccounts.checkNodes.add nodeSpecs[inx]
else:
# Node has been stored, double check
let (isLeaf, key, acc) = buddy.kvAccountLeaf(nodeSpecs[inx], env)
if isLeaf:
# Update `uprocessed` registry, collect storage roots (if any)
buddy.registerAccountLeaf(key, acc, env)
nLeafNodes.inc
else:
env.fetchAccounts.checkNodes.add nodeSpecs[inx]
when extraTraceMessages:
trace logTxt "merged into database", peer,
ctx=buddy.healingCtx(env), nNodes=nodeSpecs.len, nLeafNodes
return nodeSpecs.len
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc healAccounts*(
buddy: SnapBuddyRef;
env: SnapPivotRef;
) {.async.} =
## Fetching and merging missing account trie database nodes.
let
ctx = buddy.ctx
peer = buddy.peer
when extraTraceMessages:
trace logTxt "started", peer, ctx=buddy.healingCtx(env)
var
nNodesFetched = 0
nFetchLoop = 0
# Stop after `healAccountsBatchFetchMax` nodes have been fetched
while nNodesFetched < healAccountsBatchFetchMax:
var nNodes = await buddy.accountsHealingImpl(env)
if nNodes <= 0:
break
nNodesFetched.inc(nNodes)
nFetchLoop.inc
when extraTraceMessages:
trace logTxt "job done", peer, ctx=buddy.healingCtx(env),
nNodesFetched, nFetchLoop, runState=buddy.ctrl.state
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -17,6 +17,8 @@ import
../../sync_desc, ../../sync_desc,
".."/[constants, range_desc, worker_desc], ".."/[constants, range_desc, worker_desc],
./db/[hexary_error, snapdb_accounts, snapdb_pivot], ./db/[hexary_error, snapdb_accounts, snapdb_pivot],
./pivot/[heal_accounts, heal_storage_slots,
range_fetch_accounts, range_fetch_storage_slots],
./ticker ./ticker
{.push raises: [Defect].} {.push raises: [Defect].}
@ -26,7 +28,6 @@ const
## Enable some asserts ## Enable some asserts
proc pivotAccountsHealingOk*(env: SnapPivotRef;ctx: SnapCtxRef): bool {.gcsafe.} proc pivotAccountsHealingOk*(env: SnapPivotRef;ctx: SnapCtxRef): bool {.gcsafe.}
proc pivotAccountsComplete*(env: SnapPivotRef): bool {.gcsafe.}
proc pivotMothball*(env: SnapPivotRef) {.gcsafe.} proc pivotMothball*(env: SnapPivotRef) {.gcsafe.}
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -40,44 +41,21 @@ proc init(
) = ) =
## Returns a pair of account hash range lists with the full range of hashes ## Returns a pair of account hash range lists with the full range of hashes
## smartly spread across the mutually disjunct interval sets. ## smartly spread across the mutually disjunct interval sets.
batch.unprocessed.init() batch.unprocessed.init() # full range on the first set of the pair
batch.processed = NodeTagRangeSet.init() batch.processed = NodeTagRangeSet.init()
# Once applicable when the hexary trie is non-empty, healing is started on
# the full range of all possible accounts. So the partial path batch list
# is initialised with the empty partial path encoded as `@[0]` which refers
# to the first (typically `Branch`) node. The envelope of `@[0]` covers the
# maximum range of accounts.
#
# Note that `@[]` incidentally has the same effect as `@[0]` although it
# is formally no partial path.
batch.checkNodes.add NodeSpecs(
partialPath: @[0.byte],
nodeKey: stateRoot.to(NodeKey))
# Initialise accounts range fetch batch, the pair of `fetchAccounts[]` # Initialise accounts range fetch batch, the pair of `fetchAccounts[]`
# range sets. # range sets.
if ctx.data.coveredAccounts.isFull: if ctx.data.coveredAccounts.isFull:
# All of accounts hashes are covered by completed range fetch processes # All of accounts hashes are covered by completed range fetch processes
# for all pivot environments. Do a random split distributing the full # for all pivot environments. So reset covering and record full-ness level.
# accounts hash range across the pair of range sets. ctx.data.covAccTimesFull.inc
for _ in 0 .. 5: ctx.data.coveredAccounts.clear()
var nodeKey: NodeKey
ctx.data.rng[].generate(nodeKey.ByteArray32) # Deprioritise already processed ranges by moving it to the second set.
let top = nodeKey.to(NodeTag) for iv in ctx.data.coveredAccounts.increasing:
if low(NodeTag) < top and top < high(NodeTag): discard batch.unprocessed[0].reduce iv
# Move covered account ranges (aka intervals) to the second set. discard batch.unprocessed[1].merge iv
batch.unprocessed.merge NodeTagRange.new(low(NodeTag), top)
break
# Otherwise there is a full single range in `unprocessed[0]`
else:
# Not all account hashes are covered, yet. So keep the uncovered
# account hashes in the first range set, and the other account hashes
# in the second range set.
for iv in ctx.data.coveredAccounts.increasing:
# Move already processed account ranges (aka intervals) to the second set.
discard batch.unprocessed[0].reduce iv
discard batch.unprocessed[1].merge iv
when extraAsserts: when extraAsserts:
doAssert batch.unprocessed.verify doAssert batch.unprocessed.verify
@ -196,18 +174,15 @@ proc tickerStats*(
sSqSum += sLen * sLen sSqSum += sLen * sLen
let let
env = ctx.data.pivotTable.lastValue.get(otherwise = nil) env = ctx.data.pivotTable.lastValue.get(otherwise = nil)
accCoverage = ctx.data.coveredAccounts.fullFactor accCoverage = (ctx.data.coveredAccounts.fullFactor +
ctx.data.covAccTimesFull.float)
accFill = meanStdDev(uSum, uSqSum, count) accFill = meanStdDev(uSum, uSqSum, count)
var var
pivotBlock = none(BlockNumber) pivotBlock = none(BlockNumber)
stoQuLen = none(int) stoQuLen = none(int)
accStats = (0,0)
if not env.isNil: if not env.isNil:
pivotBlock = some(env.stateHeader.blockNumber) pivotBlock = some(env.stateHeader.blockNumber)
stoQuLen = some(env.fetchStorageFull.len + env.fetchStoragePart.len) stoQuLen = some(env.fetchStorageFull.len + env.fetchStoragePart.len)
accStats = (env.fetchAccounts.processed.chunks,
env.fetchAccounts.checkNodes.len +
env.fetchAccounts.sickSubTries.len)
TickerStats( TickerStats(
pivotBlock: pivotBlock, pivotBlock: pivotBlock,
@ -215,7 +190,7 @@ proc tickerStats*(
nAccounts: meanStdDev(aSum, aSqSum, count), nAccounts: meanStdDev(aSum, aSqSum, count),
nSlotLists: meanStdDev(sSum, sSqSum, count), nSlotLists: meanStdDev(sSum, sSqSum, count),
accountsFill: (accFill[0], accFill[1], accCoverage), accountsFill: (accFill[0], accFill[1], accCoverage),
nAccountStats: accStats, nAccountStats: env.fetchAccounts.processed.chunks,
nStorageQueue: stoQuLen) nStorageQueue: stoQuLen)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -226,8 +201,6 @@ proc pivotMothball*(env: SnapPivotRef) =
## Clean up most of this argument `env` pivot record and mark it `archived`. ## Clean up most of this argument `env` pivot record and mark it `archived`.
## Note that archived pivots will be checked for swapping in already known ## Note that archived pivots will be checked for swapping in already known
## accounts and storage slots. ## accounts and storage slots.
env.fetchAccounts.checkNodes.setLen(0)
env.fetchAccounts.sickSubTries.setLen(0)
env.fetchAccounts.unprocessed.init() env.fetchAccounts.unprocessed.init()
# Simplify storage slots queues by resolving partial slots into full list # Simplify storage slots queues by resolving partial slots into full list
@ -248,31 +221,67 @@ proc pivotMothball*(env: SnapPivotRef) =
env.archived = true env.archived = true
proc pivotAccountsComplete*(
env: SnapPivotRef; # Current pivot environment
): bool =
## Returns `true` if accounts are fully available for this this pivot.
env.fetchAccounts.processed.isFull
proc pivotAccountsHealingOk*( proc pivotAccountsHealingOk*(
env: SnapPivotRef; # Current pivot environment env: SnapPivotRef; # Current pivot environment
ctx: SnapCtxRef; # Some global context ctx: SnapCtxRef; # Some global context
): bool = ): bool =
## Returns `true` if accounts healing is enabled for this pivot. ## Returns `true` if accounts healing is enabled for this pivot.
## ##
if not env.pivotAccountsComplete(): # Only start accounts healing if there is some completion level, already.
# Only start accounts healing if there is some completion level, already. #
# # We check against the global coverage factor, i.e. a measure for how much
# We check against the global coverage factor, i.e. a measure for how much # of the total of all accounts have been processed. Even if the hexary trie
# of the total of all accounts have been processed. Even if the hexary trie # database for the current pivot state root is sparsely filled, there is a
# database for the current pivot state root is sparsely filled, there is a # good chance that it can inherit some unchanged sub-trie from an earlier
# good chance that it can inherit some unchanged sub-trie from an earlier # pivot state root download. The healing process then works like sort of
# pivot state root download. The healing process then works like sort of # glue.
# glue. if healAccountsCoverageTrigger <= ctx.pivotAccountsCoverage():
if healAccountsCoverageTrigger <= ctx.data.coveredAccounts.fullFactor: if env.healThresh <= env.fetchAccounts.processed.fullFactor:
# Ditto for pivot. return true
if env.healThresh <= env.fetchAccounts.processed.fullFactor:
return true
proc execSnapSyncAction*(
env: SnapPivotRef; # Current pivot environment
buddy: SnapBuddyRef; # Worker peer
) {.async.} =
## Execute a synchronisation run.
let
ctx = buddy.ctx
block:
# Clean up storage slots queue first it it becomes too large
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
if snapStorageSlotsQuPrioThresh < nStoQu:
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.archived:
return
if not env.fetchAccounts.processed.isFull:
await buddy.rangeFetchAccounts(env)
# Run at least one round fetching storage slosts even if the `archived`
# flag is set in order to keep the batch queue small.
if not buddy.ctrl.stopped:
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.archived:
return
if env.pivotAccountsHealingOk(ctx):
await buddy.healAccounts(env)
if buddy.ctrl.stopped or env.archived:
return
# Some additional storage slots might have been popped up
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.archived:
return
# Don't bother with storage slots healing before accounts healing takes
# place. This saves communication bandwidth. The pivot might change soon,
# anyway.
if env.pivotAccountsHealingOk(ctx):
await buddy.healStorageSlots(env)
proc saveCheckpoint*( proc saveCheckpoint*(
@ -329,7 +338,7 @@ proc recoverPivotFromCheckpoint*(
# Handle storage slots # Handle storage slots
let stateRoot = recov.state.header.stateRoot let stateRoot = recov.state.header.stateRoot
for w in recov.state.slotAccounts: for w in recov.state.slotAccounts:
let pt = NodeTagRange.new(w.to(NodeTag),w.to(NodeTag)) let pt = NodeTagRange.new(w.to(NodeTag),w.to(NodeTag)) # => `pt.len == 1`
if 0 < env.fetchAccounts.processed.covered(pt): if 0 < env.fetchAccounts.processed.covered(pt):
# Ignoring slots that have accounts to be downloaded, anyway # Ignoring slots that have accounts to be downloaded, anyway

View File

@ -0,0 +1,387 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Heal accounts DB:
## =================
##
## This module is a variation of the `swap-in` module in the sense that it
## searches for missing nodes in the database (which means that links to
## that nodes must exist for knowing this fact) and then fetches the nodes
## from the network.
##
## Algorithm
## ---------
##
## * Run `swapInAccounts()` so that inheritable sub-tries are imported from
## previous pivots.
##
## * Find nodes with envelopes that have no account in common with any range
## interval of the `processed` set of the current pivot. Stop if there are
## no such nodes.
##
## * Extract the missing nodes from the previous step, i.e. the nodes that
## are known to exist but are not allocated. If all nodes are allocated,
## employ the `hexaryInspect()` function in a limited mode do find dangling
## (i.e. missing) sub-nodes of these allocated nodes. Stop if this function
## fails to find any such nodes.
##
## * From the nodes of the previous step, extract non-allocated nodes and
## fetch them from the network.
##
## * Install that nodes from the network.
##
## * Rinse and repeat
##
## Discussion:
## -----------
##
## The worst case scenario in the third step might also be solved by allocating
## more accounts and running this healing algorith again.
##
## Due to its potentially poor performance there is no way to recursively
## search the whole database hexary trie for more dangling nodes using the
## `hexaryInspect()` function.
##
import
std/[math, sequtils, tables],
chronicles,
chronos,
eth/[common, p2p, trie/nibbles, trie/trie_defs, rlp],
stew/[byteutils, interval_set, keyed_queue],
../../../../utils/prettify,
"../../.."/[sync_desc, types],
"../.."/[constants, range_desc, worker_desc],
../com/[com_error, get_trie_nodes],
../db/[hexary_desc, hexary_envelope, hexary_error, hexary_inspect,
snapdb_accounts],
./swap_in
{.push raises: [Defect].}
logScope:
topics = "snap-heal"
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Accounts healing " & info
proc `$`(node: NodeSpecs): string =
node.partialPath.toHex
proc `$`(rs: NodeTagRangeSet): string =
rs.fullFactor.toPC(0)
proc `$`(iv: NodeTagRange): string =
iv.fullFactor.toPC(3)
proc toPC(w: openArray[NodeSpecs]; n: static[int] = 3): string =
let sumUp = w.mapIt(it.hexaryEnvelope.len).foldl(a+b, 0.u256)
(sumUp.to(float) / (2.0^256)).toPC(n)
proc healingCtx(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): string =
let ctx = buddy.ctx
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"nAccounts=" & $env.nAccounts & "," &
("covered=" & $env.fetchAccounts.processed & "/" &
$ctx.data.coveredAccounts ) & "}"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template discardRlpError(info: static[string]; code: untyped) =
try:
code
except RlpError as e:
discard
template noExceptionOops(info: static[string]; code: untyped) =
try:
code
except Defect as e:
raise e
except Exception as e:
raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc compileMissingNodesList(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): seq[NodeSpecs] =
## Find some missing glue nodes in current database to be fetched
## individually.
let
ctx = buddy.ctx
rootKey = env.stateHeader.stateRoot.to(NodeKey)
getFn = ctx.data.snapDb.getAccountFn
fa = env.fetchAccounts
# Import from earlier run
while buddy.swapInAccounts(env) != 0:
discard
var nodes: seq[NodeSpecs]
noExceptionOops("getMissingNodesList"):
# Get unallocated nodes to be fetched
let rc = fa.processed.hexaryEnvelopeDecompose(rootKey, getFn)
if rc.isOk:
nodes = rc.value
# Remove allocated nodes
let missingNodes = nodes.filterIt(it.nodeKey.ByteArray32.getFn().len == 0)
if 0 < missingNodes.len:
when extraTraceMessages:
trace logTxt "missing nodes", ctx=buddy.healingCtx(env),
nResult=missingNodes.len, result=missingNodes.toPC
return missingNodes
# Plan B, carefully employ `hexaryInspect()`
if 0 < nodes.len:
try:
let stats = getFn.hexaryInspectTrie(
rootKey, nodes.mapIt(it.partialPath), suspendAfter=healInspectionBatch)
if 0 < stats.dangling.len:
trace logTxt "missing nodes (plan B)", ctx=buddy.healingCtx(env),
nResult=stats.dangling.len, result=stats.dangling.toPC
return stats.dangling
except:
discard
proc fetchMissingNodes(
buddy: SnapBuddyRef;
missingNodes: seq[NodeSpecs];
env: SnapPivotRef;
): Future[seq[NodeSpecs]]
{.async.} =
## Extract from `nodes.missing` the next batch of nodes that need
## to be merged it into the database
let
ctx = buddy.ctx
peer = buddy.peer
stateRoot = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
nMissingNodes= missingNodes.len
nFetchNodes = max(0, nMissingNodes - snapRequestTrieNodesFetchMax)
# There is no point in fetching too many nodes as it will be rejected. So
# rest of the `missingNodes` list is ignored to be picked up later.
fetchNodes = missingNodes[0 ..< nFetchNodes]
# Initalise for fetching nodes from the network via `getTrieNodes()`
var
nodeKey: Table[Blob,NodeKey] # Temporary `path -> key` mapping
pathList: seq[seq[Blob]] # Function argument for `getTrieNodes()`
for w in fetchNodes:
pathList.add @[w.partialPath]
nodeKey[w.partialPath] = w.nodeKey
# Fetch nodes from the network.
let rc = await buddy.getTrieNodes(stateRoot, pathList, pivot)
if rc.isOk:
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.data.errors.resetComError()
# Forget about unfetched missing nodes, will be picked up later
return rc.value.nodes.mapIt(NodeSpecs(
partialPath: it.partialPath,
nodeKey: nodeKey[it.partialPath],
data: it.data))
# Process error ...
let
error = rc.error
ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors)
when extraTraceMessages:
if ok:
trace logTxt "fetch nodes error => stop", peer,
ctx=buddy.healingCtx(env), error
else:
trace logTxt "fetch nodes error", peer,
ctx=buddy.healingCtx(env), error
return @[]
proc kvAccountLeaf(
buddy: SnapBuddyRef;
node: NodeSpecs;
env: SnapPivotRef;
): (bool,NodeKey,Account) =
## Re-read leaf node from persistent database (if any)
let
peer = buddy.peer
var
nNibbles = -1
discardRlpError("kvAccountLeaf"):
let
nodeRlp = rlpFromBytes node.data
prefix = (hexPrefixDecode node.partialPath)[1]
segment = (hexPrefixDecode nodeRlp.listElem(0).toBytes)[1]
nibbles = prefix & segment
nNibbles = nibbles.len
if nNibbles == 64:
let
data = nodeRlp.listElem(1).toBytes
nodeKey = nibbles.getBytes.convertTo(NodeKey)
accData = rlp.decode(data,Account)
return (true, nodeKey, accData)
when extraTraceMessages:
trace logTxt "non-leaf node path or corrupt data", peer,
ctx=buddy.healingCtx(env), nNibbles
proc registerAccountLeaf(
buddy: SnapBuddyRef;
accKey: NodeKey;
acc: Account;
env: SnapPivotRef;
) =
## Process single account node as would be done with an interval by
## the `storeAccounts()` function
let
peer = buddy.peer
pt = accKey.to(NodeTag)
# Register isolated leaf node
if 0 < env.fetchAccounts.processed.merge(pt,pt) :
env.nAccounts.inc
env.fetchAccounts.unprocessed.reduce(pt,pt)
discard buddy.ctx.data.coveredAccounts.merge(pt,pt)
# Update storage slots batch
if acc.storageRoot != emptyRlpHash:
env.fetchStorageFull.merge AccountSlotsHeader(
acckey: accKey,
storageRoot: acc.storageRoot)
# ------------------------------------------------------------------------------
# Private functions: do the healing for one round
# ------------------------------------------------------------------------------
proc accountsHealingImpl(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): Future[int]
{.async.} =
## Fetching and merging missing account trie database nodes. It returns the
## number of nodes fetched from the network, and -1 upon error.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
fa = env.fetchAccounts
# Update for changes since last visit
missingNodes = buddy.compileMissingNodesList(env)
if missingNodes.len == 0:
# Nothing to do
trace logTxt "nothing to do", peer, ctx=buddy.healingCtx(env)
return 0 # nothing to do
# Get next batch of nodes that need to be merged it into the database
let fetchedNodes = await buddy.fetchMissingNodes(missingNodes, env)
if fetchedNodes.len == 0:
return 0
# Store nodes onto disk
let
nFetchedNodes = fetchedNodes.len
report = db.importRawAccountsNodes(peer, fetchedNodes)
if 0 < report.len and report[^1].slot.isNone:
# Storage error, just run the next lap (not much else that can be done)
error logTxt "error updating persistent database", peer,
ctx=buddy.healingCtx(env), nFetchedNodes, error=report[^1].error
return -1
# Filter out error and leaf nodes
var
nIgnored = 0
nLeafNodes = 0 # for logging
for w in report:
if w.slot.isSome: # non-indexed entries appear typically at the end, though
let inx = w.slot.unsafeGet
if w.kind.isNone:
# Error report without node referenece
discard
elif w.error != NothingSerious:
# Node error, will need to pick up later and download again
nIgnored.inc
elif w.kind.unsafeGet == Leaf:
# Leaf node has been stored, double check
let (isLeaf, key, acc) = buddy.kvAccountLeaf(fetchedNodes[inx], env)
if isLeaf:
# Update `unprocessed` registry, collect storage roots (if any)
buddy.registerAccountLeaf(key, acc, env)
nLeafNodes.inc
when extraTraceMessages:
trace logTxt "merged into database", peer,
ctx=buddy.healingCtx(env), nFetchedNodes, nLeafNodes
return nFetchedNodes - nIgnored
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc healAccounts*(
buddy: SnapBuddyRef;
env: SnapPivotRef;
) {.async.} =
## Fetching and merging missing account trie database nodes.
let
ctx = buddy.ctx
peer = buddy.peer
when extraTraceMessages:
trace logTxt "started", peer, ctx=buddy.healingCtx(env)
var
nNodesFetched = 0
nFetchLoop = 0
# Stop after `healAccountsBatchFetchMax` nodes have been fetched
while nNodesFetched < healAccountsBatchFetchMax:
var nNodes = await buddy.accountsHealingImpl(env)
if nNodes <= 0:
break
nNodesFetched.inc(nNodes)
nFetchLoop.inc
when extraTraceMessages:
trace logTxt "job done", peer, ctx=buddy.healingCtx(env),
nNodesFetched, nFetchLoop, runState=buddy.ctrl.state
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -30,11 +30,11 @@ import
chronos, chronos,
eth/[common, p2p, trie/nibbles, trie/trie_defs, rlp], eth/[common, p2p, trie/nibbles, trie/trie_defs, rlp],
stew/[interval_set, keyed_queue], stew/[interval_set, keyed_queue],
../../../utils/prettify, ../../../../utils/prettify,
../../sync_desc, ../../../sync_desc,
".."/[constants, range_desc, worker_desc], "../.."/[constants, range_desc, worker_desc],
./com/[com_error, get_trie_nodes], ../com/[com_error, get_trie_nodes],
./db/[hexary_desc, hexary_error, snapdb_storage_slots], ../db/[hexary_desc, hexary_error, snapdb_storage_slots],
./sub_tries_helper ./sub_tries_helper
{.push raises: [Defect].} {.push raises: [Defect].}
@ -62,8 +62,8 @@ proc healingCtx(
"{" & "{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," & "pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"covered=" & slots.unprocessed.emptyFactor.toPC(0) & "," & "covered=" & slots.unprocessed.emptyFactor.toPC(0) & "," &
"nCheckNodes=" & $slots.checkNodes.len & "," & "nNodesCheck=" & $slots.nodes.check.len & "," &
"nSickSubTries=" & $slots.sickSubTries.len & "}" "nNodesMissing=" & $slots.nodes.missing.len & "}"
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Private functions # Private functions
@ -99,9 +99,9 @@ proc verifyStillMissingNodes(
kvp: SnapSlotsQueuePair; kvp: SnapSlotsQueuePair;
env: SnapPivotRef; env: SnapPivotRef;
) = ) =
## Check whether previously missing nodes from the `sickSubTries` list ## Check whether previously missing nodes from the `nodes.missing` list
## have been magically added to the database since it was checked last ## have been magically added to the database since it was checked last
## time. These nodes will me moved to `checkNodes` for further processing. ## time. These nodes will me moved to `nodes.check` for further processing.
let let
ctx = buddy.ctx ctx = buddy.ctx
db = ctx.data.snapDb db = ctx.data.snapDb
@ -111,17 +111,17 @@ proc verifyStillMissingNodes(
slots = kvp.data.slots slots = kvp.data.slots
var delayed: seq[NodeSpecs] var delayed: seq[NodeSpecs]
for w in slots.sickSubTries: for w in slots.nodes.missing:
let rc = db.getStorageSlotsNodeKey(peer, accKey, storageRoot, w.partialPath) let rc = db.getStorageSlotsNodeKey(peer, accKey, storageRoot, w.partialPath)
if rc.isOk: if rc.isOk:
# Check nodes for dangling links # Check nodes for dangling links
slots.checkNodes.add w slots.nodes.check.add w
else: else:
# Node is still missing # Node is still missing
delayed.add w delayed.add w
# Must not modify sequence while looping over it # Must not modify sequence while looping over it
slots.sickSubTries = slots.sickSubTries & delayed slots.nodes.missing = slots.nodes.missing & delayed
proc updateMissingNodesList( proc updateMissingNodesList(
@ -131,8 +131,8 @@ proc updateMissingNodesList(
): Future[bool] ): Future[bool]
{.async.} = {.async.} =
## Starting with a given set of potentially dangling intermediate trie nodes ## Starting with a given set of potentially dangling intermediate trie nodes
## `checkNodes`, this set is filtered and processed. The outcome is fed back ## `nodes.check`, this set is filtered and processed. The outcome is fed back
## to the vey same list `checkNodes` ## to the vey same list `nodes.check`.
let let
ctx = buddy.ctx ctx = buddy.ctx
db = ctx.data.snapDb db = ctx.data.snapDb
@ -166,7 +166,7 @@ proc getMissingNodesFromNetwork(
env: SnapPivotRef; env: SnapPivotRef;
): Future[seq[NodeSpecs]] ): Future[seq[NodeSpecs]]
{.async.} = {.async.} =
## Extract from `sickSubTries` the next batch of nodes that need ## Extract from `nodes.missing` the next batch of nodes that need
## to be merged it into the database ## to be merged it into the database
let let
ctx = buddy.ctx ctx = buddy.ctx
@ -176,13 +176,13 @@ proc getMissingNodesFromNetwork(
pivot = "#" & $env.stateHeader.blockNumber # for logging pivot = "#" & $env.stateHeader.blockNumber # for logging
slots = kvp.data.slots slots = kvp.data.slots
nSickSubTries = slots.sickSubTries.len nSickSubTries = slots.nodes.missing.len
inxLeft = max(0, nSickSubTries - snapRequestTrieNodesFetchMax) inxLeft = max(0, nSickSubTries - snapRequestTrieNodesFetchMax)
# There is no point in processing too many nodes at the same time. So leave # There is no point in processing too many nodes at the same time. So leave
# the rest on the `sickSubTries` queue to be handled later. # the rest on the `nodes.missing` queue to be handled later.
let fetchNodes = slots.sickSubTries[inxLeft ..< nSickSubTries] let fetchNodes = slots.nodes.missing[inxLeft ..< nSickSubTries]
slots.sickSubTries.setLen(inxLeft) slots.nodes.missing.setLen(inxLeft)
# Initalise for `getTrieNodes()` for fetching nodes from the network # Initalise for `getTrieNodes()` for fetching nodes from the network
var var
@ -192,7 +192,7 @@ proc getMissingNodesFromNetwork(
pathList.add @[w.partialPath] pathList.add @[w.partialPath]
nodeKey[w.partialPath] = w.nodeKey nodeKey[w.partialPath] = w.nodeKey
# Fetch nodes from the network. Note that the remainder of the `sickSubTries` # Fetch nodes from the network. Note that the remainder of the `nodes.missing`
# list might be used by another process that runs semi-parallel. # list might be used by another process that runs semi-parallel.
let let
req = @[accKey.to(Blob)] & fetchNodes.mapIt(it.partialPath) req = @[accKey.to(Blob)] & fetchNodes.mapIt(it.partialPath)
@ -204,7 +204,7 @@ proc getMissingNodesFromNetwork(
# Register unfetched missing nodes for the next pass # Register unfetched missing nodes for the next pass
for w in rc.value.leftOver: for w in rc.value.leftOver:
for n in 1 ..< w.len: for n in 1 ..< w.len:
slots.sickSubTries.add NodeSpecs( slots.nodes.missing.add NodeSpecs(
partialPath: w[n], partialPath: w[n],
nodeKey: nodeKey[w[n]]) nodeKey: nodeKey[w[n]])
return rc.value.nodes.mapIt(NodeSpecs( return rc.value.nodes.mapIt(NodeSpecs(
@ -213,8 +213,8 @@ proc getMissingNodesFromNetwork(
data: it.data)) data: it.data))
# Restore missing nodes list now so that a task switch in the error checker # Restore missing nodes list now so that a task switch in the error checker
# allows other processes to access the full `sickSubTries` list. # allows other processes to access the full `nodes.missing` list.
slots.sickSubTries = slots.sickSubTries & fetchNodes slots.nodes.missing = slots.nodes.missing & fetchNodes
let error = rc.error let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
@ -364,12 +364,12 @@ proc storageSlotsHealing(
buddy.verifyStillMissingNodes(kvp, env) buddy.verifyStillMissingNodes(kvp, env)
# ??? # ???
if slots.checkNodes.len != 0: if slots.nodes.check.len != 0:
if not await buddy.updateMissingNodesList(kvp,env): if not await buddy.updateMissingNodesList(kvp,env):
return false return false
# Check whether the trie is complete. # Check whether the trie is complete.
if slots.sickSubTries.len == 0: if slots.nodes.missing.len == 0:
trace logTxt "complete", peer, itCtx=buddy.healingCtx(kvp,env) trace logTxt "complete", peer, itCtx=buddy.healingCtx(kvp,env)
return true return true
@ -386,7 +386,7 @@ proc storageSlotsHealing(
error logTxt "error updating persistent database", peer, error logTxt "error updating persistent database", peer,
itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists, itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists,
nStorageQueue, nNodes=nodeSpecs.len, error=report[^1].error nStorageQueue, nNodes=nodeSpecs.len, error=report[^1].error
slots.sickSubTries = slots.sickSubTries & nodeSpecs slots.nodes.missing = slots.nodes.missing & nodeSpecs
return false return false
when extraTraceMessages: when extraTraceMessages:
@ -404,11 +404,11 @@ proc storageSlotsHealing(
if w.error != NothingSerious or w.kind.isNone: if w.error != NothingSerious or w.kind.isNone:
# error, try downloading again # error, try downloading again
slots.sickSubTries.add nodeSpecs[inx] slots.nodes.missing.add nodeSpecs[inx]
elif w.kind.unsafeGet != Leaf: elif w.kind.unsafeGet != Leaf:
# re-check this node # re-check this node
slots.checkNodes.add nodeSpecs[inx] slots.nodes.check.add nodeSpecs[inx]
else: else:
# Node has been stored, double check # Node has been stored, double check
@ -419,7 +419,7 @@ proc storageSlotsHealing(
buddy.registerStorageSlotsLeaf(kvp, slotKey, env) buddy.registerStorageSlotsLeaf(kvp, slotKey, env)
nLeafNodes.inc nLeafNodes.inc
else: else:
slots.checkNodes.add nodeSpecs[inx] slots.nodes.check.add nodeSpecs[inx]
when extraTraceMessages: when extraTraceMessages:
let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len let nStorageQueue = env.fetchStorageFull.len + env.fetchStoragePart.len
@ -461,7 +461,9 @@ proc healingIsComplete(
return true # done return true # done
# Full range covered by unprocessed items # Full range covered by unprocessed items
kvp.data.slots = SnapRangeBatchRef(sickSubTries: rc.value.dangling) kvp.data.slots = SnapRangeBatchRef(
nodes: SnapTodoNodes(
missing: rc.value.dangling))
kvp.data.slots.unprocessed.init() kvp.data.slots.unprocessed.init()
# Proceed with healing # Proceed with healing

View File

@ -35,12 +35,12 @@ import
eth/[common, p2p], eth/[common, p2p],
stew/[interval_set, keyed_queue], stew/[interval_set, keyed_queue],
stint, stint,
../../../utils/prettify, ../../../../utils/prettify,
../../sync_desc, ../../../sync_desc,
".."/[constants, range_desc, worker_desc], "../.."/[constants, range_desc, worker_desc],
./com/[com_error, get_account_range], ../com/[com_error, get_account_range],
./db/[hexary_envelope, snapdb_accounts], ../db/[hexary_envelope, snapdb_accounts],
"."/[pivot_helper, swap_in] ./swap_in
{.push raises: [Defect].} {.push raises: [Defect].}
@ -58,19 +58,6 @@ const
template logTxt(info: static[string]): static[string] = template logTxt(info: static[string]): static[string] =
"Accounts range " & info "Accounts range " & info
# proc dumpUnprocessed(
# buddy: SnapBuddyRef;
# env: SnapPivotRef;
# ): string =
# ## Debugging ...
# let
# peer = buddy.peer
# pivot = "#" & $env.stateHeader.blockNumber # for logging
# moan = proc(overlap: UInt256; iv: NodeTagRange) =
# trace logTxt "unprocessed => overlap", peer, pivot, overlap, iv
#
# env.fetchAccounts.unprocessed.dump(moan, 5)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Private helpers # Private helpers
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -181,13 +168,8 @@ proc accountsRangefetchImpl(
env.fetchStorageFull.merge dd.withStorage env.fetchStorageFull.merge dd.withStorage
var nSwapInLaps = 0 var nSwapInLaps = 0
if env.archived: if not env.archived and
# Current pivot just became outdated, rebuild storage slots index (if any) swapInAccountsCoverageTrigger <= ctx.pivotAccountsCoverage():
if 0 < gotStorage:
trace logTxt "mothballing", peer, pivot, gotStorage
env.pivotMothball
elif swapInAccountsCoverageTrigger <= ctx.data.coveredAccounts.fullFactor:
# Swap in from other pivots # Swap in from other pivots
when extraTraceMessages: when extraTraceMessages:
trace logTxt "before swap in", peer, pivot, gotAccounts, gotStorage, trace logTxt "before swap in", peer, pivot, gotAccounts, gotStorage,
@ -242,7 +224,6 @@ proc rangeFetchAccounts*(
when extraTraceMessages: when extraTraceMessages:
trace logTxt "done", peer, pivot, nFetchAccounts, trace logTxt "done", peer, pivot, nFetchAccounts,
nCheckNodes=fa.checkNodes.len, nSickSubTries=fa.sickSubTries.len,
runState=buddy.ctrl.state runState=buddy.ctrl.state
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------

View File

@ -58,10 +58,10 @@ import
eth/[common, p2p], eth/[common, p2p],
stew/[interval_set, keyed_queue], stew/[interval_set, keyed_queue],
stint, stint,
../../sync_desc, ../../../sync_desc,
".."/[constants, range_desc, worker_desc], "../.."/[constants, range_desc, worker_desc],
./com/[com_error, get_storage_ranges], ../com/[com_error, get_storage_ranges],
./db/[hexary_error, snapdb_storage_slots] ../db/[hexary_error, snapdb_storage_slots]
{.push raises: [Defect].} {.push raises: [Defect].}
@ -330,21 +330,23 @@ proc rangeFetchStorageSlots*(
# Processing the full range will implicitely handle inheritable storage # Processing the full range will implicitely handle inheritable storage
# slots first with each batch item (see `getNextSlotItemsFull()`.) # slots first with each batch item (see `getNextSlotItemsFull()`.)
#
# Run this batch even if `archived` flag is set in order to shrink the
# batch queue.
var fullRangeItemsleft = 1 + (fullRangeLen-1) div snapStorageSlotsFetchMax var fullRangeItemsleft = 1 + (fullRangeLen-1) div snapStorageSlotsFetchMax
while 0 < fullRangeItemsleft and while 0 < fullRangeItemsleft and
buddy.ctrl.running and buddy.ctrl.running:
not env.archived:
# Pull out the next request list from the queue # Pull out the next request list from the queue
let req = buddy.getNextSlotItemsFull(env) let req = buddy.getNextSlotItemsFull(env)
if req.len == 0: if req.len == 0:
break break
fullRangeItemsleft.dec fullRangeItemsleft.dec
await buddy.storeStoragesSingleBatch(req, env) await buddy.storeStoragesSingleBatch(req, env)
var partialRangeItemsLeft = env.fetchStoragePart.len var partialRangeItemsLeft = env.fetchStoragePart.len
while 0 < partialRangeItemsLeft and while 0 < partialRangeItemsLeft and
buddy.ctrl.running and buddy.ctrl.running:
not env.archived:
# Pull out the next request list from the queue # Pull out the next request list from the queue
let req = buddy.getNextSlotItemPartial(env) let req = buddy.getNextSlotItemPartial(env)
if req.len == 0: if req.len == 0:

View File

@ -14,8 +14,8 @@ import
chronos, chronos,
eth/[common, p2p], eth/[common, p2p],
stew/interval_set, stew/interval_set,
".."/[constants, range_desc, worker_desc], "../.."/[constants, range_desc, worker_desc],
./db/[hexary_desc, hexary_error, hexary_inspect] ../db/[hexary_desc, hexary_error, hexary_inspect]
{.push raises: [Defect].} {.push raises: [Defect].}
@ -61,7 +61,7 @@ proc subTriesFromPartialPaths*(
getFn: HexaryGetFn; ## Abstract database access getFn: HexaryGetFn; ## Abstract database access
stateRoot: Hash256; ## Start of hexary trie stateRoot: Hash256; ## Start of hexary trie
batch: SnapRangeBatchRef; ## Healing data support batch: SnapRangeBatchRef; ## Healing data support
sickSubTriesMaxLen = high(int); ## Max length of `sickSubTries` nodesMissingMaxLen = high(int); ## Max length of `nodes.missing`
): Future[Result[void,HexaryError]] ): Future[Result[void,HexaryError]]
{.async.} = {.async.} =
## Starting with a given set of potentially dangling account nodes ## Starting with a given set of potentially dangling account nodes
@ -82,9 +82,9 @@ proc subTriesFromPartialPaths*(
block errorWhenOutside: block errorWhenOutside:
try: try:
while batch.sickSubTries.len < sickSubTriesMaxLen: while batch.nodes.missing.len < nodesMissingMaxLen:
# Inspect hexary trie for dangling nodes # Inspect hexary trie for dangling nodes
let rc = getFn.doInspect(rootKey, batch.checkNodes, batch.resumeCtx) let rc = getFn.doInspect(rootKey, batch.nodes.check, batch.resumeCtx)
if rc.isErr: if rc.isErr:
error = rc.error error = rc.error
break errorWhenOutside break errorWhenOutside
@ -93,10 +93,10 @@ proc subTriesFromPartialPaths*(
# Update context for async threading environment # Update context for async threading environment
batch.resumeCtx = rc.value.resumeCtx batch.resumeCtx = rc.value.resumeCtx
batch.checkNodes.setLen(0) batch.nodes.check.setLen(0)
# Collect result # Collect result
batch.sickSubTries = batch.sickSubTries & rc.value.dangling batch.nodes.missing = batch.nodes.missing & rc.value.dangling
# Done unless there is some resumption context # Done unless there is some resumption context
if rc.value.resumeCtx.isNil: if rc.value.resumeCtx.isNil:
@ -106,11 +106,11 @@ proc subTriesFromPartialPaths*(
trace logTxt "inspection wait", count, trace logTxt "inspection wait", count,
elapsed=(Moment.now()-start), elapsed=(Moment.now()-start),
sleep=healInspectionBatchWaitNanoSecs, sleep=healInspectionBatchWaitNanoSecs,
sickSubTriesLen=batch.sickSubTries.len, sickSubTriesMaxLen, nodesMissingLen=batch.nodes.missing.len, nodesMissingMaxLen,
resumeCtxLen = batch.resumeCtx.hddCtx.len resumeCtxLen = batch.resumeCtx.hddCtx.len
# Allow async task switch and continue. Note that some other task might # Allow async task switch and continue. Note that some other task might
# steal some of the `sickSubTries` var argument. # steal some of the `nodes.missing` var argument.
await sleepAsync healInspectionBatchWaitNanoSecs.nanoseconds await sleepAsync healInspectionBatchWaitNanoSecs.nanoseconds
batch.lockTriePerusal = false batch.lockTriePerusal = false
@ -119,7 +119,7 @@ proc subTriesFromPartialPaths*(
except RlpError: except RlpError:
error = RlpEncoding error = RlpEncoding
batch.sickSubTries = batch.sickSubTries & batch.resumeCtx.to(seq[NodeSpecs]) batch.nodes.missing = batch.nodes.missing & batch.resumeCtx.to(seq[NodeSpecs])
batch.resumeCtx = nil batch.resumeCtx = nil
batch.lockTriePerusal = false batch.lockTriePerusal = false

View File

@ -0,0 +1,336 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Swap in already allocated sub-tries
## ===================================
##
## This module imports sub-tries from other pivots into the current. It does
## so by detecting the top of an existing sub-trie in the current pivot and
## searches other pivots for the part of the sub-trie that is already
## available there. So it can be marked accomplished on the current pivot.
##
## Algorithm
## ---------
##
## * Find nodes with envelopes that have no account in common with any range
## interval of the `processed` set of the current pivot.
##
## * From the nodes of the previous step, extract allocated nodes and try to
## find them on previous pivots. Stop if there are no such nodes.
##
## * The portion of `processed` ranges on the other pivot that intersects with
## the envelopes of the nodes have been downloaded already. And it is equally
## applicable to the current pivot as it applies to the same sub-trie.
##
## So the intersection of `processed` with the node envelope will be copied
## to to the `processed` ranges of the current pivot.
##
## * Rinse and repeat.
##
import
std/[math, sequtils],
chronicles,
eth/[common, p2p],
stew/[byteutils, interval_set, keyed_queue, sorted_set],
../../../../utils/prettify,
../../../types,
"../.."/[range_desc, worker_desc],
../db/[hexary_desc, hexary_envelope, hexary_error,
hexary_paths, snapdb_accounts]
{.push raises: [Defect].}
logScope:
topics = "snap-swapin"
type
SwapInPivot = object
## Subset of `SnapPivotRef` with relevant parts, only
rootKey: NodeKey ## Storage slots & accounts
processed: NodeTagRangeSet ## Storage slots & accounts
pivot: SnapPivotRef ## Accounts only
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Swap-in " & info
proc `$`(node: NodeSpecs): string =
node.partialPath.toHex
proc `$`(rs: NodeTagRangeSet): string =
rs.fullFactor.toPC(3)
proc `$`(iv: NodeTagRange): string =
iv.fullFactor.toPC(3)
proc toPC(w: openArray[NodeSpecs]; n: static[int] = 3): string =
let sumUp = w.mapIt(it.hexaryEnvelope.len).foldl(a+b, 0.u256)
(sumUp.to(float) / (2.0^256)).toPC(n)
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc existsInTrie(
node: NodeSpecs; # Probe node to test to exist
rootKey: NodeKey; # Start node into hexary trie
getFn: HexaryGetFn; # Abstract database access
): bool =
## Check whether this node exists on the sub-trie starting at ` rootKey`
var error: HexaryError
try:
let rc = node.partialPath.hexaryPathNodeKey(rootKey, getFn)
if rc.isOk:
return rc.value == node.nodeKey
except RlpError:
error = RlpEncoding
when extraTraceMessages:
if error != NothingSerious:
trace logTxt "other trie check node failed", node, error
false
template noKeyErrorOrExceptionOops(info: static[string]; code: untyped) =
try:
code
except KeyError as e:
raiseAssert "Not possible (" & info & "): " & e.msg
except Defect as e:
raise e
except Exception as e:
raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
template noExceptionOops(info: static[string]; code: untyped) =
try:
code
except Defect as e:
raise e
except Exception as e:
raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc uncoveredEnvelopes(
processed: NodeTagRangeSet; # To be complemented
rootKey: NodeKey; # Start node into hexary trie
getFn: HexaryGetFn; # Abstract database access
): seq[NodeSpecs] =
## Compile the complement of the union of the `processed` intervals and
## express this complement as a list of envelopes of sub-tries.
##
let rc = processed.hexaryEnvelopeDecompose(rootKey, getFn)
if rc.isOk:
# Remove non-allocated nodes
result = rc.value.filterIt(0 < it.nodeKey.ByteArray32.getFn().len)
when extraTraceMessages:
trace logTxt "decomposed result", processed, nProcessed=processed.chunks,
nResult=result.len, result=result.toPC
proc otherProcessedRanges(
node: NodeSpecs; # Top node of portential sub-trie
otherPivots: seq[SwapInPivot]; # Other pivots list
rootKey: NodeKey; # Start node into hexary trie
getFn: HexaryGetFn; # Abstract database access
): seq[NodeTagRangeSet] =
## Collect already processed ranges from other pivots intersecting with the
## envelope of the argument `node`. The list of other pivots is represented
## by the argument iterator `otherPivots`.
let envelope = node.hexaryEnvelope
noExceptionOops("otherProcessedRanges"):
# For the current `node` select all hexary sub-tries that contain the same
# node `node.nodeKey` for the partial path `node.partianPath`.
for n,op in otherPivots:
result.add NodeTagRangeSet.init()
# Check whether the node is shared
if node.existsInTrie(op.rootKey, getFn):
# Import already processed part of the envelope of `node` into the
# `batch.processed` set of ranges.
let
other = op.processed
touched = other.hexaryEnvelopeTouchedBy node
for iv in touched.increasing:
let segment = (envelope * iv).value
discard result[^1].merge segment
#when extraTraceMessages:
# trace logTxt "collect other pivot segment", n, node, segment
#when extraTraceMessages:
# if 0 < touched.chunks:
# trace logTxt "collected other pivot", n, node,
# other, nOtherChunks=other.chunks,
# touched, nTouched=touched.chunks,
# collected=result[^1]
# ------------------------------------------------------------------------------
# Private functions, swap-in functionality
# ------------------------------------------------------------------------------
proc swapIn(
processed: NodeTagRangeSet; # Covered node ranges to be updated
unprocessed: var SnapTodoRanges; # Uncovered node ranges to be updated
otherPivots: seq[SwapInPivot]; # Other pivots list (read only)
rootKey: NodeKey; # Start node into target hexary trie
getFn: HexaryGetFn; # Abstract database access
loopMax: int; # Prevent from looping too often
): (seq[NodeTagRangeSet],int) =
## Collect processed already ranges from argument `otherPivots` and merge them
## it onto the argument sets `processed` and `unprocessed`. For each entry
## of `otherPivots`, this function returns a list of merged (aka swapped in)
## ranges. It also returns the number of main loop runs with non-empty merges.
var
swappedIn = newSeq[NodeTagRangeSet](otherPivots.len)
lapCount = 0 # Loop control
allMerged = 0.u256 # Logging & debugging
# Initialise return value
for n in 0 ..< swappedIn.len:
swappedIn[n] = NodeTagRangeSet.init()
# Swap in node ranges from other pivots
while lapCount < loopMax:
var merged = 0.u256 # Loop control
let checkNodes = processed.uncoveredEnvelopes(rootKey, getFn)
for node in checkNodes:
# Process table of sets from other pivots with ranges intersecting
# with the `node` envelope.
for n,rngSet in node.otherProcessedRanges(otherPivots, rootKey, getFn):
# Merge `rngSet` into `swappedIn[n]` and `pivot.processed`,
# and remove `rngSet` from ` pivot.unprocessed`
for iv in rngSet.increasing:
discard swappedIn[n].merge iv # Imported range / other pivot
merged += processed.merge iv # Import range as processed
unprocessed.reduce iv # No need to re-fetch
if merged == 0: # Loop control
break
lapCount.inc
allMerged += merged # Statistics, logging
when extraTraceMessages:
trace logTxt "inherited ranges", lapCount, nCheckNodes=checkNodes.len,
merged=((merged.to(float) / (2.0^256)).toPC(3)),
allMerged=((allMerged.to(float) / (2.0^256)).toPC(3))
# End while()
(swappedIn,lapCount)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc swapInAccounts*(
ctx: SnapCtxRef; # Global context
env: SnapPivotRef; # Current pivot environment
loopMax = 100; # Prevent from looping too often
): int =
## Variant of `swapIn()` for the particular case of accounts database pivots.
let fa = env.fetchAccounts
if fa.processed.isFull:
return # nothing to do
let
rootKey = env.stateHeader.stateRoot.to(NodeKey)
getFn = ctx.data.snapDb.getAccountFn
others = toSeq(ctx.data.pivotTable.nextPairs)
# Swap in from mothballed pivots different from the current one
.filterIt(it.data.archived and it.key.to(NodeKey) != rootKey)
# Extract relevant parts
.mapIt(SwapInPivot(
rootKey: it.key.to(NodeKey),
processed: it.data.fetchAccounts.processed,
pivot: it.data))
if others.len == 0:
return # nothing to do
when extraTraceMessages:
let pivot = "#" & $env.stateHeader.blockNumber # for logging
trace logTxt "accounts start", pivot, nOthers=others.len
var
nLaps = 0 # Logging & debugging
nSlotAccounts = 0 # Logging & debugging
swappedIn: seq[NodeTagRangeSet]
noKeyErrorOrExceptionOops("swapInAccounts"):
(swappedIn, nLaps) = swapIn(
fa.processed, fa.unprocessed, others, rootKey, getFn, loopMax)
if 0 < nLaps:
# Update storage slots
for n in 0 ..< others.len:
#when extraTraceMessages:
# if n < swappedIn[n].chunks:
# trace logTxt "post-processing storage slots", n, nMax=others.len,
# changes=swappedIn[n], chunks=swappedIn[n].chunks
# Revisit all imported account key ranges
for iv in swappedIn[n].increasing:
# The `storageAccounts` list contains indices for storage slots,
# mapping account keys => storage root
var rc = others[n].pivot.storageAccounts.ge(iv.minPt)
while rc.isOk and rc.value.key <= iv.maxPt:
# Fetch storage slots specs from `fetchStorageFull` list
let stRoot = rc.value.data
if others[n].pivot.fetchStorageFull.hasKey(stRoot):
let accKey = others[n].pivot.fetchStorageFull[stRoot].accKey
discard env.fetchStorageFull.append(
stRoot, SnapSlotsQueueItemRef(acckey: accKey))
nSlotAccounts.inc
rc = others[n].pivot.storageAccounts.gt(rc.value.key)
when extraTraceMessages:
trace logTxt "accounts done", pivot, nOthers=others.len, nLaps,
nSlotAccounts
nLaps
proc swapInAccounts*(
buddy: SnapBuddyRef; # Worker peer
env: SnapPivotRef; # Current pivot environment
loopMax = 100; # Prevent from looping too often
): int =
## Variant of `swapInAccounts()`
buddy.ctx.swapInAccounts(env, loopMax)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -1,392 +0,0 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Swap in already allocated sub-tries
## ===================================
##
## This module imports sub-tries from other pivots into the current. It does
## so by detecting the top of an existing sub-trie in the current pivot and
## searches other pivots for the part of the sub-trie that is already
## available there. So it can be marked accomplished on the current pivot.
##
## Note that the terminology hinges on *account pivots* but is implemented in
## a more general way where
##
## * the current pivot is of type `SnapRangeBatchRef`
##
## * other pivots are represented by an iterator of type `SwapInPivots`
##
## So the algorithm can be transferred to other that accounting pivot
## situations.
##
## Algorithm
## ---------
##
## * On the *current pivot*, use the `processed` ranges of accounts to find all
## the nodes the envelopes of which are disjunct to the `processed` ranges
## (see module `hexary_envelope` for supporting functions.)
##
## * Select all the non-dangling/existing nodes disjunct envelopes from the
## previous step.
##
## * For all the selected non-dangling nodes from the previous step, check
## which ones are present in other pivots. This means that for a given
## existing node in the current pivot its *partial path* can be applied
## to the *state root* key of another pivot ending up at the same node key.
##
## The portion of `processed` ranges on the other pivot that intersects with
## the envelope of the node has been downloaded already. It is equally
## applicable to the current pivot as it applies to the same sub-trie. So
## the intersection of `processed` with the node envelope can be copied to
## to the `processed` ranges of the current pivot.
##
## * Rinse and repeat.
##
import
std/[sequtils, strutils],
chronicles,
eth/[common, p2p],
stew/[byteutils, interval_set, keyed_queue, sorted_set],
../../../utils/prettify,
".."/[range_desc, worker_desc],
./db/[hexary_desc, hexary_error, hexary_envelope,
hexary_paths, snapdb_accounts]
{.push raises: [Defect].}
logScope:
topics = "snap-swapin"
type
SwapInPivot = object
## Subset of `SnapPivotRef` with relevant parts, only
rootKey: NodeKey ## Storage slots & accounts
processed: NodeTagRangeSet ## Storage slots & accounts
pivot: SnapPivotRef ## Accounts only
const
extraTraceMessages = false or true
## Enabled additional logging noise
when extraTraceMessages:
import std/math, ../../types
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Swap-in helper " & info
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc decompose(
node: NodeSpecs; # Contains hex encoded partial path
iv: NodeTagRange; # Proofed range of leaf paths
rootKey: NodeKey; # Start node into hexary trie
getFn: HexaryGetFn; # Abstract database access
): Result[seq[NodeSpecs],void] =
## Decompose, succeed only if there is a change
var error: HexaryError
try:
let rc = node.partialPath.hexaryEnvelopeDecompose(rootKey, iv, getFn)
if rc.isErr:
error = rc.error
elif rc.value.len != 1 or rc.value[0].nodeKey != node.nodeKey:
return ok(rc.value)
else:
return err()
except RlpError:
error = RlpEncoding
when extraTraceMessages:
trace logTxt "envelope decomposition failed",
node=node.partialPath.toHex, error
err()
proc existsInTrie(
node: NodeSpecs; # Probe node to test to exist
rootKey: NodeKey; # Start node into hexary trie
getFn: HexaryGetFn; # Abstract database access
): bool =
## Check whether this node exists on the sub-trie starting at ` rootKey`
var error: HexaryError
try:
let rc = node.partialPath.hexaryPathNodeKey(rootKey, getFn)
if rc.isOk:
return rc.value == node.nodeKey
except RlpError:
error = RlpEncoding
when extraTraceMessages:
trace logTxt "check nodes failed",
partialPath=node.partialPath.toHex, error
false
template noKeyErrorOops(info: static[string]; code: untyped) =
try:
code
except KeyError as e:
raiseAssert "Not possible (" & info & "): " & e.msg
template noExceptionOops(info: static[string]; code: untyped) =
try:
code
except Defect as e:
raise e
except Exception as e:
raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc decomposeCheckNodes(
pivot: SnapRangeBatchRef; # Healing data support
rootKey: NodeKey; # Start node into hexary trie
getFn: HexaryGetFn; # Abstract database access
): Result[seq[NodeSpecs],void] =
## Decompose the `checkNodes` list of the argument `pivot` relative to the
## set `processed` of processed leaf node ranges.
##
## The function fails if there wan no change to the `checkNodes` list.
var
delayed: seq[NodeSpecs]
didSomething = 0
# Remove `checkNodes` entries with known complete sub-tries.
for node in pivot.checkNodes:
var paths: seq[NodeSpecs]
# For a Partially processed range, fetch overlapping intervals and
# sort of remove them from the envelope of `w`.
for touched in pivot.processed.hexaryEnvelopeTouchedBy(node).increasing:
let rc = node.decompose(touched, rootKey, getFn)
if rc.isOk:
paths &= rc.value
didSomething.inc
when extraTraceMessages:
trace logTxt "checkNodes decompose", nDelayed=delayed.len,
node=node.partialPath.toHex, nPaths=paths.len,
newPaths=rc.value.mapIt(it.partialPath.toHex).join(",")
# End inner for()
delayed &= paths
# End outer for()
if 0 < didSomething:
noKeyErrorOops("subTriesCheckNodesDecompose"):
# Remove duplicates in resulting path list
return ok(delayed.hexaryEnvelopeUniq)
err()
proc otherProcessedRanges(
node: NodeSpecs; # Top node of portential sub-trie
otherPivots: seq[SwapInPivot]; # Other pivots list
rootKey: NodeKey; # Start node into hexary trie
getFn: HexaryGetFn; # Abstract database access
): seq[NodeTagRangeSet] =
## Collect already processed ranges from other pivots intersecting with the
## envelope of the argument `node`. The list of other pivots is represented
## by the argument iterator `otherPivots`.
let envelope = node.hexaryEnvelope
var count = 0 # logging & debugging
noExceptionOops("otherProcessedRanges"):
# For the current `node` select all hexary sub-tries that contain the same
# node `node.nodeKey` for the partial path `node.partianPath`.
for rp in otherPivots.items:
# Check whether the node is shared
let haveNode = node.existsInTrie(rp.rootKey, getFn)
var subCount = 0 # logging & debugging
count.inc # logging & debugging
result.add NodeTagRangeSet.init()
if not haveNode:
trace logTxt "history loop", count, node=node.partialPath.toHex,
processed=rp.processed.fullFactor.toPC(3), haveNode
if haveNode:
when extraTraceMessages:
trace logTxt "history loop => sub start", count,
nTouched=rp.processed.hexaryEnvelopeTouchedBy(node).chunks, haveNode
# Import already processed part of the envelope of `node` into the
# `batch.processed` set of ranges.
for iv in rp.processed.hexaryEnvelopeTouchedBy(node).increasing:
let segment = (envelope * iv).value
discard result[^1].merge segment
subCount.inc # dlogging & ebugging
when extraTraceMessages:
trace logTxt "history loop => sub", count, subCount,
touchedLen=segment.fullFactor.toPC(3)
# ------------------------------------------------------------------------------
# Private functions, swap-in functionality
# ------------------------------------------------------------------------------
proc swapIn*(
pivot: SnapRangeBatchRef; # Healing state for target hexary trie
otherPivots: seq[SwapInPivot]; # Other pivots list
rootKey: NodeKey; # Start node into target hexary trie
getFn: HexaryGetFn; # Abstract database access
loopMax = 20; # Prevent from looping too often
): (int,seq[NodeTagRangeSet]) =
## Collect processed already ranges from argument `otherPivots` and register
## it onto the argument `pivot`. This function recognises and imports
## directly accessible sub-tries where the top-level node exists.
var
lapCount = 0
notDoneYet = true
swappedIn = newSeq[NodeTagRangeSet](otherPivots.len)
# Initialise return value
for n in 0 ..< swappedIn.len:
swappedIn[n] = NodeTagRangeSet.init()
while notDoneYet and lapCount < loopMax:
var
merged = 0.u256
nCheckNodesBefore = 0 # debugging
# Decompose `checkNodes` into sub-tries disjunct from `processed`
let toBeReclassified = block:
let rc = pivot.decomposeCheckNodes(rootKey, getFn)
if rc.isErr:
return (lapCount,swappedIn) # nothing to do
rc.value
lapCount.inc
notDoneYet = false
# Reclassify nodes into existing/allocated and dangling ones
noKeyErrorOops("swapIn"):
var
checkNodes: seq[NodeSpecs]
sickNodes: seq[NodeSpecs]
for node in toBeReclassified:
# Check whether previously missing nodes from the `sickSubTries` list
# have been magically added to the database since it was checked last
# time. These nodes will me moved to `checkNodes` for further
# processing.
if node.nodeKey.ByteArray32.getFn().len == 0:
sickNodes.add node # probably subject to healing
else:
let iv = node.hexaryEnvelope
if pivot.processed.covered(iv) < iv.len:
checkNodes.add node # may be swapped in
pivot.checkNodes = checkNodes.hexaryEnvelopeUniq
pivot.sickSubTries = sickNodes.hexaryEnvelopeUniq
nCheckNodesBefore = pivot.checkNodes.len # logging & debugging
# Swap in node ranges from other pivots
for node in pivot.checkNodes:
for n,rangeSet in node.otherProcessedRanges(otherPivots,rootKey,getFn):
for iv in rangeSet.increasing:
discard swappedIn[n].merge iv # imported range / other pivot
merged += pivot.processed.merge iv # import this range
pivot.unprocessed.reduce iv # no need to fetch it again
notDoneYet = 0 < merged # loop control
# Remove fully covered nodes
block:
var checkNodes: seq[NodeSpecs]
for node in toBeReclassified:
let iv = node.hexaryEnvelope
if pivot.processed.covered(iv) < iv.len:
checkNodes.add node # may be swapped in
pivot.checkNodes = checkNodes.hexaryEnvelopeUniq
when extraTraceMessages:
let mergedFactor = merged.to(float) / (2.0^256)
trace logTxt "inherited ranges", nCheckNodesBefore,
nCheckNodes=pivot.checkNodes.len, merged=mergedFactor.toPC(3)
# End while()
(lapCount,swappedIn)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc swapInAccounts*(
buddy: SnapBuddyRef; # Worker peer
env: SnapPivotRef; # Current pivot environment
loopMax = 20; # Prevent from looping too often
): int =
## Variant of `swapIn()` for the particular case of accounts database pivots.
let
ctx = buddy.ctx
rootKey = env.stateHeader.stateRoot.to(NodeKey)
getFn = ctx.data.snapDb.getAccountFn
others = toSeq(ctx.data.pivotTable.nextPairs)
# Swap in from mothballed pifots different from the current one
.filterIt(it.data.archived and it.key.to(NodeKey) != rootKey)
# Extract relevant parts
.mapIt(SwapInPivot(
rootKey: it.key.to(NodeKey),
processed: it.data.fetchAccounts.processed,
pivot: it.data))
var
nLaps: int
swappedIn: seq[NodeTagRangeSet]
noExceptionOops("swapInAccounts"):
(nLaps,swappedIn) = env.fetchAccounts.swapIn(others,rootKey,getFn,loopMax)
noKeyErrorOops("swapInAccounts"):
# Update storage slots
doAssert swappedIn.len == others.len
for n in 0 ..< others.len:
when extraTraceMessages:
trace logTxt "post-processing storage slots", inx=n, maxInx=others.len,
changes=swappedIn[n].fullFactor.toPC(3), chunks=swappedIn[n].chunks
# Revisit all imported account key ranges
for iv in swappedIn[n].increasing:
# The `storageAccounts` list contains indices for storage slots, mapping
# account keys => storage root
var rc = others[n].pivot.storageAccounts.ge(iv.minPt)
while rc.isOk and rc.value.key <= iv.maxPt:
# Fetch storage slots specs from `fetchStorageFull` list
let stRoot = rc.value.data
if others[n].pivot.fetchStorageFull.hasKey(stRoot):
let accKey = others[n].pivot.fetchStorageFull[stRoot].accKey
discard env.fetchStorageFull.append(
stRoot, SnapSlotsQueueItemRef(acckey: accKey))
rc = others[n].pivot.storageAccounts.gt(rc.value.key)
nLaps
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -28,7 +28,7 @@ type
pivotBlock*: Option[BlockNumber] pivotBlock*: Option[BlockNumber]
nAccounts*: (float,float) ## mean and standard deviation nAccounts*: (float,float) ## mean and standard deviation
accountsFill*: (float,float,float) ## mean, standard deviation, merged total accountsFill*: (float,float,float) ## mean, standard deviation, merged total
nAccountStats*: (int,int) ## #chunks, #dangling/missing nodes nAccountStats*: int ## #chunks
nSlotLists*: (float,float) ## mean and standard deviation nSlotLists*: (float,float) ## mean and standard deviation
nStorageQueue*: Option[int] nStorageQueue*: Option[int]
nQueues*: int nQueues*: int
@ -40,6 +40,7 @@ type
## Account fetching state that is shared among all peers. ## Account fetching state that is shared among all peers.
nBuddies: int nBuddies: int
recovery: bool recovery: bool
lastRecov: bool
lastStats: TickerStats lastStats: TickerStats
statsCb: TickerStatsUpdater statsCb: TickerStatsUpdater
logTicker: TimerCallback logTicker: TimerCallback
@ -116,25 +117,29 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
data = t.statsCb() data = t.statsCb()
now = Moment.now() now = Moment.now()
if data != t.lastStats or tickerLogSuppressMax < (now - t.visited): if data != t.lastStats or
t.lastStats = data t.recovery != t.lastRecov or
t.visited = now tickerLogSuppressMax < (now - t.visited):
var var
nAcc, nSto, bulk: string nAcc, nSto, bulk: string
pivot = "n/a" pivot = "n/a"
nStoQue = "n/a" nStoQue = "n/a"
let let
recoveryDone = t.lastRecov
accCov = data.accountsFill[0].pc99 & accCov = data.accountsFill[0].pc99 &
"(" & data.accountsFill[1].pc99 & ")" & "(" & data.accountsFill[1].pc99 & ")" &
"/" & data.accountsFill[2].pc99 & "/" & data.accountsFill[2].pc99 &
"~" & data.nAccountStats[0].uint.toSI & "~" & data.nAccountStats.uint.toSI
"/" & data.nAccountStats[1].uint.toSI
buddies = t.nBuddies buddies = t.nBuddies
# With `int64`, there are more than 29*10^10 years range for seconds # With `int64`, there are more than 29*10^10 years range for seconds
up = (now - t.started).seconds.uint64.toSI up = (now - t.started).seconds.uint64.toSI
mem = getTotalMem().uint.toSI mem = getTotalMem().uint.toSI
t.lastStats = data
t.visited = now
t.lastRecov = t.recovery
noFmtError("runLogTicker"): noFmtError("runLogTicker"):
if data.pivotBlock.isSome: if data.pivotBlock.isSome:
pivot = &"#{data.pivotBlock.get}/{data.nQueues}" pivot = &"#{data.pivotBlock.get}/{data.nQueues}"
@ -149,6 +154,9 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
if t.recovery: if t.recovery:
info "Snap sync statistics (recovery)", info "Snap sync statistics (recovery)",
up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem
elif recoveryDone:
info "Snap sync statistics (recovery done)",
up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem
else: else:
info "Snap sync statistics", info "Snap sync statistics",
up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem

View File

@ -50,12 +50,16 @@ type
## This data structure is used for coordinating peers that run quasi ## This data structure is used for coordinating peers that run quasi
## parallel. ## parallel.
SnapTodoNodes* = object
## Pair of node lists subject to swap-in and healing
check*: seq[NodeSpecs] ## Existing nodes, sub-trie unknown
missing*: seq[NodeSpecs] ## Top ref for sub-tries to be healed
SnapRangeBatchRef* = ref object SnapRangeBatchRef* = ref object
## `NodeTag` ranges to fetch, healing support ## `NodeTag` ranges to fetch, healing support
unprocessed*: SnapTodoRanges ## Range of slots to be fetched unprocessed*: SnapTodoRanges ## Range of slots to be fetched
processed*: NodeTagRangeSet ## Nodes definitely processed processed*: NodeTagRangeSet ## Node ranges definitely processed
checkNodes*: seq[NodeSpecs] ## Nodes with prob. dangling child links nodes*: SnapTodoNodes ## Single nodes to double check
sickSubTries*: seq[NodeSpecs] ## Top ref for sub-tries to be healed
resumeCtx*: TrieNodeStatCtxRef ## State for resuming trie inpection resumeCtx*: TrieNodeStatCtxRef ## State for resuming trie inpection
lockTriePerusal*: bool ## Only one process at a time lockTriePerusal*: bool ## Only one process at a time
@ -104,6 +108,7 @@ type
pivotTable*: SnapPivotTable ## Per state root environment pivotTable*: SnapPivotTable ## Per state root environment
pivotFinderCtx*: RootRef ## Opaque object reference for sub-module pivotFinderCtx*: RootRef ## Opaque object reference for sub-module
coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts
covAccTimesFull*: uint ## # of 100% coverages
recovery*: SnapRecoveryRef ## Current recovery checkpoint/context recovery*: SnapRecoveryRef ## Current recovery checkpoint/context
noRecovery*: bool ## Ignore recovery checkpoints noRecovery*: bool ## Ignore recovery checkpoints
@ -128,6 +133,14 @@ proc hash*(a: Hash256): Hash =
## Table/KeyedQueue mixin ## Table/KeyedQueue mixin
a.data.hash a.data.hash
# ------------------------------------------------------------------------------
# Public helpers: coverage
# ------------------------------------------------------------------------------
proc pivotAccountsCoverage*(ctx: SnapCtxRef): float =
## Returns the accounts coverage factor
ctx.data.coveredAccounts.fullFactor + ctx.data.covAccTimesFull.float
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public helpers: SnapTodoRanges # Public helpers: SnapTodoRanges
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -195,11 +208,13 @@ proc fetch*(q: var SnapTodoRanges; maxLen: UInt256): Result[NodeTagRange,void] =
proc verify*(q: var SnapTodoRanges): bool = proc verify*(q: var SnapTodoRanges): bool =
## Verify consistency, i.e. that the two sets of ranges have no overlap. ## Verify consistency, i.e. that the two sets of ranges have no overlap.
if q[0].chunks == 0 or q[1].chunks == 0: if q[0].chunks == 0 or q[1].chunks == 0:
# At least on set is empty # At least one set is empty
return true return true
# So neither set is empty
if q[0].total == 0 or q[1].total == 0: if q[0].total == 0 or q[1].total == 0:
# At least one set is maximal and the other non-empty # At least one set is maximal and the other non-empty
return false return false
# So neither set is empty, not full
let (a,b) = if q[0].chunks < q[1].chunks: (0,1) else: (1,0) let (a,b) = if q[0].chunks < q[1].chunks: (0,1) else: (1,0)
for iv in q[a].increasing: for iv in q[a].increasing:
if 0 < q[b].covered(iv): if 0 < q[b].covered(iv):