Snap sync can start on saved checkpoint (#1327)

* Stop negotiating pivot if peer repeatedly replies w/usesless answers

why:
  There is some fringe condition where a peer replies with legit but
  useless empty headers repetely. This goes on until somebody stops.
  We stop now.

* Rename `missingNodes` => `sickSubTries`

why:
  These (probably missing) nodes represent in reality fully or partially
  missing sub-tries. The top nodes may even exist, e.g. as a shallow
  sub-trie.

also:
  Keep track of account healing on/of by bool variable `accountsHealing`
  controlled in `pivot_helper.execSnapSyncAction()`

* Add `nimbus` option argument `snapCtx` for starting snap recovery (if any)

also:
+ Trigger the recovery (or similar) process from inside the global peer
  worker initialisation `worker.setup()` and not by the `snap.start()`
  function.
+ Have `runPool()` returned a `bool` code to indicate early stop to
  scheduler.

* Can import partial snap sync checkpoint at start

details:
 + Modified what is stored with the checkpoint in `snapdb_pivot.nim`
 + Will be loaded within `runDaemon()` if activated

* Forgot to import total coverage range

why:
  Only the top (or latest) pivot needs coverage but the total coverage
  is the list of all ranges for all pivots -- simply forgotten.
This commit is contained in:
Jordan Hrycaj 2022-11-25 14:56:42 +00:00 committed by GitHub
parent 66439d69ca
commit 7688148565
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 479 additions and 318 deletions

View File

@ -135,6 +135,7 @@ type
Default
Full ## Beware, experimental
Snap ## Beware, experimental
SnapCtx ## Beware, experimental
NimbusConf* = object of RootObj
## Main Nimbus configuration object
@ -166,7 +167,8 @@ type
longDesc:
"- default -- legacy sync mode\n" &
"- full -- full blockchain archive\n" &
"- snap -- experimental snap mode (development only)\n"
"- snap -- experimental snap mode (development only)\n" &
"- snapCtx -- snap considering possible recovery context\n"
defaultValue: SyncMode.Default
defaultValueDesc: $SyncMode.Default
abbr: "y"

View File

@ -155,7 +155,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
)
nimbus.ethNode.addCapability(protocol.eth, ethWireHandler)
case conf.syncMode:
of SyncMode.Snap:
of SyncMode.Snap, SyncMode.SnapCtx:
nimbus.ethNode.addCapability protocol.snap
of SyncMode.Full, SyncMode.Default:
discard
@ -173,10 +173,10 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
tickerOK)
nimbus.fullSyncRef.start
of SyncMode.Snap:
of SyncMode.Snap, SyncMode.SnapCtx:
nimbus.snapSyncRef = SnapSyncRef.init(
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
nimbus.dbBackend, tickerOK)
nimbus.dbBackend, tickerOK, noRecovery = (conf.syncMode==SyncMode.Snap))
nimbus.snapSyncRef.start
of SyncMode.Default:
discard
@ -196,7 +196,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
if conf.maxPeers > 0:
var waitForPeers = true
case conf.syncMode:
of SyncMode.Snap:
of SyncMode.Snap, SyncMode.SnapCtx:
waitForPeers = false
of SyncMode.Full, SyncMode.Default:
discard
@ -433,7 +433,7 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
cast[pointer](syncer)
)
of SyncMode.Full, SyncMode.Snap:
of SyncMode.Full, SyncMode.Snap, SyncMode.SnapCtx:
discard
if nimbus.state == Starting:

View File

@ -42,7 +42,7 @@ proc runStart(buddy: FullBuddyRef): bool =
proc runStop(buddy: FullBuddyRef) =
worker.stop(buddy)
proc runPool(buddy: FullBuddyRef; last: bool) =
proc runPool(buddy: FullBuddyRef; last: bool): bool =
worker.runPool(buddy, last)
proc runSingle(buddy: FullBuddyRef) {.async.} =

View File

@ -33,6 +33,10 @@ const
pivotMinPeersToStartSync* = 2
## Wait for consensus of at least this number of peers before syncing.
pivotFailCountMax* = 3
## Stop after a peer fails too often while negotiating. This happens if
## a peer responses repeatedly with useless data.
type
BestPivotCtxRef* = ref object of RootRef
## Data shared by all peers.
@ -47,6 +51,7 @@ type
header: Option[BlockHeader] ## Pivot header (if any)
ctrl: BuddyCtrlRef ## Control and state settings
peer: Peer ## network peer
comFailCount: int ## Beware of repeated network errors
# ------------------------------------------------------------------------------
# Private helpers
@ -139,7 +144,11 @@ proc getBestHeader(
return err()
if hdrResp.isNone:
trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a"
bp.comFailCount.inc
trace trEthRecvReceivedBlockHeaders, peer, reqLen,
response="n/a", comFailCount=bp.comFailCount
if pivotFailCountMax < bp.comFailCount:
bp.ctrl.zombie = true
return err()
let hdrRespLen = hdrResp.get.headers.len
@ -148,6 +157,7 @@ proc getBestHeader(
header = hdrResp.get.headers[0]
blockNumber = header.blockNumber
trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber
bp.comFailCount = 0 # reset fail count
return ok(header)
trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen

View File

@ -44,7 +44,7 @@ proc runStart(buddy: SnapBuddyRef): bool =
proc runStop(buddy: SnapBuddyRef) =
worker.stop(buddy)
proc runPool(buddy: SnapBuddyRef; last: bool) =
proc runPool(buddy: SnapBuddyRef; last: bool): bool =
worker.runPool(buddy, last)
proc runSingle(buddy: SnapBuddyRef) {.async.} =
@ -63,18 +63,21 @@ proc init*(
chain: Chain;
rng: ref HmacDrbgContext;
maxPeers: int;
dbBackend: ChainDb,
enableTicker = false): T =
dbBackend: ChainDb;
enableTicker = false;
noRecovery = false;
): T =
new result
result.initSync(ethNode, chain, maxPeers, enableTicker)
result.ctx.chain = chain # explicitely override
result.ctx.data.rng = rng
result.ctx.data.dbBackend = dbBackend
result.ctx.data.noRecovery = noRecovery
# Required to have been initialised via `addCapability()`
doAssert not result.ctx.ethWireCtx.isNil
proc start*(ctx: SnapSyncRef) =
doAssert ctx.startSync(daemon = true)
doAssert ctx.startSync()
proc stop*(ctx: SnapSyncRef) =
ctx.stopSync()

View File

@ -37,24 +37,28 @@ const
snapRequestBytesLimit* = 2 * 1024 * 1024
## Soft bytes limit to request in `snap` protocol calls.
snapAccountsSaveDanglingMax* = 10_000
## Recovery data are stored if the healing register
## `fetchAccounts.missingNodes` with dangling node links has no more
## than this many entries. Upon recovery, these dangling links allow
## to reconstuct the needed ranges to complete the hexary trie for the
## account fot current pivot.
snapRequestTrieNodesFetchMax* = 1024
## Informal maximal number of trie nodes to fetch at once in `snap`
## protocol calls. This is not an official limit but found with several
## implementations (e.g. Geth.)
##
## Resticting the fetch list length early allows to better paralellise
## healing.
snapAccountsSaveProcessedChunksMax* = 1000
## Recovery data are stored if the processed ranges list contains no more
## than this many range `chunks`.
##
## If there are too many dangling nodes, no data will be saved and restart
## has to perform from scratch.
snapAccountsSaveStorageSlotsMax* = 10_000
## Similar retriction as `snapAccountsSaveDanglingMax` but for the joint
## queues `fetchStorageFull` and `fetchStoragePart`. If the joint queue
## becomes too large, nothing is saved.
## Recovery data are stored if the oustanding storage slots to process do
## not amount to more than this many entries.
##
## Note thet the length of the jount queue is controlled by the constat
## `snapStorageSlotsQuPrioThresh` which should be smaller than
## this one.
## If there are too many dangling nodes, no data will be saved and restart
## has to perform from scratch.
snapStorageSlotsFetchMax* = 2 * 1024
@ -68,16 +72,17 @@ const
## and switch to processing the storage slots queue if the queue has
## more than this many items.
snapTrieNodesFetchMax* = 1024
## Informal maximal number of trie nodes to fetch at once. This is not
## an official limit but found on several implementations (e.g. Geth.)
##
## Resticting the fetch list length early allows to better paralellise
## healing.
# --------------
healInspectionBatch* = 10_000
## Number of nodes to inspect in a single batch. In between batches, a
## task/thread switch is allowed.
healInspectionBatchWaitNanoSecs* = 500
## Wait some time asynchroneously after processing `healInspectionBatch`
## nodes to allow for a pseudo -task switch.
healAccountsTrigger* = 0.99
## Apply accounts healing if the global snap download coverage factor
## exceeds this setting. The global coverage factor is derived by merging
@ -90,11 +95,6 @@ const
## over the network. More requests might be a disadvantage if peers only
## serve a maximum number requests (rather than data.)
healAccountsInspectionBatch* = 10_000
## Number of nodes to inspect in a single batch. Several batches are run
## until at least `snapTrieNodeFetchMax` dangling nodes are found. In
## between batches, a tast/thread switch is allowed.
healAccountsBatchFetchMax* = 10 * 1024
## Keep on gloing in healing task up until this many nodes have been
## fetched from the network or some error contition terminates the task.

View File

@ -221,13 +221,13 @@ proc digestTo*(data: Blob; T: type NodeTag): T =
proc isEmpty*(lrs: NodeTagRangeSet): bool =
## Returns `true` if the argument set `lrs` of intervals is empty
lrs.total == 0 and lrs.chunks == 0
lrs.chunks == 0
proc isEmpty*(lrs: openArray[NodeTagRangeSet]): bool =
## Variant of `isEmpty()` where intervals are distributed across several
## sets.
for ivSet in lrs:
if 0 < ivSet.total or 0 < ivSet.chunks:
if 0 < ivSet.chunks:
return false
true
@ -276,24 +276,6 @@ proc fullFactor*(lrs: NodeTagRangeSet): float =
else:
1.0 # number of points in `lrs` is `2^256 + 1`
proc fullFactor*(lrs: openArray[NodeTagRangeSet]): float =
## Variant of `fullFactor()` where intervals are distributed across several
## sets. This function makes sense only if the interval sets are mutually
## disjunct.
var accu: NodeTag
for ivSet in lrs:
if 0 < ivSet.total:
if high(NodeTag) - ivSet.total < accu:
return 1.0
accu = accu + ivSet.total
elif ivSet.chunks == 0:
discard
else: # number of points in `ivSet` is `2^256 + 1`
return 1.0
if accu == 0.to(NodeTag):
return 0.0
accu.u256.to(float) / (2.0^256)
# ------------------------------------------------------------------------------
# Public functions: printing & pretty printing
# ------------------------------------------------------------------------------

View File

@ -15,10 +15,12 @@ import
eth/[common, p2p],
stew/[interval_set, keyed_queue],
../../db/select_backend,
".."/[misc/best_pivot, protocol, sync_desc],
../../utils/prettify,
../misc/best_pivot,
".."/[protocol, sync_desc],
./worker/[pivot_helper, ticker],
./worker/com/com_error,
./worker/db/[snapdb_check, snapdb_desc],
./worker/db/[hexary_desc, snapdb_check, snapdb_desc, snapdb_pivot],
"."/[constants, range_desc, worker_desc]
{.push raises: [Defect].}
@ -54,6 +56,54 @@ proc `pivot=`(buddy: SnapBuddyRef; val: BestPivotWorkerRef) =
# Private functions
# ------------------------------------------------------------------------------
proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} =
let recov = ctx.data.recovery
if recov.isNil:
return false
let
checkpoint =
"#" & $recov.state.header.blockNumber & "(" & $recov.level & ")"
topLevel = recov.level == 0
env = block:
let rc = ctx.data.pivotTable.eq recov.state.header.stateRoot
if rc.isErr:
error "Recovery pivot context gone", checkpoint, topLevel
return false
rc.value
# Cosmetics: allows other processes to log etc.
await sleepAsync(1300.milliseconds)
when extraTraceMessages:
trace "Recovery continued ...", checkpoint, topLevel,
nAccounts=recov.state.nAccounts, nDangling=recov.state.dangling.len
# Update pivot data from recovery checkpoint
env.recoverPivotFromCheckpoint(ctx, topLevel)
# Fetch next recovery record if there is any
if recov.state.predecessor.isZero:
trace "Recovery done", checkpoint, topLevel
return false
let rc = ctx.data.snapDb.recoverPivot(recov.state.predecessor)
if rc.isErr:
when extraTraceMessages:
trace "Recovery stopped at pivot stale checkpoint", checkpoint, topLevel
return false
# Set up next level pivot checkpoint
ctx.data.recovery = SnapRecoveryRef(
state: rc.value,
level: recov.level + 1)
# Push onto pivot table and continue recovery (i.e. do not stop it yet)
ctx.data.pivotTable.update(
ctx.data.recovery.state.header, ctx, reverse=true)
return true # continue recovery
proc updateSinglePivot(buddy: SnapBuddyRef): Future[bool] {.async.} =
## Helper, negotiate pivot unless present
if buddy.pivot.pivotHeader.isOk:
@ -106,7 +156,20 @@ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
else:
trace "Ticker is disabled"
result = true
# Check for recovery mode
if not ctx.data.noRecovery:
let rc = ctx.data.snapDb.recoverPivot()
if rc.isOk:
ctx.data.recovery = SnapRecoveryRef(state: rc.value)
ctx.daemon = true
# Set up early initial pivot
ctx.data.pivotTable.update(ctx.data.recovery.state.header, ctx)
trace "Recovery started",
checkpoint=("#" & $ctx.data.pivotTable.topNumber() & "(0)")
if not ctx.data.ticker.isNil:
ctx.data.ticker.startRecovery()
true
proc release*(ctx: SnapCtxRef) =
## Global clean up
@ -147,11 +210,21 @@ proc stop*(buddy: SnapBuddyRef) =
proc runDaemon*(ctx: SnapCtxRef) {.async.} =
## Enabled while `ctx.daemon` is `true`
##
let nPivots = ctx.data.pivotTable.len
trace "I am the mighty recovery daemon ... stopped for now", nPivots
# To be populated ...
if not ctx.data.recovery.isNil:
if not await ctx.recoveryStepContinue():
# Done, stop recovery
ctx.data.recovery = nil
ctx.daemon = false
# Update logging
if not ctx.data.ticker.isNil:
ctx.data.ticker.stopRecovery()
return
# Update logging
if not ctx.data.ticker.isNil:
ctx.data.ticker.stopRecovery()
proc runSingle*(buddy: SnapBuddyRef) {.async.} =
## Enabled while
@ -159,9 +232,7 @@ proc runSingle*(buddy: SnapBuddyRef) {.async.} =
## * `buddy.ctrl.poolMode` is `false`
##
let peer = buddy.peer
# This pivot finder one harmonises assigned difficulties of at least two
# peers. There can only be one `pivot2Exec()` instance active/unfinished
# (which is wrapped into the helper function `updateSinglePivot()`.)
# Find pivot, probably relaxed mode enabled in `setup()`
if not await buddy.updateSinglePivot():
# Wait if needed, then return => repeat
if not buddy.ctrl.stopped:
@ -171,13 +242,14 @@ proc runSingle*(buddy: SnapBuddyRef) {.async.} =
buddy.ctrl.multiOk = true
proc runPool*(buddy: SnapBuddyRef, last: bool) =
proc runPool*(buddy: SnapBuddyRef, last: bool): bool =
## Enabled when `buddy.ctrl.poolMode` is `true`
##
let ctx = buddy.ctx
if ctx.poolMode:
ctx.poolMode = false
result = true
block:
let rc = ctx.data.pivotTable.lastValue
if rc.isOk:
@ -233,12 +305,6 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
# Full sync processsing based on current snapshot
# -----------------------------------------------
if env.storageDone:
if not buddy.checkAccountsTrieIsComplete(env):
error "Ooops, all accounts fetched but DvnB still incomplete", peer, pivot
if not buddy.checkStorageSlotsTrieIsComplete(env):
error "Ooops, all storages fetched but DB still incomplete", peer, pivot
trace "Snap full sync -- not implemented yet", peer, pivot
await sleepAsync(5.seconds)
return
@ -246,31 +312,30 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
# Snapshot sync processing
# ------------------------
# If this is a new pivot, the previous one can be partially cleaned up.
# There is no point in keeping some older space consuming state data any
# longer.
# If this is a new pivot, the previous one can be cleaned up. There is no
# point in keeping some older space consuming state data any longer.
ctx.data.pivotTable.beforeTopMostlyClean()
# This one is the syncing work horse
# This one is the syncing work horse which downloads the database
let syncActionContinue = await env.execSnapSyncAction(buddy)
# Save state so sync can be partially resumed at next start up
block:
let
nMissingNodes = env.fetchAccounts.missingNodes.len
nCheckNodes = env.fetchAccounts.checkNodes.len
nSickSubTries = env.fetchAccounts.sickSubTries.len
nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
rc = env.saveSnapState(ctx)
processed = env.fetchAccounts.processed.fullFactor.toPC(2)
block:
let rc = env.saveCheckpoint(ctx)
if rc.isErr:
error "Failed to save recovery checkpoint", peer, pivot,
nAccounts=env.nAccounts, nSlotLists=env.nSlotLists,
nMissingNodes, nStoQu, error=rc.error
processed, nStoQu, error=rc.error
else:
discard
when extraTraceMessages:
trace "Saved recovery checkpoint", peer, pivot,
nAccounts=env.nAccounts, nSlotLists=env.nSlotLists,
nMissingNodes, nStoQu, blobSize=rc.value
processed, nStoQu, blobSize=rc.value
if not syncActionContinue:
return

View File

@ -22,7 +22,7 @@ type
SlotsNotSrictlyIncreasing
TrieLoopAlert
TrieIsEmpty
TooManyDanglingLinks
TooManyProcessedChunks
TooManySlotAccounts
# import

View File

@ -507,13 +507,12 @@ proc getAccountsNodeKey*(
proc getAccountsNodeKey*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer; ## For log messages, only
root: Hash256; ## state root
path: Blob; ## Partial node path
): Result[NodeKey,HexaryDbError] =
## Variant of `getAccountsNodeKey()` for persistent storage.
SnapDbAccountsRef.init(
pv, root, peer).getAccountsNodeKey(path, persistent=true)
pv, root, Peer()).getAccountsNodeKey(path, persistent=true)
proc getAccountsData*(
@ -524,7 +523,6 @@ proc getAccountsData*(
## Fetch account data.
##
## Caveat: There is no unit test yet for the non-persistent version
let peer = ps.peer
var acc: Account
noRlpExceptionOops("getAccountData()"):
@ -542,13 +540,12 @@ proc getAccountsData*(
proc getAccountsData*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer; ## For log messages, only
root: Hash256; ## State root
path: NodeKey; ## Account to visit
): Result[Account,HexaryDbError] =
## Variant of `getAccountsData()` for persistent storage.
SnapDbAccountsRef.init(
pv, root, peer).getAccountsData(path, persistent=true)
pv, root, Peer()).getAccountsData(path, persistent=true)
# ------------------------------------------------------------------------------
# Public functions: additional helpers

View File

@ -48,7 +48,7 @@ proc accountsCtx(
("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" &
ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," &
"nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," &
"nMissingNodes=" & $env.fetchAccounts.missingNodes.len & "}"
"nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}"
# ------------------------------------------------------------------------------
# Private helpers
@ -76,7 +76,7 @@ proc storageSlotsCtx(
result &= "" &
"covered=" & slots.unprocessed.emptyFactor.toPC(0) &
"nCheckNodes=" & $slots.checkNodes.len & "," &
"nMissingNodes=" & $slots.missingNodes.len
"nSickSubTries=" & $slots.sickSubTries.len
result &= "}"
# ------------------------------------------------------------------------------

View File

@ -9,7 +9,6 @@
# except according to those terms.
import
#chronicles,
eth/[common, rlp],
stew/results,
../../range_desc,
@ -17,18 +16,16 @@ import
{.push raises: [Defect].}
#logScope:
# topics = "snap-db"
type
SnapDbPivotRegistry* = object
predecessor*: NodeKey ## Predecessor key in chain
predecessor*: NodeKey ## Predecessor key in chain, auto filled
header*: BlockHeader ## Pivot state, containg state root
nAccounts*: uint64 ## Imported # of accounts
nSlotLists*: uint64 ## Imported # of account storage tries
dangling*: seq[Blob] ## Dangling nodes in accounts trie
processed*: seq[
(NodeTag,NodeTag)] ## Processed acoount ranges
slotAccounts*: seq[NodeKey] ## List of accounts with storage slots
coverage*: uint8 ## coverage factor, 255 => 100%
const
extraTraceMessages = false or true
@ -58,24 +55,6 @@ proc savePivot*(
return ok(rlpData.len)
# notreached
proc savePivot*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
header: BlockHeader; ## Pivot state, containg state root
nAccounts: uint64; ## Imported # of accounts
nSlotLists: uint64; ## Imported # of account storage tries
dangling: seq[Blob]; ## Dangling nodes in accounts trie
slotAccounts: seq[NodeKey]; ## List of accounts with storage slots
coverage: uint8; ## coverage factor, 255 => 100%
): Result[int,HexaryDbError] =
## Variant of `savePivot()`
result = pv.savePivot SnapDbPivotRegistry(
header: header,
nAccounts: nAccounts,
nSlotLists: nSlotLists,
dangling: dangling,
slotAccounts: slotAccounts,
coverage: coverage)
proc recoverPivot*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
stateRoot: NodeKey; ## Check for a particular state root

View File

@ -47,7 +47,7 @@
##
## Legend:
## * `<..>`: some action, process, etc.
## * `{missing-nodes}`: list implemented as `env.fetchAccounts.missingNodes`
## * `{missing-nodes}`: list implemented as `env.fetchAccounts.sickSubTries`
## * `(state-root}`: implicit argument for `getAccountNodeKey()` when
## the argument list is empty
## * `{leaf-nodes}`: list is optimised out
@ -144,7 +144,7 @@ proc healingCtx(
("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" &
ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," &
"nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," &
"nMissingNodes=" & $env.fetchAccounts.missingNodes.len & "}"
"nSickSubTries=" & $env.fetchAccounts.sickSubTries.len & "}"
# ------------------------------------------------------------------------------
# Private functions
@ -154,7 +154,7 @@ proc verifyStillMissingNodes(
buddy: SnapBuddyRef;
env: SnapPivotRef;
) =
## Check whether previously missing nodes from the `missingNodes` list
## Check whether previously missing nodes from the `sickSubTries` list
## have been magically added to the database since it was checked last
## time. These nodes will me moved to `checkNodes` for further processing.
let
@ -164,7 +164,7 @@ proc verifyStillMissingNodes(
stateRoot = env.stateHeader.stateRoot
var delayed: seq[NodeSpecs]
for w in env.fetchAccounts.missingNodes:
for w in env.fetchAccounts.sickSubTries:
if ctx.data.snapDb.nodeExists(peer, stateRoot, w):
# Check nodes for dangling links below
env.fetchAccounts.checkNodes.add w.partialPath
@ -173,7 +173,7 @@ proc verifyStillMissingNodes(
delayed.add w
# Must not modify sequence while looping over it
env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & delayed
env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & delayed
proc updateMissingNodesList(
@ -190,13 +190,13 @@ proc updateMissingNodesList(
peer = buddy.peer
stateRoot = env.stateHeader.stateRoot
while env.fetchAccounts.missingNodes.len < snapTrieNodesFetchMax:
while env.fetchAccounts.sickSubTries.len < snapRequestTrieNodesFetchMax:
# Inspect hexary trie for dangling nodes
let rc = db.inspectAccountsTrie(
peer, stateRoot,
env.fetchAccounts.checkNodes, # start with these nodes
env.fetchAccounts.resumeCtx, # resume previous attempt
healAccountsInspectionBatch) # visit no more than this many nodes
healInspectionBatch) # visit no more than this many nodes
if rc.isErr:
when extraTraceMessages:
error logTxt "failed => stop", peer,
@ -210,15 +210,15 @@ proc updateMissingNodesList(
env.fetchAccounts.checkNodes.setLen(0)
# Collect result
env.fetchAccounts.missingNodes =
env.fetchAccounts.missingNodes & rc.value.dangling
env.fetchAccounts.sickSubTries =
env.fetchAccounts.sickSubTries & rc.value.dangling
# Done unless there is some resumption context
if rc.value.resumeCtx.isNil:
break
# Allow async task switch and continue. Note that some other task might
# steal some of the `env.fetchAccounts.missingNodes`.
# steal some of the `env.fetchAccounts.sickSubTries`.
await sleepAsync 1.nanoseconds
return true
@ -229,7 +229,7 @@ proc getMissingNodesFromNetwork(
env: SnapPivotRef;
): Future[seq[NodeSpecs]]
{.async.} =
## Extract from `missingNodes` the next batch of nodes that need
## Extract from `sickSubTries` the next batch of nodes that need
## to be merged it into the database
let
ctx = buddy.ctx
@ -237,13 +237,13 @@ proc getMissingNodesFromNetwork(
stateRoot = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
nMissingNodes = env.fetchAccounts.missingNodes.len
inxLeft = max(0, nMissingNodes - snapTrieNodesFetchMax)
nSickSubTries = env.fetchAccounts.sickSubTries.len
inxLeft = max(0, nSickSubTries - snapRequestTrieNodesFetchMax)
# There is no point in processing too many nodes at the same time. So leave
# the rest on the `missingNodes` queue to be handled later.
let fetchNodes = env.fetchAccounts.missingNodes[inxLeft ..< nMissingNodes]
env.fetchAccounts.missingNodes.setLen(inxLeft)
# the rest on the `sickSubTries` queue to be handled later.
let fetchNodes = env.fetchAccounts.sickSubTries[inxLeft ..< nSickSubTries]
env.fetchAccounts.sickSubTries.setLen(inxLeft)
# Initalise for `getTrieNodes()` for fetching nodes from the network
var
@ -253,8 +253,9 @@ proc getMissingNodesFromNetwork(
pathList.add @[w.partialPath]
nodeKey[w.partialPath] = w.nodeKey
# Fetch nodes from the network. Note that the remainder of the `missingNodes`
# list might be used by another process that runs semi-parallel.
# Fetch nodes from the network. Note that the remainder of the
# `sickSubTries` list might be used by another process that runs
# semi-parallel.
let rc = await buddy.getTrieNodes(stateRoot, pathList, pivot)
if rc.isOk:
# Reset error counts for detecting repeated timeouts, network errors, etc.
@ -262,7 +263,7 @@ proc getMissingNodesFromNetwork(
# Register unfetched missing nodes for the next pass
for w in rc.value.leftOver:
env.fetchAccounts.missingNodes.add NodeSpecs(
env.fetchAccounts.sickSubTries.add NodeSpecs(
partialPath: w[0],
nodeKey: nodeKey[w[0]])
return rc.value.nodes.mapIt(NodeSpecs(
@ -271,8 +272,8 @@ proc getMissingNodesFromNetwork(
data: it.data))
# Restore missing nodes list now so that a task switch in the error checker
# allows other processes to access the full `missingNodes` list.
env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & fetchNodes
# allows other processes to access the full `sickSubTries` list.
env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & fetchNodes
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
@ -364,14 +365,14 @@ proc accountsHealingImpl(
buddy.verifyStillMissingNodes(env)
# If `checkNodes` is empty, healing is at the very start or was
# postponed in which case `missingNodes` is non-empty.
# postponed in which case `sickSubTries` is non-empty.
if env.fetchAccounts.checkNodes.len != 0 or
env.fetchAccounts.missingNodes.len == 0:
env.fetchAccounts.sickSubTries.len == 0:
if not await buddy.updateMissingNodesList(env):
return 0
# Check whether the trie is complete.
if env.fetchAccounts.missingNodes.len == 0:
if env.fetchAccounts.sickSubTries.len == 0:
trace logTxt "complete", peer, ctx=buddy.healingCtx(env)
return 0 # nothing to do
@ -386,7 +387,7 @@ proc accountsHealingImpl(
# Storage error, just run the next lap (not much else that can be done)
error logTxt "error updating persistent database", peer,
ctx=buddy.healingCtx(env), nNodes=nodeSpecs.len, error=report[^1].error
env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & nodeSpecs
env.fetchAccounts.sickSubTries = env.fetchAccounts.sickSubTries & nodeSpecs
return -1
# Filter out error and leaf nodes
@ -399,7 +400,7 @@ proc accountsHealingImpl(
if w.error != NothingSerious or w.kind.isNone:
# error, try downloading again
env.fetchAccounts.missingNodes.add nodeSpecs[inx]
env.fetchAccounts.sickSubTries.add nodeSpecs[inx]
elif w.kind.unsafeGet != Leaf:
# re-check this node
@ -434,21 +435,6 @@ proc healAccounts*(
ctx = buddy.ctx
peer = buddy.peer
# Only start healing if there is some completion level, already.
#
# We check against the global coverage factor, i.e. a measure for how
# much of the total of all accounts have been processed. Even if the trie
# database for the current pivot state root is sparsely filled, there
# is a good chance that it can inherit some unchanged sub-trie from an
# earlier pivor state root download. The healing process then works like
# sort of glue.
#
if env.nAccounts == 0 or
ctx.data.coveredAccounts.fullFactor < healAccountsTrigger:
#when extraTraceMessages:
# trace logTxt "postponed", peer, ctx=buddy.healingCtx(env)
return
when extraTraceMessages:
trace logTxt "started", peer, ctx=buddy.healingCtx(env)

View File

@ -62,7 +62,7 @@ proc healingCtx(
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"covered=" & slots.unprocessed.emptyFactor.toPC(0) & "," &
"nCheckNodes=" & $slots.checkNodes.len & "," &
"nMissingNodes=" & $slots.missingNodes.len & "}"
"nSickSubTries=" & $slots.sickSubTries.len & "}"
# ------------------------------------------------------------------------------
# Private functions
@ -98,7 +98,7 @@ proc verifyStillMissingNodes(
kvp: SnapSlotsQueuePair;
env: SnapPivotRef;
) =
## Check whether previously missing nodes from the `missingNodes` list
## Check whether previously missing nodes from the `sickSubTries` list
## have been magically added to the database since it was checked last
## time. These nodes will me moved to `checkNodes` for further processing.
let
@ -110,7 +110,7 @@ proc verifyStillMissingNodes(
slots = kvp.data.slots
var delayed: seq[NodeSpecs]
for w in slots.missingNodes:
for w in slots.sickSubTries:
let rc = db.getStorageSlotsNodeKey(peer, accKey, storageRoot, w.partialPath)
if rc.isOk:
# Check nodes for dangling links
@ -120,7 +120,7 @@ proc verifyStillMissingNodes(
delayed.add w
# Must not modify sequence while looping over it
slots.missingNodes = slots.missingNodes & delayed
slots.sickSubTries = slots.sickSubTries & delayed
proc updateMissingNodesList(
@ -140,7 +140,7 @@ proc updateMissingNodesList(
storageRoot = kvp.key
slots = kvp.data.slots
while slots.missingNodes.len < snapTrieNodesFetchMax:
while slots.sickSubTries.len < snapRequestTrieNodesFetchMax:
# Inspect hexary trie for dangling nodes
let rc = db.inspectStorageSlotsTrie(
peer, accKey, storageRoot,
@ -161,15 +161,14 @@ proc updateMissingNodesList(
slots.checkNodes.setLen(0)
# Collect result
slots.missingNodes =
slots.missingNodes & rc.value.dangling
slots.sickSubTries = slots.sickSubTries & rc.value.dangling
# Done unless there is some resumption context
if rc.value.resumeCtx.isNil:
break
# Allow async task switch and continue. Note that some other task might
# steal some of the `env.fetchAccounts.missingNodes`.
# steal some of the `env.fetchAccounts.sickSubTries`.
await sleepAsync 1.nanoseconds
return true
@ -181,7 +180,7 @@ proc getMissingNodesFromNetwork(
env: SnapPivotRef;
): Future[seq[NodeSpecs]]
{.async.} =
## Extract from `missingNodes` the next batch of nodes that need
## Extract from `sickSubTries` the next batch of nodes that need
## to be merged it into the database
let
ctx = buddy.ctx
@ -191,13 +190,13 @@ proc getMissingNodesFromNetwork(
pivot = "#" & $env.stateHeader.blockNumber # for logging
slots = kvp.data.slots
nMissingNodes = slots.missingNodes.len
inxLeft = max(0, nMissingNodes - snapTrieNodesFetchMax)
nSickSubTries = slots.sickSubTries.len
inxLeft = max(0, nSickSubTries - snapRequestTrieNodesFetchMax)
# There is no point in processing too many nodes at the same time. So leave
# the rest on the `missingNodes` queue to be handled later.
let fetchNodes = slots.missingNodes[inxLeft ..< nMissingNodes]
slots.missingNodes.setLen(inxLeft)
# the rest on the `sickSubTries` queue to be handled later.
let fetchNodes = slots.sickSubTries[inxLeft ..< nSickSubTries]
slots.sickSubTries.setLen(inxLeft)
# Initalise for `getTrieNodes()` for fetching nodes from the network
var
@ -207,7 +206,7 @@ proc getMissingNodesFromNetwork(
pathList.add @[w.partialPath]
nodeKey[w.partialPath] = w.nodeKey
# Fetch nodes from the network. Note that the remainder of the `missingNodes`
# Fetch nodes from the network. Note that the remainder of the `sickSubTries`
# list might be used by another process that runs semi-parallel.
let
req = @[accKey.to(Blob)] & fetchNodes.mapIt(it.partialPath)
@ -219,7 +218,7 @@ proc getMissingNodesFromNetwork(
# Register unfetched missing nodes for the next pass
for w in rc.value.leftOver:
for n in 1 ..< w.len:
slots.missingNodes.add NodeSpecs(
slots.sickSubTries.add NodeSpecs(
partialPath: w[n],
nodeKey: nodeKey[w[n]])
return rc.value.nodes.mapIt(NodeSpecs(
@ -228,8 +227,8 @@ proc getMissingNodesFromNetwork(
data: it.data))
# Restore missing nodes list now so that a task switch in the error checker
# allows other processes to access the full `missingNodes` list.
slots.missingNodes = slots.missingNodes & fetchNodes
# allows other processes to access the full `sickSubTries` list.
slots.sickSubTries = slots.sickSubTries & fetchNodes
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
@ -384,7 +383,7 @@ proc storageSlotsHealing(
return false
# Check whether the trie is complete.
if slots.missingNodes.len == 0:
if slots.sickSubTries.len == 0:
trace logTxt "complete", peer, itCtx=buddy.healingCtx(kvp,env)
return true
@ -401,7 +400,7 @@ proc storageSlotsHealing(
error logTxt "error updating persistent database", peer,
itCtx=buddy.healingCtx(kvp,env), nSlotLists=env.nSlotLists,
nStorageQueue, nNodes=nodeSpecs.len, error=report[^1].error
slots.missingNodes = slots.missingNodes & nodeSpecs
slots.sickSubTries = slots.sickSubTries & nodeSpecs
return false
when extraTraceMessages:
@ -421,7 +420,7 @@ proc storageSlotsHealing(
if w.error != NothingSerious or w.kind.isNone:
# error, try downloading again
slots.missingNodes.add nodeSpecs[inx]
slots.sickSubTries.add nodeSpecs[inx]
elif w.kind.unsafeGet != Leaf:
# re-check this node
@ -478,7 +477,7 @@ proc healingIsComplete(
return true # done
# Full range covered by unprocessed items
kvp.data.slots = SnapTrieRangeBatchRef(missingNodes: rc.value.dangling)
kvp.data.slots = SnapRangeBatchRef(sickSubTries: rc.value.dangling)
kvp.data.slots.unprocessed.init()
# Proceed with healing

View File

@ -10,12 +10,13 @@
import
std/[math, sequtils],
bearssl/rand,
chronos,
eth/[common, p2p],
eth/[common, trie/trie_defs],
stew/[interval_set, keyed_queue],
../../sync_desc,
".."/[constants, range_desc, worker_desc],
./db/[hexary_error, snapdb_pivot],
./db/[hexary_error, snapdb_accounts, snapdb_pivot],
"."/[heal_accounts, heal_storage_slots,
range_fetch_accounts, range_fetch_storage_slots, ticker]
@ -29,10 +30,11 @@ const
# Private helpers
# ------------------------------------------------------------------------------
proc init(batch: var SnapTrieRangeBatch; ctx: SnapCtxRef) =
proc init(batch: SnapRangeBatchRef; ctx: SnapCtxRef) =
## Returns a pair of account hash range lists with the full range of hashes
## smartly spread across the mutually disjunct interval sets.
batch.unprocessed.init()
batch.processed = NodeTagRangeSet.init()
# Initialise accounts range fetch batch, the pair of `fetchAccounts[]`
# range sets.
@ -59,13 +61,7 @@ proc init(batch: var SnapTrieRangeBatch; ctx: SnapCtxRef) =
discard batch.unprocessed[1].merge iv
when extraAsserts:
if batch.unprocessed[0].isEmpty:
doAssert batch.unprocessed[1].isFull
elif batch.unprocessed[1].isEmpty:
doAssert batch.unprocessed[0].isFull
else:
doAssert((batch.unprocessed[0].total - 1) +
batch.unprocessed[1].total == high(UInt256))
doAssert batch.unprocessed.verify
# ------------------------------------------------------------------------------
# Public functions: pivot table related
@ -81,12 +77,12 @@ proc beforeTopMostlyClean*(pivotTable: var SnapPivotTable) =
env.fetchStorageFull.clear()
env.fetchStoragePart.clear()
env.fetchAccounts.checkNodes.setLen(0)
env.fetchAccounts.missingNodes.setLen(0)
env.fetchAccounts.sickSubTries.setLen(0)
env.obsolete = true
proc topNumber*(pivotTable: var SnapPivotTable): BlockNumber =
## Return the block number op the top pivot entry, or zero if there is none.
## Return the block number of the top pivot entry, or zero if there is none.
let rc = pivotTable.lastValue
if rc.isOk:
return rc.value.stateHeader.blockNumber
@ -96,6 +92,7 @@ proc update*(
pivotTable: var SnapPivotTable; ## Pivot table
header: BlockHeader; ## Header to generate new pivot from
ctx: SnapCtxRef; ## Some global context
reverse = false; ## Update from bottom (e.g. for recovery)
) =
## Activate environment for state root implied by `header` argument. This
## function appends a new environment unless there was any not far enough
@ -106,13 +103,27 @@ proc update*(
##
# Check whether the new header follows minimum depth requirement. This is
# where the queue is assumed to have increasing block numbers.
if pivotTable.topNumber() + pivotBlockDistanceMin < header.blockNumber:
if reverse or
pivotTable.topNumber() + pivotBlockDistanceMin < header.blockNumber:
# Ok, append a new environment
let env = SnapPivotRef(stateHeader: header)
let env = SnapPivotRef(
stateHeader: header,
fetchAccounts: SnapRangeBatchRef())
env.fetchAccounts.init(ctx)
var topEnv = env
# Append per-state root environment to LRU queue
if reverse:
discard pivotTable.prepend(header.stateRoot, env)
# Make sure that the LRU table does not grow too big.
if max(3, ctx.buddiesMax) < pivotTable.len:
# Delete second entry rather thanthe first which might currently
# be needed.
let rc = pivotTable.secondKey
if rc.isOk:
pivotTable.del rc.value
else:
discard pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax)
@ -142,7 +153,7 @@ proc tickerStats*(
aSqSum += aLen * aLen
# Fill utilisation mean & variance
let fill = kvp.data.fetchAccounts.unprocessed.emptyFactor
let fill = kvp.data.fetchAccounts.processed.fullFactor
uSum += fill
uSqSum += fill * fill
@ -161,9 +172,9 @@ proc tickerStats*(
if not env.isNil:
pivotBlock = some(env.stateHeader.blockNumber)
stoQuLen = some(env.fetchStorageFull.len + env.fetchStoragePart.len)
accStats = (env.fetchAccounts.unprocessed[0].chunks +
env.fetchAccounts.unprocessed[1].chunks,
env.fetchAccounts.missingNodes.len)
accStats = (env.fetchAccounts.processed.chunks,
env.fetchAccounts.checkNodes.len +
env.fetchAccounts.sickSubTries.len)
TickerStats(
pivotBlock: pivotBlock,
@ -205,6 +216,20 @@ proc execSnapSyncAction*(
if buddy.ctrl.stopped or env.obsolete:
return false
if not ctx.data.accountsHealing:
# Only start healing if there is some completion level, already.
#
# We check against the global coverage factor, i.e. a measure for how
# much of the total of all accounts have been processed. Even if the
# hexary trie database for the current pivot state root is sparsely
# filled, there is a good chance that it can inherit some unchanged
# sub-trie from an earlier pivor state root download. The healing
# process then works like sort of glue.
if 0 < env.nAccounts:
if healAccountsTrigger <= ctx.data.coveredAccounts.fullFactor:
ctx.data.accountsHealing = true
if ctx.data.accountsHealing:
# Can only run a single accounts healer instance at a time. This
# instance will clear the batch queue so there is nothing to do for
# another process.
@ -228,31 +253,74 @@ proc execSnapSyncAction*(
return true
proc saveSnapState*(
proc saveCheckpoint*(
env: SnapPivotRef; ## Current pivot environment
ctx: SnapCtxRef; ## Some global context
): Result[int,HexaryDbError] =
## Save current sync admin data. On success, the size of the data record
## saved is returned (e.g. for logging.)
if snapAccountsSaveDanglingMax < env.fetchAccounts.missingNodes.len:
return err(TooManyDanglingLinks)
##
let
fa = env.fetchAccounts
nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
if snapAccountsSaveProcessedChunksMax < fa.processed.chunks:
return err(TooManyProcessedChunks)
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
if snapAccountsSaveStorageSlotsMax < nStoQu:
return err(TooManySlotAccounts)
let
rc = ctx.data.snapDb.savePivot(
env.stateHeader, env.nAccounts, env.nSlotLists,
dangling = env.fetchAccounts.missingNodes.mapIt(it.partialPath),
slotAccounts = toSeq(env.fetchStorageFull.nextKeys).mapIt(it.to(NodeKey)) &
toSeq(env.fetchStoragePart.nextKeys).mapIt(it.to(NodeKey)),
coverage = (ctx.data.coveredAccounts.fullFactor * 255).uint8)
ctx.data.snapDb.savePivot SnapDbPivotRegistry(
header: env.stateHeader,
nAccounts: env.nAccounts,
nSlotLists: env.nSlotLists,
processed: toSeq(env.fetchAccounts.processed.increasing)
.mapIt((it.minPt,it.maxPt)),
slotAccounts: (toSeq(env.fetchStorageFull.nextKeys) &
toSeq(env.fetchStoragePart.nextKeys)).mapIt(it.to(NodeKey)))
proc recoverPivotFromCheckpoint*(
env: SnapPivotRef; ## Current pivot environment
ctx: SnapCtxRef; ## Global context (containing save state)
topLevel: bool; ## Full data set on top level only
) =
## Recover some pivot variables and global list `coveredAccounts` from
## checkpoint data. If the argument `toplevel` is set `true`, also the
## `processed`, `unprocessed`, and the `fetchStorageFull` lists are
## initialised.
##
let recov = ctx.data.recovery
if recov.isNil:
return
env.nAccounts = recov.state.nAccounts
env.nSlotLists = recov.state.nSlotLists
# Import processed interval
for (minPt,maxPt) in recov.state.processed:
if topLevel:
discard env.fetchAccounts.processed.merge(minPt, maxPt)
env.fetchAccounts.unprocessed.reduce(minPt, maxPt)
discard ctx.data.coveredAccounts.merge(minPt, maxPt)
# Handle storage slots
if topLevel:
let stateRoot = recov.state.header.stateRoot
for w in recov.state.slotAccounts:
let pt = NodeTagRange.new(w.to(NodeTag),w.to(NodeTag))
if 0 < env.fetchAccounts.processed.covered(pt):
# Ignoring slots that have accounts to be downloaded, anyway
let rc = ctx.data.snapDb.getAccountsData(stateRoot, w)
if rc.isErr:
return err(rc.error)
ok(rc.value)
# Oops, how did that account get lost?
discard env.fetchAccounts.processed.reduce pt
env.fetchAccounts.unprocessed.merge pt
elif rc.value.storageRoot != emptyRlpHash:
env.fetchStorageFull.merge AccountSlotsHeader(
accKey: w,
storageRoot: rc.value.storageRoot)
# ------------------------------------------------------------------------------
# End

View File

@ -161,23 +161,31 @@ proc accountsRangefetchImpl(
# Statistics
env.nAccounts.inc(gotAccounts)
# Punch holes into the reproted range from the network if it contains holes.
# Punch holes into the reported range of received accounts from the network
# if it there are gaps (described by dangling nodes.)
for w in gaps.innerGaps:
discard processed.reduce(
w.partialPath.min(NodeKey).to(NodeTag),
w.partialPath.max(NodeKey).to(Nodetag))
# Update dangling nodes list
# Update dangling nodes list unless healing is activated. The problem
# with healing activated is, that a previously missing node that suddenly
# appears will not automatically translate into a full sub-trie. It might
# just be the node itself (which justified the name `sickSubTrie`).
#
# Instead of managing partial sub-tries here, this is delegated to the
# healing module.
if not ctx.data.accountsHealing:
var delayed: seq[NodeSpecs]
for w in env.fetchAccounts.missingNodes:
for w in env.fetchAccounts.sickSubTries:
if not ctx.data.snapDb.nodeExists(peer, stateRoot, w):
delayed.add w
when extraTraceMessages:
trace logTxt "dangling nodes", peer, pivot,
nCheckNodes=env.fetchAccounts.checkNodes.len,
nMissingNodes=env.fetchAccounts.missingNodes.len,
nSickSubTries=env.fetchAccounts.sickSubTries.len,
nUpdatedMissing=delayed.len, nOutsideGaps=gaps.dangling.len
env.fetchAccounts.missingNodes = delayed & gaps.dangling
env.fetchAccounts.sickSubTries = delayed & gaps.dangling
# Update book keeping
for w in processed.increasing:
@ -195,7 +203,7 @@ proc accountsRangefetchImpl(
# unprocessed = buddy.dumpUnprocessed(env)
# trace logTxt "request done", peer, pivot,
# nCheckNodes=env.fetchAccounts.checkNodes.len,
# nMissingNodes=env.fetchAccounts.missingNodes.len,
# nSickSubTries=env.fetchAccounts.sickSubTries.len,
# imported, unprocessed
return true

View File

@ -39,6 +39,7 @@ type
TickerRef* = ref object
## Account fetching state that is shared among all peers.
nBuddies: int
recovery: bool
lastStats: TickerStats
statsCb: TickerStatsUpdater
logTicker: TimerCallback
@ -145,6 +146,10 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
if data.nStorageQueue.isSome:
nStoQue = $data.nStorageQueue.unsafeGet
if t.recovery:
info "Snap sync statistics (recovery)",
up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem
else:
info "Snap sync statistics",
up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem
@ -183,15 +188,29 @@ proc startBuddy*(t: TickerRef) =
## Increment buddies counter and start ticker unless running.
if t.nBuddies <= 0:
t.nBuddies = 1
if not t.recovery:
t.start()
else:
t.nBuddies.inc
proc startRecovery*(t: TickerRef) =
## Ditto for recovery mode
if not t.recovery:
t.recovery = true
if t.nBuddies <= 0:
t.start()
proc stopBuddy*(t: TickerRef) =
## Decrement buddies counter and stop ticker if there are no more registered
## buddies.
t.nBuddies.dec
if t.nBuddies <= 0:
if t.nBuddies <= 0 and not t.recovery:
t.stop()
proc stopRecovery*(t: TickerRef) =
## Ditto for recovery mode
if t.recovery:
t.recovery = false
t.stop()
# ------------------------------------------------------------------------------

View File

@ -15,7 +15,7 @@ import
../../db/select_backend,
../sync_desc,
./worker/com/com_error,
./worker/db/[hexary_desc, snapdb_desc],
./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot],
./worker/ticker,
./range_desc
@ -37,25 +37,24 @@ type
## where the optional `subRange` interval has been replaced by an interval
## range + healing support.
accKey*: NodeKey ## Owner account
slots*: SnapTrieRangeBatchRef ## slots to fetch, nil => all slots
slots*: SnapRangeBatchRef ## slots to fetch, nil => all slots
inherit*: bool ## mark this trie seen already
SnapTodoRanges* = array[2,NodeTagRangeSet]
## Pair of node range lists. The first entry must be processed first. This
## allows to coordinate peers working on different state roots to avoid
## ovelapping accounts as long as they fetch from the first entry.
## Pair of sets of ``unprocessed`` node ranges that need to be fetched and
## integrated. The ranges in the first set must be handled with priority.
##
## This data structure is used for coordinating peers that run quasi
## parallel.
SnapTrieRangeBatch* = object
SnapRangeBatchRef* = ref object
## `NodeTag` ranges to fetch, healing support
unprocessed*: SnapTodoRanges ## Range of slots not covered, yet
unprocessed*: SnapTodoRanges ## Range of slots to be fetched
processed*: NodeTagRangeSet ## Nodes definitely processed
checkNodes*: seq[Blob] ## Nodes with prob. dangling child links
missingNodes*: seq[NodeSpecs] ## Dangling links to fetch and merge
sickSubTries*: seq[NodeSpecs] ## Top ref for sub-tries to be healed
resumeCtx*: TrieNodeStatCtxRef ## State for resuming trie inpection
SnapTrieRangeBatchRef* = ref SnapTrieRangeBatch
## Referenced object, so it can be made optional for the storage
## batch list
SnapHealingState* = enum
## State of healing process. The `HealerRunning` state indicates that
## dangling and/or missing nodes have been temprarily removed from the
@ -69,8 +68,9 @@ type
stateHeader*: BlockHeader ## Pivot state, containg state root
# Accounts download
fetchAccounts*: SnapTrieRangeBatch ## Set of accounts ranges to fetch
fetchAccounts*: SnapRangeBatchRef ## Set of accounts ranges to fetch
accountsState*: SnapHealingState ## All accounts have been processed
healThresh*: float ## Start healing when fill factor reached
# Storage slots download
fetchStorageFull*: SnapSlotsQueue ## Fetch storage trie for these accounts
@ -86,6 +86,11 @@ type
## LRU table, indexed by state root
KeyedQueue[Hash256,SnapPivotRef]
SnapRecoveryRef* = ref object
## Recovery context
state*: SnapDbPivotRegistry ## Saved recovery context state
level*: int ## top level is zero
BuddyData* = object
## Per-worker local descriptor data extension
errors*: ComErrorStatsRef ## For error handling
@ -102,6 +107,9 @@ type
pivotTable*: SnapPivotTable ## Per state root environment
pivotFinderCtx*: RootRef ## Opaque object reference for sub-module
coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts
accountsHealing*: bool ## Activates accounts DB healing
recovery*: SnapRecoveryRef ## Current recovery checkpoint/context
noRecovery*: bool ## Ignore recovery checkpoints
# Info
ticker*: TickerRef ## Ticker, logger
@ -142,7 +150,7 @@ proc init*(q: var SnapTodoRanges) =
proc merge*(q: var SnapTodoRanges; iv: NodeTagRange) =
## Unconditionally merge the node range into the account ranges list
## Unconditionally merge the node range into the account ranges list.
discard q[0].merge(iv)
discard q[1].reduce(iv)
@ -187,6 +195,21 @@ proc fetch*(q: var SnapTodoRanges; maxLen: UInt256): Result[NodeTagRange,void] =
discard q[0].reduce(iv)
ok(iv)
proc verify*(q: var SnapTodoRanges): bool =
## Verify consistency, i.e. that the two sets of ranges have no overlap.
if q[0].chunks == 0 or q[1].chunks == 0:
# At least on set is empty
return true
if q[0].total == 0 or q[1].total == 0:
# At least one set is maximal and the other non-empty
return false
let (a,b) = if q[0].chunks < q[1].chunks: (0,1) else: (1,0)
for iv in q[a].increasing:
if 0 < q[b].covered(iv):
return false
true
# ------------------------------------------------------------------------------
# Public helpers: SlotsQueue
# ------------------------------------------------------------------------------
@ -196,7 +219,10 @@ proc merge*(q: var SnapSlotsQueue; kvp: SnapSlotsQueuePair) =
let
reqKey = kvp.key
rc = q.eq(reqKey)
if rc.isOk:
if rc.isErr:
# Append to list
discard q.append(reqKey, kvp.data)
else:
# Entry exists already
let qData = rc.value
if not qData.slots.isNil:
@ -204,23 +230,17 @@ proc merge*(q: var SnapSlotsQueue; kvp: SnapSlotsQueuePair) =
if kvp.data.slots.isNil:
# Remove restriction for this entry and move it to the right end
qData.slots = nil
discard q.lruFetch(reqKey)
discard q.lruFetch reqKey
else:
# Merge argument intervals into target set
for ivSet in kvp.data.slots.unprocessed:
for iv in ivSet.increasing:
qData.slots.unprocessed.reduce iv
else:
# Only add non-existing entries
if kvp.data.slots.isNil:
# Append full range to the right of the list
discard q.append(reqKey, kvp.data)
else:
# Partial range, add healing support and interval
discard q.unshift(reqKey, kvp.data)
proc merge*(q: var SnapSlotsQueue; fetchReq: AccountSlotsHeader) =
## Append/prepend a slot header record into the batch queue.
## Append/prepend a slot header record into the batch queue. If there is
## a range merger, the argument range will be sortred in a way so that it
## is processed separately with highest priority.
let
reqKey = fetchReq.storageRoot
rc = q.eq(reqKey)
@ -232,26 +252,31 @@ proc merge*(q: var SnapSlotsQueue; fetchReq: AccountSlotsHeader) =
if fetchReq.subRange.isNone:
# Remove restriction for this entry and move it to the right end
qData.slots = nil
discard q.lruFetch(reqKey)
discard q.lruFetch reqKey
else:
# Merge argument interval into target set
let iv = fetchReq.subRange.unsafeGet
discard qData.slots.unprocessed[0].reduce(iv)
discard qData.slots.unprocessed[1].merge(iv)
else:
let reqData = SnapSlotsQueueItemRef(accKey: fetchReq.accKey)
# Merge argument interval into target separated from the already
# existing sets (note that this works only for the last set)
for iv in qData.slots.unprocessed[0].increasing:
# Move all to second set
discard qData.slots.unprocessed[1].merge iv
# Clear first set and add argument range
qData.slots.unprocessed[0].clear()
qData.slots.unprocessed.merge fetchReq.subRange.unsafeGet
elif fetchReq.subRange.isNone:
# Append full range to the list
discard q.append(reqKey, SnapSlotsQueueItemRef(
accKey: fetchReq.accKey))
# Only add non-existing entries
if fetchReq.subRange.isNone:
# Append full range to the right of the list
discard q.append(reqKey, reqData)
else:
# Partial range, add healing support and interval
reqData.slots = SnapTrieRangeBatchRef()
for n in 0 ..< reqData.slots.unprocessed.len:
reqData.slots.unprocessed[n] = NodeTagRangeSet.init()
discard reqData.slots.unprocessed[0].merge(fetchReq.subRange.unsafeGet)
discard q.unshift(reqKey, reqData)
var unprocessed = [NodeTagRangeSet.init(), NodeTagRangeSet.init()]
discard unprocessed[0].merge(fetchReq.subRange.unsafeGet)
discard q.append(reqKey, SnapSlotsQueueItemRef(
accKey: fetchReq.accKey,
slots: SnapRangeBatchRef(
unprocessed: unprocessed,
processed: NodeTagRangeSet.init())))
proc merge*(
q: var SnapSlotsQueue;

View File

@ -17,15 +17,18 @@
## Global set up. This function will be called before any worker peer is
## started. If that function returns `false`, no worker peers will be run.
##
## Also, this function should decide whether the `runDaemon()` job will be
## started next by controlling the `ctx.daemon` flag (default is `false`.)
##
## *runRelease(ctx: CtxRef[S])*
## Global clean up, done with all the worker peers.
##
## *runDaemon(ctx: CtxRef[S]) {.async.}*
## Global background job that will be re-started as long as the variable
## `ctx.daemon` is set `true`. If that job was stopped due to setting
## `ctx.daemon` to `false`, it will be restarted when reset to `true` when
## there is some activity on the `runPool()`, `runSingle()`, or `runMulti()`
## functions.
## `ctx.daemon` is set `true`. If that job was stopped due to re-setting
## `ctx.daemon` to `false`, it will be restarted next after it was reset
## as `true` not before there is some activity on the `runPool()`,
## `runSingle()`, or `runMulti()` functions.
##
##
## *runStart(buddy: BuddyRef[S,W]): bool*
@ -35,10 +38,11 @@
## Clean up this worker peer.
##
##
## *runPool(buddy: BuddyRef[S,W], last: bool)*
## *runPool(buddy: BuddyRef[S,W], last: bool): bool*
## Once started, the function `runPool()` is called for all worker peers in
## sequence as the body of an iteration. There will be no other worker peer
## functions activated simultaneously.
## sequence as the body of an iteration as long as the function returns
## `false`. There will be no other worker peer functions activated
## simultaneously.
##
## This procedure is started if the global flag `buddy.ctx.poolMode` is set
## `true` (default is `false`.) It is the responsibility of the `runPool()`
@ -48,7 +52,7 @@
## The argument `last` is set `true` if the last entry is reached.
##
## Note:
## + This function does not run in `async` mode.
## + This function does *not* runs in `async` mode.
## + The flag `buddy.ctx.poolMode` has priority over the flag
## `buddy.ctrl.multiOk` which controls `runSingle()` and `runMulti()`.
##
@ -110,6 +114,7 @@ type
singleRunLock: bool ## Some single mode runner is activated
monitorLock: bool ## Monitor mode is activated
activeMulti: int ## Number of activated runners in multi-mode
shutdown: bool ## Internal shut down flag
RunnerBuddyRef[S,W] = ref object
## Per worker peer descriptor
@ -144,7 +149,7 @@ proc hash(peer: Peer): Hash =
proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async.} =
mixin runDaemon
if dsc.ctx.daemon:
if dsc.ctx.daemon and not dsc.shutdown:
dsc.daemonRunning = true
# Continue until stopped
@ -179,7 +184,7 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
# Continue until stopped
block taskExecLoop:
while worker.ctrl.running:
while worker.ctrl.running and not dsc.shutdown:
# Enforce minimum time spend on this loop
let startMoment = Moment.now()
@ -199,7 +204,8 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
var count = dsc.buddies.len
for w in dsc.buddies.nextValues:
count.dec
worker.runPool(count == 0)
if worker.runPool(count == 0):
break # `true` => stop
dsc.monitorLock = false
else:
@ -335,7 +341,7 @@ proc initSync*[S,W](
dsc.tickerOk = noisy
dsc.buddies.init(dsc.ctx.buddiesMax)
proc startSync*[S,W](dsc: RunnerSyncRef[S,W]; daemon = false): bool =
proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool =
## Set up `PeerObserver` handlers and start syncing.
mixin runSetup
# Initialise sub-systems
@ -350,8 +356,7 @@ proc startSync*[S,W](dsc: RunnerSyncRef[S,W]; daemon = false): bool =
po.setProtocol eth
dsc.pool.addObserver(dsc, po)
if daemon:
dsc.ctx.daemon = true
if dsc.ctx.daemon:
asyncSpawn dsc.daemonLoop()
return true
@ -360,7 +365,8 @@ proc stopSync*[S,W](dsc: RunnerSyncRef[S,W]) =
mixin runRelease
dsc.pool.delObserver(dsc)
# Shut down async services
# Gracefully shut down async services
dsc.shutdown = true
for buddy in dsc.buddies.nextValues:
buddy.worker.ctrl.stopped = true
dsc.ctx.daemon = false

View File

@ -15,10 +15,11 @@ import
std/[algorithm, distros, hashes, math, os, sets,
sequtils, strformat, strutils, tables, times],
chronicles,
eth/[common, p2p, rlp, trie/db],
eth/[common, p2p, rlp],
eth/trie/[db, nibbles],
rocksdb,
stint,
stew/[byteutils, results],
stew/[byteutils, interval_set, results],
unittest2,
../nimbus/[chain_config, config, genesis],
../nimbus/db/[db_chain, select_backend, storage_types],
@ -26,7 +27,7 @@ import
../nimbus/sync/types,
../nimbus/sync/snap/range_desc,
../nimbus/sync/snap/worker/db/[
hexary_desc, hexary_error, hexary_inspect, rocky_bulk_load,
hexary_desc, hexary_error, hexary_inspect, hexary_paths, rocky_bulk_load,
snapdb_accounts, snapdb_desc, snapdb_pivot, snapdb_storage_slots],
../nimbus/utils/prettify,
./replay/[pp, undump_blocks, undump_accounts, undump_storages],
@ -301,6 +302,7 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
var
desc: SnapDbAccountsRef
accKeys: seq[NodeKey]
accBaseTag: NodeTag
test &"Snap-proofing {accountsList.len} items for state root ..{root.pp}":
let
@ -316,14 +318,13 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
desc = SnapDbAccountsRef.init(dbBase, root, peer)
# Load/accumulate data from several samples (needs some particular sort)
let
lowerBound = accountsList.mapIt(it.base).sortMerge
packed = PackedAccountRange(
accBaseTag = accountsList.mapIt(it.base).sortMerge
let packed = PackedAccountRange(
accounts: accountsList.mapIt(it.data.accounts).sortMerge,
proof: accountsList.mapIt(it.data.proof).flatten)
# Merging intervals will produce gaps, so the result is expected OK but
# different from `.isImportOk`
check desc.importAccounts(lowerBound, packed, true).isOk
check desc.importAccounts(accBaseTag, packed, true).isOk
# check desc.merge(lowerBound, accounts) == OkHexDb
desc.assignPrettyKeys() # for debugging, make sure that state root ~ "$0"
@ -406,7 +407,8 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
# added later (typically these nodes are update `Mutable` nodes.)
#
# Beware: dumping a large database is not recommended
#noisy.say "***", "database dump\n ", desc.dumpAccDB()
#true.say "***", "database dump\n ", desc.dumpHexaDB()
test &"Storing/retrieving {accKeys.len} items " &
"on persistent state root registry":
@ -415,13 +417,18 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
else:
let
dbBase = SnapDbRef.init(db.cdb[0])
dangling = @[@[1u8],@[2u8,3u8],@[4u8,5u8,6u8],@[7u8,8u8,9u8,0u8]]
processed = @[(1.to(NodeTag),2.to(NodeTag)),
(4.to(NodeTag),5.to(NodeTag)),
(6.to(NodeTag),7.to(NodeTag))]
slotAccounts = seq[NodeKey].default
for n,w in accKeys:
check dbBase.savePivot(
BlockHeader(
stateRoot: w.to(Hash256)), n.uint64, n.uint64,
dangling, slotAccounts, 0).isOk
SnapDbPivotRegistry(
header: BlockHeader(stateRoot: w.to(Hash256)),
nAccounts: n.uint64,
nSlotLists: n.uint64,
processed: processed,
slotAccounts: slotAccounts)).isOk
# verify latest state root
block:
let rc = dbBase.recoverPivot()
@ -429,7 +436,7 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
if rc.isOk:
check rc.value.nAccounts == n.uint64
check rc.value.nSlotLists == n.uint64
check rc.value.dangling == dangling
check rc.value.processed == processed
for n,w in accKeys:
block:
let rc = dbBase.recoverPivot(w)
@ -439,15 +446,19 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
check rc.value.nSlotLists == n.uint64
# Update record in place
check dbBase.savePivot(
BlockHeader(
stateRoot: w.to(Hash256)), n.uint64, 0, @[], @[], 0).isOk
SnapDbPivotRegistry(
header: BlockHeader(stateRoot: w.to(Hash256)),
nAccounts: n.uint64,
nSlotLists: 0,
processed: @[],
slotAccounts: @[])).isOk
block:
let rc = dbBase.recoverPivot(w)
check rc.isOk
if rc.isOk:
check rc.value.nAccounts == n.uint64
check rc.value.nSlotLists == 0
check rc.value.dangling == seq[Blob].default
check rc.value.processed == seq[(NodeTag,NodeTag)].default
proc storagesRunner(
noisy = true;
@ -600,6 +611,7 @@ proc inspectionRunner(
rootKey = root.to(NodeKey)
dbBase = SnapDbRef.init(db.cdb[2+n])
desc = SnapDbAccountsRef.init(dbBase, root, peer)
for w in accList:
check desc.importAccounts(w.base,w.data, persistent=true).isImportOk
let rc = desc.inspectAccountsTrie(persistent=true)