Snap sync swap in other pivots (#1363)

* Provide index to reconstruct missing storage slots

why:
  Pivots will not be changed anymore once they are officially archived. The
  accounts of the archived pivots are ready to be swapped into the active
  pivot. This leaves open how to treat storage slots not fetched yet.

  Solution: when mothballing, an `account->storage-root` index is
  compiled that can be used when swapping in accounts.
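
  A minimal sketch of the index construction (it mirrors `pivotMothball()`
  further down in this diff; `storageAccounts` is a sorted list of type
  `SortedSet[NodeTag,Hash256]`):

    # Provide index into `fetchStorageFull`: account key -> storage root
    env.storageAccounts.clear()
    for kvp in env.fetchStorageFull.nextPairs:
      let rc = env.storageAccounts.insert(kvp.data.accKey.to(NodeTag))
      if rc.isOk:
        rc.value.data = kvp.key     # `kvp.key` is the storage root hash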

* Implement swap-in from earlier pivots

details:
  When most accounts are covered by the current and previous pivot
  sessions, swapping in the accounts and storage slots (i.e. registering
  account ranges as done) from earlier pivots takes place if there is a
  common sub-trie.
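
  A sketch of how the swap-in is triggered from the accounts range fetch
  (see `accountsRangefetchImpl()` in the range_fetch_accounts diff below):

    if swapInAccountsCoverageTrigger <= ctx.data.coveredAccounts.fullFactor:
      # Enough other pivots must be available to swap in from
      if swapInAccountsPivotsMin <= ctx.data.pivotTable.len:
        nSwapInLaps = buddy.swapInAccounts(env)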

* Throttle pivot change when healing state has been reached

why:
  There is hope to complete the current pivot, so pivot updates can be
  throttled. This is achieved by setting another minimum block number
  distance for the pivot headers. This feature is still experimental.
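
  A sketch of the throttled distance selection (see `pivotTable.update()`
  in the pivot_helper diff below):

    let minBlockDistance = block:
      let rc = pivotTable.lastValue
      if rc.isOk and rc.value.pivotAccountsHealingOk(ctx):
        pivotBlockDistanceThrottledPivotChangeMin   # 256: healing, throttle
      else:
        pivotBlockDistanceMin                       # 128: default
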
Jordan Hrycaj 2022-12-12 22:00:24 +00:00 committed by GitHub
parent dcd1225724
commit cc2c888a63
10 changed files with 708 additions and 244 deletions


@ -11,6 +11,9 @@
{.push raises: [Defect].}
const
pivotTableLruEntriesMax* = 50
## Max depth of pivot table. On overflow, the oldest one will be removed.
pivotBlockDistanceMin* = 128
## The minimal depth of two block headers needed to activate a new state
## root pivot.
@ -28,6 +31,10 @@ const
##
## Note that 128 is the magic distance for snapshots used by *Geth*.
pivotBlockDistanceThrottledPivotChangeMin* = 256
## Slower pivot change takes place while healing is active or while the
## accounts database is nearly complete.
pivotEnvStopChangingIfComplete* = true
## If set `true`, new peers will not change the pivot even if the
## negotiated pivot would be newer. This should be the default.
@ -48,17 +55,17 @@ const
snapAccountsSaveProcessedChunksMax* = 1000
## Recovery data are stored if the processed ranges list contains no more
## than this many range `chunks`.
## than this many range *chunks*.
##
## If there are too many dangling nodes, no data will be saved and restart
## has to perform from scratch.
## If the range set is too much fragmented, no data will be saved and
## restart has to perform from scratch or an earlier checkpoint.
snapAccountsSaveStorageSlotsMax* = 10_000
## Recovery data are stored if the outstanding storage slots to process do
## not amount to more than this many entries.
##
## If there are too many dangling nodes, no data will be saved and restart
## has to perform from scratch.
## has to perform from scratch or an earlier checkpoint.
snapStorageSlotsFetchMax* = 2 * 1024
@ -74,6 +81,17 @@ const
# --------------
swapInAccountsCoverageTrigger* = 0.30
## Similar to `healAccountsCoverageTrigger` below, only for trying to
## swap in from earlier pivots.
swapInAccountsPivotsMin* = 2
## Require at least this many pivots available before any swap in can
## take place (must be at least 2.) This value is typically combined
## with `swapInAccountsCoverageTrigger`.
# --------------
healInspectionBatch* = 10_000
## Number of nodes to inspect in a single batch. In between batches, a
## task/thread switch is allowed.
@ -159,6 +177,7 @@ const
## Set 0 to disable.
static:
doAssert 1 < swapInAccountsPivotsMin
doAssert healAccountsCoverageTrigger < 1.0 # larger values make no sense
doAssert snapStorageSlotsQuPrioThresh < snapAccountsSaveStorageSlotsMax
doAssert snapStorageSlotsFetchMax < healAccountsBatchFetchMax


@ -25,10 +25,11 @@ type
NodeKey* = distinct ByteArray32
## Hash key without the hash wrapper (as opposed to `NodeTag` which is a
## number)
## number.)
NodeTag* = distinct UInt256
## Trie leaf item, account hash etc.
## Trie leaf item, account hash etc. This data type is a representation
## for a `NodeKey` geared up for arithmetic and comparing keys.
NodeTagRange* = Interval[NodeTag,UInt256]
## Interval `[minPt,maxPt]` of` NodeTag` elements, can be managed in an
@ -249,6 +250,13 @@ proc fullFactor*(lrs: NodeTagRangeSet): float =
else:
1.0 # number of points in `lrs` is `2^256 + 1`
proc fullFactor*(iv: NodeTagRange): float =
## Relative covered length of an interval, i.e. `#points-covered / 2^256`
if 0 < iv.len:
iv.len.u256.to(float) / (2.0^256)
else:
1.0 # number of points in `iv` is `2^256 + 1`
# ------------------------------------------------------------------------------
# Public functions: printing & pretty printing
# ------------------------------------------------------------------------------


@ -18,7 +18,8 @@ import
../../utils/prettify,
../misc/best_pivot,
".."/[protocol, sync_desc],
./worker/[pivot_helper, ticker],
./worker/[heal_accounts, heal_storage_slots, pivot_helper,
range_fetch_accounts, range_fetch_storage_slots, ticker],
./worker/com/com_error,
./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot],
"."/[constants, range_desc, worker_desc]
@ -72,19 +73,21 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
return false
rc.value
# Cosmetics: allows other processes to log etc.
await sleepAsync(1300.milliseconds)
# Cosmetics: allow other processes (e.g. ticker) to log the current recovery
# state. There is no other intended purpose of this wait state.
await sleepAsync 1100.milliseconds
when extraTraceMessages:
trace "Recovery continued ...", checkpoint, topLevel,
nAccounts=recov.state.nAccounts, nDangling=recov.state.dangling.len
#when extraTraceMessages:
# trace "Recovery continued ...", checkpoint, topLevel,
# nAccounts=recov.state.nAccounts, nDangling=recov.state.dangling.len
# Update pivot data from recovery checkpoint
env.recoverPivotFromCheckpoint(ctx, topLevel)
# Fetch next recovery record if there is any
if recov.state.predecessor.isZero:
trace "Recovery done", checkpoint, topLevel
#when extraTraceMessages:
# trace "Recovery done", checkpoint, topLevel
return false
let rc = ctx.data.snapDb.recoverPivot(recov.state.predecessor)
if rc.isErr:
@ -138,6 +141,47 @@ proc updateSinglePivot(buddy: SnapBuddyRef): Future[bool] {.async.} =
return true
proc execSnapSyncAction(
env: SnapPivotRef; # Current pivot environment
buddy: SnapBuddyRef; # Worker peer
) {.async.} =
## Execute a synchronisation run.
let
ctx = buddy.ctx
block:
# Clean up storage slots queue first if it becomes too large
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
if snapStorageSlotsQuPrioThresh < nStoQu:
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.archived:
return
if not env.pivotAccountsComplete():
await buddy.rangeFetchAccounts(env)
if buddy.ctrl.stopped or env.archived:
return
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.archived:
return
if env.pivotAccountsHealingOk(ctx):
await buddy.healAccounts(env)
if buddy.ctrl.stopped or env.archived:
return
# Some additional storage slots might have popped up
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.archived:
return
# Don't bother with storage slots healing before accounts healing takes
# place. This saves communication bandwidth. The pivot might change soon,
# anyway.
if env.pivotAccountsHealingOk(ctx):
await buddy.healStorageSlots(env)
# ------------------------------------------------------------------------------
# Public start/stop and admin functions
# ------------------------------------------------------------------------------
@ -248,8 +292,8 @@ proc runPool*(buddy: SnapBuddyRef, last: bool): bool =
proc runMulti*(buddy: SnapBuddyRef) {.async.} =
## Enabled while
## * `buddy.ctrl.multiOk` is `true`
## * `buddy.ctrl.poolMode` is `false`
## * `buddy.ctx.multiOk` is `true`
## * `buddy.ctx.poolMode` is `false`
##
let
ctx = buddy.ctx
@ -280,29 +324,41 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
# point in keeping some older space consuming state data any longer.
ctx.data.pivotTable.beforeTopMostlyClean()
when extraTraceMessages:
block:
let
nCheckNodes = env.fetchAccounts.checkNodes.len
nSickSubTries = env.fetchAccounts.sickSubTries.len
nAccounts = env.nAccounts
nSlotLists = env.nSlotLists
processed = env.fetchAccounts.processed.fullFactor.toPC(2)
nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
accHealThresh = env.healThresh.toPC(2)
trace "Multi sync runner", peer, pivot, nAccounts, nSlotLists, processed,
nStoQu, accHealThresh, nCheckNodes, nSickSubTries
# This one is the syncing work horse which downloads the database
await env.execSnapSyncAction(buddy)
if env.obsolete:
if env.archived:
return # pivot has changed
# Save state so sync can be partially resumed at next start up
let
nCheckNodes = env.fetchAccounts.checkNodes.len
nSickSubTries = env.fetchAccounts.sickSubTries.len
nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
processed = env.fetchAccounts.processed.fullFactor.toPC(2)
block:
let rc = env.saveCheckpoint(ctx)
# Save state so sync can be partially resumed at next start up
let
nAccounts = env.nAccounts
nSlotLists = env.nSlotLists
processed = env.fetchAccounts.processed.fullFactor.toPC(2)
nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
accHealThresh = env.healThresh.toPC(2)
rc = env.saveCheckpoint(ctx)
if rc.isErr:
error "Failed to save recovery checkpoint", peer, pivot,
nAccounts=env.nAccounts, nSlotLists=env.nSlotLists,
processed, nStoQu, error=rc.error
error "Failed to save recovery checkpoint", peer, pivot, nAccounts,
nSlotLists, processed, nStoQu, error=rc.error
else:
when extraTraceMessages:
trace "Saved recovery checkpoint", peer, pivot,
nAccounts=env.nAccounts, nSlotLists=env.nSlotLists,
processed, nStoQu, blobSize=rc.value
trace "Saved recovery checkpoint", peer, pivot, nAccounts, nSlotLists,
processed, nStoQu, blobSize=rc.value, accHealThresh
if buddy.ctrl.stopped:
return # peer worker has gone


@ -117,7 +117,7 @@ import
".."/[constants, range_desc, worker_desc],
./com/[com_error, get_trie_nodes],
./db/[hexary_desc, hexary_error, snapdb_accounts],
./sub_tries_helper
"."/[sub_tries_helper, swap_in]
{.push raises: [Defect].}
@ -148,10 +148,61 @@ proc healingCtx(
"nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," &
"nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template discardRlpError(info: static[string]; code: untyped) =
try:
code
except RlpError as e:
discard
template noExceptionOops(info: static[string]; code: untyped) =
try:
code
except Defect as e:
raise e
except Exception as e:
raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc reorgHealingState(
buddy: SnapBuddyRef;
env: SnapPivotRef;
) =
let
ctx = buddy.ctx
rootKey = env.stateHeader.stateRoot.to(NodeKey)
getFn = ctx.data.snapDb.getAccountFn
nCheckNodes0 = env.fetchAccounts.checkNodes.len
nSickSubTries0 = env.fetchAccounts.sickSubTries.len
nProcessed0 = env.fetchAccounts.processed.fullFactor.toPC(3)
# Reclassify nodes into existing/allocated and dangling ones
if buddy.swapInAccounts(env) == 0:
# Nothing to swap in, so force reclassification
noExceptionOops("reorgHealingState"):
var delayed: seq[NodeSpecs]
for node in env.fetchAccounts.sickSubTries:
if node.nodeKey.ByteArray32.getFn().len == 0:
delayed.add node # still subject to healing
else:
env.fetchAccounts.checkNodes.add node
env.fetchAccounts.sickSubTries = delayed
when extraTraceMessages:
let
nCheckNodes1 = env.fetchAccounts.checkNodes.len
nSickSubTries1 = env.fetchAccounts.sickSubTries.len
trace logTxt "sick nodes reclassified", nCheckNodes0, nSickSubTries0,
nCheckNodes1, nSickSubTries1, nProcessed0
proc updateMissingNodesList(
buddy: SnapBuddyRef;
env: SnapPivotRef;
@ -234,15 +285,14 @@ proc getMissingNodesFromNetwork(
# allows other processes to access the full `sickSubTries` list.
env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & fetchNodes
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
discard
when extraTraceMessages:
let
error = rc.error
ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors)
when extraTraceMessages:
if ok:
trace logTxt "fetch nodes error => stop", peer,
ctx=buddy.healingCtx(env), error
else:
discard
when extraTraceMessages:
else:
trace logTxt "fetch nodes error", peer,
ctx=buddy.healingCtx(env), error
@ -253,23 +303,31 @@ proc kvAccountLeaf(
buddy: SnapBuddyRef;
node: NodeSpecs;
env: SnapPivotRef;
): (bool,NodeKey,Account)
{.gcsafe, raises: [Defect,RlpError]} =
): (bool,NodeKey,Account) =
## Re-read leaf node from persistent database (if any)
let
peer = buddy.peer
var
nNibbles = -1
nodeRlp = rlpFromBytes node.data
(_,prefix) = hexPrefixDecode node.partialPath
(_,segment) = hexPrefixDecode nodeRlp.listElem(0).toBytes
nibbles = prefix & segment
if nibbles.len == 64:
let data = nodeRlp.listElem(1).toBytes
return (true, nibbles.getBytes.convertTo(NodeKey), rlp.decode(data,Account))
discardRlpError("kvAccountLeaf"):
let
nodeRlp = rlpFromBytes node.data
prefix = (hexPrefixDecode node.partialPath)[1]
segment = (hexPrefixDecode nodeRlp.listElem(0).toBytes)[1]
nibbles = prefix & segment
nNibbles = nibbles.len
if nNibbles == 64:
let
data = nodeRlp.listElem(1).toBytes
nodeKey = nibbles.getBytes.convertTo(NodeKey)
accData = rlp.decode(data,Account)
return (true, nodeKey, accData)
when extraTraceMessages:
trace logTxt "non-leaf node path", peer,
ctx=buddy.healingCtx(env), nNibbles=nibbles.len
trace logTxt "non-leaf node path or corrupt data", peer,
ctx=buddy.healingCtx(env), nNibbles
proc registerAccountLeaf(
@ -313,11 +371,7 @@ proc accountsHealingImpl(
peer = buddy.peer
# Update for changes since last visit
try:
db.getAccountFn.subTriesNodesReclassify(
env.stateHeader.stateRoot.to(NodeKey), env.fetchAccounts)
except Exception as e:
raiseAssert "Not possible @ accountsHealingImpl(" & $e.name & "):" & e.msg
buddy.reorgHealingState(env)
if env.fetchAccounts.sickSubTries.len == 0:
# Traverse the hexary trie for more missing nodes. This call is expensive.


@ -13,12 +13,11 @@ import
bearssl/rand,
chronos,
eth/[common, trie/trie_defs],
stew/[interval_set, keyed_queue],
stew/[interval_set, keyed_queue, sorted_set],
../../sync_desc,
".."/[constants, range_desc, worker_desc],
./db/[hexary_error, snapdb_accounts, snapdb_pivot],
"."/[heal_accounts, heal_storage_slots,
range_fetch_accounts, range_fetch_storage_slots, ticker]
./ticker
{.push raises: [Defect].}
@ -26,6 +25,10 @@ const
extraAsserts = false or true
## Enable some asserts
proc pivotAccountsHealingOk*(env: SnapPivotRef;ctx: SnapCtxRef): bool {.gcsafe.}
proc pivotAccountsComplete*(env: SnapPivotRef): bool {.gcsafe.}
proc pivotMothball*(env: SnapPivotRef) {.gcsafe.}
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@ -40,9 +43,14 @@ proc init(
batch.unprocessed.init()
batch.processed = NodeTagRangeSet.init()
# Initialise partial path the envelope of which covers the full range of
# account keys `0..high(NodeTag)`. This will trigger healing on the full
# range all possible keys.
# Once applicable when the hexary trie is non-empty, healing is started on
# the full range of all possible accounts. So the partial path batch list
# is initialised with the empty partial path encoded as `@[0]` which refers
# to the first (typically `Branch`) node. The envelope of `@[0]` covers the
# maximum range of accounts.
#
# Note that `@[]` incidentally has the same effect as `@[0]` although it
# is formally not a partial path.
batch.checkNodes.add NodeSpecs(
partialPath: @[0.byte],
nodeKey: stateRoot.to(NodeKey))
@ -84,12 +92,7 @@ proc beforeTopMostlyClean*(pivotTable: var SnapPivotTable) =
## usable any more after cleaning but might be useful as historic record.
let rc = pivotTable.beforeLastValue
if rc.isOk:
let env = rc.value
env.fetchStorageFull.clear()
env.fetchStoragePart.clear()
env.fetchAccounts.checkNodes.setLen(0)
env.fetchAccounts.sickSubTries.setLen(0)
env.obsolete = true
rc.value.pivotMothball
proc topNumber*(pivotTable: var SnapPivotTable): BlockNumber =
@ -100,10 +103,10 @@ proc topNumber*(pivotTable: var SnapPivotTable): BlockNumber =
proc update*(
pivotTable: var SnapPivotTable; ## Pivot table
header: BlockHeader; ## Header to generate new pivot from
ctx: SnapCtxRef; ## Some global context
reverse = false; ## Update from bottom (e.g. for recovery)
pivotTable: var SnapPivotTable; # Pivot table
header: BlockHeader; # Header to generate new pivot from
ctx: SnapCtxRef; # Some global context
reverse = false; # Update from bottom (e.g. for recovery)
) =
## Activate environment for state root implied by `header` argument. This
## function appends a new environment unless there was any not far enough
@ -112,6 +115,14 @@ proc update*(
## Note that the pivot table is assumed to be sorted by the block numbers of
## the pivot header.
##
# Calculate minimum block distance.
let minBlockDistance = block:
let rc = pivotTable.lastValue
if rc.isOk and rc.value.pivotAccountsHealingOk(ctx):
pivotBlockDistanceThrottledPivotChangeMin
else:
pivotBlockDistanceMin
# Check whether the new header follows minimum depth requirement. This is
# where the queue is assumed to have increasing block numbers.
if reverse or
@ -122,6 +133,7 @@ proc update*(
stateHeader: header,
fetchAccounts: SnapRangeBatchRef())
env.fetchAccounts.init(header.stateRoot, ctx)
env.storageAccounts.init()
var topEnv = env
# Append per-state root environment to LRU queue
@ -139,7 +151,8 @@ proc update*(
topEnv = pivotTable.lastValue.value
else:
discard pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax)
discard pivotTable.lruAppend(
header.stateRoot, env, pivotTableLruEntriesMax)
# Update healing threshold
let
@ -149,8 +162,8 @@ proc update*(
proc tickerStats*(
pivotTable: var SnapPivotTable; ## Pivot table
ctx: SnapCtxRef; ## Some global context
pivotTable: var SnapPivotTable; # Pivot table
ctx: SnapCtxRef; # Some global context
): TickerStatsUpdater =
## This function returns a function of type `TickerStatsUpdater` that prints
## out pivot table statistics. The returned function is supposed to drive
@ -181,7 +194,6 @@ proc tickerStats*(
let sLen = kvp.data.nSlotLists.float
sSum += sLen
sSqSum += sLen * sLen
let
env = ctx.data.pivotTable.lastValue.get(otherwise = nil)
accCoverage = ctx.data.coveredAccounts.fullFactor
@ -210,16 +222,41 @@ proc tickerStats*(
# Public functions: particular pivot
# ------------------------------------------------------------------------------
proc pivotMothball*(env: SnapPivotRef) =
## Clean up most of this argument `env` pivot record and mark it `archived`.
## Note that archived pivots will be checked for swapping in already known
## accounts and storage slots.
env.fetchAccounts.checkNodes.setLen(0)
env.fetchAccounts.sickSubTries.setLen(0)
env.fetchAccounts.unprocessed.init()
# Simplify storage slots queues by resolving partial slots into full list
for kvp in env.fetchStoragePart.nextPairs:
discard env.fetchStorageFull.append(
kvp.key, SnapSlotsQueueItemRef(acckey: kvp.data.accKey))
env.fetchStoragePart.clear()
# Provide index into `fetchStorageFull`
env.storageAccounts.clear()
for kvp in env.fetchStorageFull.nextPairs:
let rc = env.storageAccounts.insert(kvp.data.accKey.to(NodeTag))
# Note that `rc.isErr` should not happen as the accKey => storageRoot mapping is unique
if rc.isOk:
rc.value.data = kvp.key
# Finally, mark that node `archived`
env.archived = true
proc pivotAccountsComplete*(
env: SnapPivotRef; ## Current pivot environment
env: SnapPivotRef; # Current pivot environment
): bool =
## Returns `true` if accounts are fully available for this pivot.
env.fetchAccounts.processed.isFull
proc pivotAccountsHealingOk*(
env: SnapPivotRef; ## Current pivot environment
ctx: SnapCtxRef; ## Some global context
env: SnapPivotRef; # Current pivot environment
ctx: SnapCtxRef; # Some global context
): bool =
## Returns `true` if accounts healing is enabled for this pivot.
##
@ -238,47 +275,9 @@ proc pivotAccountsHealingOk*(
return true
proc execSnapSyncAction*(
env: SnapPivotRef; ## Current pivot environment
buddy: SnapBuddyRef; ## Worker peer
) {.async.} =
## Execute a synchronisation run.
let
ctx = buddy.ctx
block:
# Clean up storage slots queue first it it becomes too large
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
if snapStorageSlotsQuPrioThresh < nStoQu:
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.obsolete:
return
if not env.pivotAccountsComplete():
await buddy.rangeFetchAccounts(env)
if buddy.ctrl.stopped or env.obsolete:
return
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.obsolete:
return
if env.pivotAccountsHealingOk(ctx):
await buddy.healAccounts(env)
if buddy.ctrl.stopped or env.obsolete:
return
# Some additional storage slots might have been popped up
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.obsolete:
return
await buddy.healStorageSlots(env)
proc saveCheckpoint*(
env: SnapPivotRef; ## Current pivot environment
ctx: SnapCtxRef; ## Some global context
env: SnapPivotRef; # Current pivot environment
ctx: SnapCtxRef; # Some global context
): Result[int,HexaryError] =
## Save current sync admin data. On success, the size of the data record
## saved is returned (e.g. for logging.)
@ -304,9 +303,9 @@ proc saveCheckpoint*(
proc recoverPivotFromCheckpoint*(
env: SnapPivotRef; ## Current pivot environment
ctx: SnapCtxRef; ## Global context (containing save state)
topLevel: bool; ## Full data set on top level only
env: SnapPivotRef; # Current pivot environment
ctx: SnapCtxRef; # Global context (containing save state)
topLevel: bool; # Full data set on top level only
) =
## Recover some pivot variables and global list `coveredAccounts` from
## checkpoint data. If the argument `toplevel` is set `true`, also the
@ -323,27 +322,34 @@ proc recoverPivotFromCheckpoint*(
# Import processed interval
for (minPt,maxPt) in recov.state.processed:
if topLevel:
discard env.fetchAccounts.processed.merge(minPt, maxPt)
env.fetchAccounts.unprocessed.reduce(minPt, maxPt)
discard env.fetchAccounts.processed.merge(minPt, maxPt)
discard ctx.data.coveredAccounts.merge(minPt, maxPt)
# Handle storage slots
if topLevel:
let stateRoot = recov.state.header.stateRoot
for w in recov.state.slotAccounts:
let pt = NodeTagRange.new(w.to(NodeTag),w.to(NodeTag))
let stateRoot = recov.state.header.stateRoot
for w in recov.state.slotAccounts:
let pt = NodeTagRange.new(w.to(NodeTag),w.to(NodeTag))
if 0 < env.fetchAccounts.processed.covered(pt):
# Ignoring slots that have accounts to be downloaded, anyway
let rc = ctx.data.snapDb.getAccountsData(stateRoot, w)
if rc.isErr:
# Oops, how did that account get lost?
discard env.fetchAccounts.processed.reduce pt
env.fetchAccounts.unprocessed.merge pt
elif rc.value.storageRoot != emptyRlpHash:
env.fetchStorageFull.merge AccountSlotsHeader(
accKey: w,
storageRoot: rc.value.storageRoot)
if 0 < env.fetchAccounts.processed.covered(pt):
# Ignoring slots that have accounts to be downloaded, anyway
let rc = ctx.data.snapDb.getAccountsData(stateRoot, w)
if rc.isErr:
# Oops, how did that account get lost?
discard env.fetchAccounts.processed.reduce pt
env.fetchAccounts.unprocessed.merge pt
elif rc.value.storageRoot != emptyRlpHash:
env.fetchStorageFull.merge AccountSlotsHeader(
accKey: w,
storageRoot: rc.value.storageRoot)
# Handle mothballed pivots for swapping in (see `pivotMothball()`)
if not topLevel:
for kvp in env.fetchStorageFull.nextPairs:
let rc = env.storageAccounts.insert(kvp.data.accKey.to(NodeTag))
if rc.isOk:
rc.value.data = kvp.key
env.archived = true
# ------------------------------------------------------------------------------
# End


@ -39,7 +39,8 @@ import
../../sync_desc,
".."/[constants, range_desc, worker_desc],
./com/[com_error, get_account_range],
./db/[hexary_envelope, snapdb_accounts]
./db/[hexary_envelope, snapdb_accounts],
"."/[pivot_helper, swap_in]
{.push raises: [Defect].}
@ -127,7 +128,7 @@ proc accountsRangefetchImpl(
buddy.data.errors.resetComError()
let
gotAccounts = dd.data.accounts.len
gotAccounts = dd.data.accounts.len # comprises `gotStorage`
gotStorage = dd.withStorage.len
#when extraTraceMessages:
@ -179,10 +180,29 @@ proc accountsRangefetchImpl(
# Register accounts with storage slots on the storage TODO list.
env.fetchStorageFull.merge dd.withStorage
var nSwapInLaps = 0
if env.archived:
# Current pivot just became outdated, rebuild storage slots index (if any)
if 0 < gotStorage:
trace logTxt "mothballing", peer, pivot, gotStorage
env.pivotMothball
elif swapInAccountsCoverageTrigger <= ctx.data.coveredAccounts.fullFactor:
# Swap in from other pivots
when extraTraceMessages:
trace logTxt "before swap in", peer, pivot, gotAccounts, gotStorage,
coveredHere=covered.fullFactor.toPC(2),
processed=fa.processed.fullFactor.toPC(2),
nProcessedChunks=fa.processed.chunks.uint.toSI
if swapInAccountsPivotsMin <= ctx.data.pivotTable.len:
nSwapInLaps = buddy.swapInAccounts(env)
when extraTraceMessages:
trace logTxt "request done", peer, pivot,
covered=covered.fullFactor.toPC(2),
processed=fa.processed.fullFactor.toPC(2)
trace logTxt "request done", peer, pivot, gotAccounts, gotStorage,
nSwapInLaps, coveredHere=covered.fullFactor.toPC(2),
processed=fa.processed.fullFactor.toPC(2),
nProcessedChunks=fa.processed.chunks.uint.toSI
return true
@ -210,7 +230,7 @@ proc rangeFetchAccounts*(
var nFetchAccounts = 0 # for logging
while not fa.processed.isFull() and
buddy.ctrl.running and
not env.obsolete:
not env.archived:
nFetchAccounts.inc
if not await buddy.accountsRangefetchImpl(env):
break


@ -333,7 +333,7 @@ proc rangeFetchStorageSlots*(
var fullRangeItemsleft = 1 + (fullRangeLen-1) div snapStorageSlotsFetchMax
while 0 < fullRangeItemsleft and
buddy.ctrl.running and
not env.obsolete:
not env.archived:
# Pull out the next request list from the queue
let req = buddy.getNextSlotItemsFull(env)
if req.len == 0:
@ -344,7 +344,7 @@ proc rangeFetchStorageSlots*(
var partialRangeItemsLeft = env.fetchStoragePart.len
while 0 < partialRangeItemsLeft and
buddy.ctrl.running and
not env.obsolete:
not env.archived:
# Pull out the next request list from the queue
let req = buddy.getNextSlotItemPartial(env)
if req.len == 0:


@ -15,7 +15,7 @@ import
eth/[common, p2p],
stew/interval_set,
".."/[constants, range_desc, worker_desc],
./db/[hexary_desc, hexary_error, hexary_envelope, hexary_inspect]
./db/[hexary_desc, hexary_error, hexary_inspect]
{.push raises: [Defect].}
@ -53,22 +53,6 @@ proc doInspect(
ok(stats)
proc getOverlapping(
batch: SnapRangeBatchRef; ## Healing data support
iv: NodeTagRange; ## Reference interval
): Result[NodeTagRange,void] =
## Find overlapping interval in `batch`
block:
let rc = batch.processed.ge iv.minPt
if rc.isOk and rc.value.minPt <= iv.maxPt:
return ok(rc.value)
block:
let rc = batch.processed.le iv.maxPt
if rc.isOk and iv.minPt <= rc.value.maxPt:
return ok(rc.value)
err()
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -141,87 +125,6 @@ proc subTriesFromPartialPaths*(
batch.lockTriePerusal = false
return err(error)
proc subTriesNodesReclassify*(
getFn: HexaryGetFn; ## Abstract database access
rootKey: NodeKey; ## Start node into hexary trie
batch: SnapRangeBatchRef; ## Healing data support
) {.gcsafe, raises: [Defect,KeyError].} =
## Check whether previously missing nodes from the `sickSubTries` list have
## been magically added to the database since it was checked last time. These
## nodes will me moved to `checkNodes` for further processing. Also, some
## full sub-tries might have been added which can be checked against
## the `processed` range set.
# Move `sickSubTries` entries that have now an exisiting node to the
# list of partial paths to be re-checked.
block:
var delayed: seq[NodeSpecs]
for w in batch.sickSubTries:
if 0 < getFn(w.nodeKey.ByteArray32).len:
batch.checkNodes.add w
else:
delayed.add w
batch.sickSubTries = delayed
# Remove `checkNodes` entries with complete known sub-tries.
var
doneWith: seq[NodeSpecs] # loop will not recurse on that list
count = 0 # for logging only
# `While` loop will terminate with processed paths in `doneWith`.
block:
var delayed: seq[NodeSpecs]
while 0 < batch.checkNodes.len:
when extraTraceMessages:
trace logTxt "reclassify", count,
nCheckNodes=batch.checkNodes.len
for w in batch.checkNodes:
let
iv = w.hexaryEnvelope
nCov = batch.processed.covered iv
if iv.len <= nCov:
# Fully processed envelope, no need to keep `w` any longer
when extraTraceMessages:
trace logTxt "reclassify discard", count, partialPath=w,
nDelayed=delayed.len
continue
if 0 < nCov:
# Partially processed range, fetch an overlapping interval and
# remove that from the envelope of `w`.
try:
let paths = block:
let rc = w.partialPath.hexaryEnvelopeDecompose(
rootKey, batch.getOverlapping(iv).value, getFn)
if rc.isErr:
continue
rc.value
delayed &= paths
when extraTraceMessages:
trace logTxt "reclassify dismantled", count, partialPath=w,
nPaths=paths.len, nDelayed=delayed.len
continue
except RlpError:
discard
# Not processed at all. So keep `w` but there is no need to look
# at it again in the next lap.
doneWith.add w
# Prepare for next lap
batch.checkNodes.swap delayed
delayed.setLen(0)
batch.checkNodes = doneWith.hexaryEnvelopeUniq
when extraTraceMessages:
trace logTxt "reclassify finalise", count,
nDoneWith=doneWith.len, nCheckNodes=batch.checkNodes.len
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -0,0 +1,392 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Swap in already allocated sub-tries
## ===================================
##
## This module imports sub-tries from other pivots into the current one. It
## does so by detecting the top of an existing sub-trie in the current pivot
## and searching other pivots for the parts of that sub-trie which are
## already available there, so they can be marked accomplished on the
## current pivot.
##
## Note that the terminology hinges on *account pivots* but is implemented in
## a more general way where
##
## * the current pivot is of type `SnapRangeBatchRef`
##
## * other pivots are represented by an iterator of type `SwapInPivots`
##
## So the algorithm can be transferred to other than accounting pivot
## situations.
##
## Algorithm
## ---------
##
## * On the *current pivot*, use the `processed` ranges of accounts to find all
## the nodes the envelopes of which are disjunct to the `processed` ranges
## (see module `hexary_envelope` for supporting functions.)
##
## * Select all the non-dangling (i.e. existing) nodes with disjunct
## envelopes from the previous step.
##
## * For all the selected non-dangling nodes from the previous step, check
## which ones are present in other pivots. This means that for a given
## existing node in the current pivot its *partial path* can be applied
## to the *state root* key of another pivot ending up at the same node key.
##
## The portion of `processed` ranges on the other pivot that intersects with
## the envelope of the node has been downloaded already. It is equally
## applicable to the current pivot as it applies to the same sub-trie. So
## the intersection of `processed` with the node envelope can be copied to
## the `processed` ranges of the current pivot.
##
## * Rinse and repeat.
##
import
std/[sequtils, strutils],
chronicles,
eth/[common, p2p],
stew/[byteutils, interval_set, keyed_queue, sorted_set],
../../../utils/prettify,
".."/[range_desc, worker_desc],
./db/[hexary_desc, hexary_error, hexary_envelope,
hexary_paths, snapdb_accounts]
{.push raises: [Defect].}
logScope:
topics = "snap-swapin"
type
SwapInPivot = object
## Subset of `SnapPivotRef` with relevant parts, only
rootKey: NodeKey ## Storage slots & accounts
processed: NodeTagRangeSet ## Storage slots & accounts
pivot: SnapPivotRef ## Accounts only
const
extraTraceMessages = false or true
## Enables additional logging noise
when extraTraceMessages:
import std/math, ../../types
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Swap-in helper " & info
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc decompose(
node: NodeSpecs; # Contains hex encoded partial path
iv: NodeTagRange; # Proofed range of leaf paths
rootKey: NodeKey; # Start node into hexary trie
getFn: HexaryGetFn; # Abstract database access
): Result[seq[NodeSpecs],void] =
## Decompose, succeed only if there is a change
var error: HexaryError
try:
let rc = node.partialPath.hexaryEnvelopeDecompose(rootKey, iv, getFn)
if rc.isErr:
error = rc.error
elif rc.value.len != 1 or rc.value[0].nodeKey != node.nodeKey:
return ok(rc.value)
else:
return err()
except RlpError:
error = RlpEncoding
when extraTraceMessages:
trace logTxt "envelope decomposition failed",
node=node.partialPath.toHex, error
err()
proc existsInTrie(
node: NodeSpecs; # Probe node to test to exist
rootKey: NodeKey; # Start node into hexary trie
getFn: HexaryGetFn; # Abstract database access
): bool =
## Check whether this node exists on the sub-trie starting at `rootKey`
var error: HexaryError
try:
let rc = node.partialPath.hexaryPathNodeKey(rootKey, getFn)
if rc.isOk:
return rc.value == node.nodeKey
except RlpError:
error = RlpEncoding
when extraTraceMessages:
trace logTxt "check nodes failed",
partialPath=node.partialPath.toHex, error
false
template noKeyErrorOops(info: static[string]; code: untyped) =
try:
code
except KeyError as e:
raiseAssert "Not possible (" & info & "): " & e.msg
template noExceptionOops(info: static[string]; code: untyped) =
try:
code
except Defect as e:
raise e
except Exception as e:
raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc decomposeCheckNodes(
pivot: SnapRangeBatchRef; # Healing data support
rootKey: NodeKey; # Start node into hexary trie
getFn: HexaryGetFn; # Abstract database access
): Result[seq[NodeSpecs],void] =
## Decompose the `checkNodes` list of the argument `pivot` relative to the
## set `processed` of processed leaf node ranges.
##
## The function fails if there was no change to the `checkNodes` list.
var
delayed: seq[NodeSpecs]
didSomething = 0
# Remove `checkNodes` entries with known complete sub-tries.
for node in pivot.checkNodes:
var paths: seq[NodeSpecs]
# For a partially processed range, fetch the overlapping intervals and
# remove them from the envelope of `node`.
for touched in pivot.processed.hexaryEnvelopeTouchedBy(node).increasing:
let rc = node.decompose(touched, rootKey, getFn)
if rc.isOk:
paths &= rc.value
didSomething.inc
when extraTraceMessages:
trace logTxt "checkNodes decompose", nDelayed=delayed.len,
node=node.partialPath.toHex, nPaths=paths.len,
newPaths=rc.value.mapIt(it.partialPath.toHex).join(",")
# End inner for()
delayed &= paths
# End outer for()
if 0 < didSomething:
noKeyErrorOops("subTriesCheckNodesDecompose"):
# Remove duplicates in resulting path list
return ok(delayed.hexaryEnvelopeUniq)
err()
proc otherProcessedRanges(
node: NodeSpecs; # Top node of potential sub-trie
otherPivots: seq[SwapInPivot]; # Other pivots list
rootKey: NodeKey; # Start node into hexary trie
getFn: HexaryGetFn; # Abstract database access
): seq[NodeTagRangeSet] =
## Collect already processed ranges from other pivots intersecting with the
## envelope of the argument `node`. The list of other pivots is represented
## by the argument iterator `otherPivots`.
let envelope = node.hexaryEnvelope
var count = 0 # logging & debugging
noExceptionOops("otherProcessedRanges"):
# For the current `node` select all hexary sub-tries that contain the same
# node `node.nodeKey` for the partial path `node.partialPath`.
for rp in otherPivots.items:
# Check whether the node is shared
let haveNode = node.existsInTrie(rp.rootKey, getFn)
var subCount = 0 # logging & debugging
count.inc # logging & debugging
result.add NodeTagRangeSet.init()
if not haveNode:
trace logTxt "history loop", count, node=node.partialPath.toHex,
processed=rp.processed.fullFactor.toPC(3), haveNode
if haveNode:
when extraTraceMessages:
trace logTxt "history loop => sub start", count,
nTouched=rp.processed.hexaryEnvelopeTouchedBy(node).chunks, haveNode
# Import already processed part of the envelope of `node` into the
# `batch.processed` set of ranges.
for iv in rp.processed.hexaryEnvelopeTouchedBy(node).increasing:
let segment = (envelope * iv).value
discard result[^1].merge segment
subCount.inc # logging & debugging
when extraTraceMessages:
trace logTxt "history loop => sub", count, subCount,
touchedLen=segment.fullFactor.toPC(3)
# ------------------------------------------------------------------------------
# Private functions, swap-in functionality
# ------------------------------------------------------------------------------
proc swapIn*(
pivot: SnapRangeBatchRef; # Healing state for target hexary trie
otherPivots: seq[SwapInPivot]; # Other pivots list
rootKey: NodeKey; # Start node into target hexary trie
getFn: HexaryGetFn; # Abstract database access
loopMax = 20; # Prevent from looping too often
): (int,seq[NodeTagRangeSet]) =
## Collect already processed ranges from the argument `otherPivots` and
## register them onto the argument `pivot`. This function recognises and imports
## directly accessible sub-tries where the top-level node exists.
var
lapCount = 0
notDoneYet = true
swappedIn = newSeq[NodeTagRangeSet](otherPivots.len)
# Initialise return value
for n in 0 ..< swappedIn.len:
swappedIn[n] = NodeTagRangeSet.init()
while notDoneYet and lapCount < loopMax:
var
merged = 0.u256
nCheckNodesBefore = 0 # debugging
# Decompose `checkNodes` into sub-tries disjunct from `processed`
let toBeReclassified = block:
let rc = pivot.decomposeCheckNodes(rootKey, getFn)
if rc.isErr:
return (lapCount,swappedIn) # nothing to do
rc.value
lapCount.inc
notDoneYet = false
# Reclassify nodes into existing/allocated and dangling ones
noKeyErrorOops("swapIn"):
var
checkNodes: seq[NodeSpecs]
sickNodes: seq[NodeSpecs]
for node in toBeReclassified:
# Check whether previously missing nodes from the `sickSubTries` list
# have been magically added to the database since it was checked last
# time. These nodes will be moved to `checkNodes` for further
# processing.
if node.nodeKey.ByteArray32.getFn().len == 0:
sickNodes.add node # probably subject to healing
else:
let iv = node.hexaryEnvelope
if pivot.processed.covered(iv) < iv.len:
checkNodes.add node # may be swapped in
pivot.checkNodes = checkNodes.hexaryEnvelopeUniq
pivot.sickSubTries = sickNodes.hexaryEnvelopeUniq
nCheckNodesBefore = pivot.checkNodes.len # logging & debugging
# Swap in node ranges from other pivots
for node in pivot.checkNodes:
for n,rangeSet in node.otherProcessedRanges(otherPivots,rootKey,getFn):
for iv in rangeSet.increasing:
discard swappedIn[n].merge iv # imported range / other pivot
merged += pivot.processed.merge iv # import this range
pivot.unprocessed.reduce iv # no need to fetch it again
notDoneYet = 0 < merged # loop control
# Remove fully covered nodes
block:
var checkNodes: seq[NodeSpecs]
for node in toBeReclassified:
let iv = node.hexaryEnvelope
if pivot.processed.covered(iv) < iv.len:
checkNodes.add node # may be swapped in
pivot.checkNodes = checkNodes.hexaryEnvelopeUniq
when extraTraceMessages:
let mergedFactor = merged.to(float) / (2.0^256)
trace logTxt "inherited ranges", nCheckNodesBefore,
nCheckNodes=pivot.checkNodes.len, merged=mergedFactor.toPC(3)
# End while()
(lapCount,swappedIn)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc swapInAccounts*(
buddy: SnapBuddyRef; # Worker peer
env: SnapPivotRef; # Current pivot environment
loopMax = 20; # Prevent from looping too often
): int =
## Variant of `swapIn()` for the particular case of accounts database pivots.
let
ctx = buddy.ctx
rootKey = env.stateHeader.stateRoot.to(NodeKey)
getFn = ctx.data.snapDb.getAccountFn
others = toSeq(ctx.data.pivotTable.nextPairs)
# Swap in from mothballed pivots different from the current one
.filterIt(it.data.archived and it.key.to(NodeKey) != rootKey)
# Extract relevant parts
.mapIt(SwapInPivot(
rootKey: it.key.to(NodeKey),
processed: it.data.fetchAccounts.processed,
pivot: it.data))
var
nLaps: int
swappedIn: seq[NodeTagRangeSet]
noExceptionOops("swapInAccounts"):
(nLaps,swappedIn) = env.fetchAccounts.swapIn(others,rootKey,getFn,loopMax)
noKeyErrorOops("swapInAccounts"):
# Update storage slots
doAssert swappedIn.len == others.len
for n in 0 ..< others.len:
when extraTraceMessages:
trace logTxt "post-processing storage slots", inx=n, maxInx=others.len,
changes=swappedIn[n].fullFactor.toPC(3), chunks=swappedIn[n].chunks
# Revisit all imported account key ranges
for iv in swappedIn[n].increasing:
# The `storageAccounts` list contains indices for storage slots, mapping
# account keys => storage root
var rc = others[n].pivot.storageAccounts.ge(iv.minPt)
while rc.isOk and rc.value.key <= iv.maxPt:
# Fetch storage slots specs from `fetchStorageFull` list
let stRoot = rc.value.data
if others[n].pivot.fetchStorageFull.hasKey(stRoot):
let accKey = others[n].pivot.fetchStorageFull[stRoot].accKey
discard env.fetchStorageFull.append(
stRoot, SnapSlotsQueueItemRef(acckey: accKey))
rc = others[n].pivot.storageAccounts.gt(rc.value.key)
nLaps
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -11,7 +11,7 @@
import
std/hashes,
eth/[common, p2p],
stew/[interval_set, keyed_queue],
stew/[interval_set, keyed_queue, sorted_set],
../../db/select_backend,
../sync_desc,
./worker/com/com_error,
@ -22,6 +22,9 @@ import
{.push raises: [Defect].}
type
SnapAccountsList* = SortedSet[NodeTag,Hash256]
## Sorted pair of `(account,storage-root)` entries
SnapSlotsQueue* = KeyedQueue[Hash256,SnapSlotsQueueItemRef]
## Handles list of storage slots data for fetch indexed by storage root.
##
@ -72,7 +75,10 @@ type
# Info
nAccounts*: uint64 ## Imported # of accounts
nSlotLists*: uint64 ## Imported # of account storage tries
obsolete*: bool ## Not latest pivot, anymore
# Mothballing, ready to be swapped into newer pivot record
storageAccounts*: SnapAccountsList ## Accounts with missing storage slots
archived*: bool ## Not latest pivot, anymore
SnapPivotTable* = KeyedQueue[Hash256,SnapPivotRef]
## LRU table, indexed by state root