Prep for full sync after snap make 5 (#1286)

* Update docu and logging

* Extracted and updated constants from `worker_desc` into separate file

* Update and re-calibrate communication error handling

* Allow simplified pivot negotiation

why:
  This feature allows to turn off pivot negotiation so that peers agree
  on a a pivot header.

  For snap sync with fast changing pivots this only throttles the sync
  process. The finally downloaded DB snapshot is typically a merged
  version of different pivot states augmented by a healing process.

* Re-model worker queues for accounts download & healing

why:
  Currently there is only one data fetch per download or healing task.
  This task is then repeated by the scheduler after a short time. In
  many cases, this short time seems enough for some peers to decide to
  terminate connection.

* Update main task batch `runMulti()`

details:
  The function `runMulti()` is activated in quasi-parallel mode by the
  scheduler. This function calls the download, healing and fast-sync
  functions.

  While in debug mode, after each set of jobs run by this function the
  database is analysed (by the `snapdb_check` module) and the result
  printed.
This commit is contained in:
Jordan Hrycaj 2022-11-01 15:07:44 +00:00 committed by GitHub
parent ec59819953
commit a689e9185a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 889 additions and 364 deletions

View File

@ -30,7 +30,7 @@ const
extraTraceMessages = false or true
## Additional trace commands
minPeersToStartSync = 2
pivotMinPeersToStartSync* = 2
## Wait for consensus of at least this number of peers before syncing.
type
@ -39,6 +39,7 @@ type
rng: ref HmacDrbgContext ## Random generator
untrusted: seq[Peer] ## Clean up list
trusted: HashSet[Peer] ## Peers ready for delivery
relaxedMode: bool ## Not using strictly `trusted` set
BestPivotWorkerRef* = ref object of RootRef
## Data for this peer only
@ -242,13 +243,30 @@ proc clear*(bp: BestPivotWorkerRef) =
# Public functions
# ------------------------------------------------------------------------------
proc pivotRelaxedMode*(ctx: BestPivotCtxRef; enable = false) =
## Controls relaxed mode. In relaxed mode, the *best header* is fetched
## from the network and used as pivot if its block number is large enough.
## Otherwise, the default is to find at least `pivotMinPeersToStartSync`
## peers (this one included) that agree on a minimum pivot.
ctx.relaxedMode = enable
proc pivotHeader*(bp: BestPivotWorkerRef): Result[BlockHeader,void] =
## Returns cached block header if available and the buddy `peer` is trusted.
## In relaxed mode (see `pivotRelaxedMode()`), also lesser trusted pivots
## are returned.
if bp.header.isSome and
bp.peer notin bp.global.untrusted and
minPeersToStartSync <= bp.global.trusted.len and
bp.peer notin bp.global.untrusted:
if pivotMinPeersToStartSync <= bp.global.trusted.len and
bp.peer in bp.global.trusted:
return ok(bp.header.unsafeGet)
if bp.global.relaxedMode:
when extraTraceMessages:
trace "Returning not fully trusted pivot", peer=bp.peer,
trusted=bp.global.trusted.len, untrusted=bp.global.untrusted.len
return ok(bp.header.unsafeGet)
err()
proc pivotNegotiate*(
@ -261,6 +279,10 @@ proc pivotNegotiate*(
## the current `buddy` can be used for syncing and the function
## `bestPivotHeader()` will succeed returning a `BlockHeader`.
##
## In relaxed mode (see `pivotRelaxedMode()`), negotiation stopps when there
## is a *best header*. It caches the best header and returns `true` it the
## block number is large enough.
##
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `startSyncWithPeer()`
##
let peer = bp.peer
@ -291,9 +313,14 @@ proc pivotNegotiate*(
trace "Useless peer, best number too low", peer,
trusted=bp.global.trusted.len, runState=bp.ctrl.state,
minNumber, bestNumber
return false
bp.header = some(rc.value)
if minPeersToStartSync <= bp.global.trusted.len:
# No further negotiation if in relaxed mode
if bp.global.relaxedMode:
return true
if pivotMinPeersToStartSync <= bp.global.trusted.len:
# We have enough trusted peers. Validate new peer against trusted
let rc = bp.getRandomTrustedPeer()
if rc.isOK:
@ -311,10 +338,11 @@ proc pivotNegotiate*(
if not rx.error:
# Other peer is dead
bp.global.trusted.excl rc.value
return false
# If there are no trusted peers yet, assume this very peer is trusted,
# but do not finish initialisation until there are more peers.
elif bp.global.trusted.len == 0:
if bp.global.trusted.len == 0:
bp.global.trusted.incl peer
when extraTraceMessages:
let bestHeader =
@ -322,12 +350,12 @@ proc pivotNegotiate*(
else: "nil"
trace "Assume initial trusted peer", peer,
trusted=bp.global.trusted.len, runState=bp.ctrl.state, bestHeader
return false
elif bp.global.trusted.len == 1 and bp.peer in bp.global.trusted:
if bp.global.trusted.len == 1 and bp.peer in bp.global.trusted:
# Ignore degenerate case, note that `trusted.len < minPeersToStartSync`
discard
return false
else:
# At this point we have some "trusted" candidates, but they are not
# "trusted" enough. We evaluate `peer` against all other candidates. If
# one of the candidates disagrees, we swap it for `peer`. If all candidates
@ -383,7 +411,7 @@ proc pivotNegotiate*(
discard
# Evaluate status, finally
if minPeersToStartSync <= bp.global.trusted.len:
if pivotMinPeersToStartSync <= bp.global.trusted.len:
when extraTraceMessages:
let bestHeader =
if bp.header.isSome: "#" & $bp.header.get.blockNumber

View File

@ -0,0 +1,109 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [Defect].}
const
pivotBlockDistanceMin* = 128
## The minimal depth of two block headers needed to activate a new state
## root pivot.
##
## Effects on assembling the state via `snap/1` protocol:
##
## * A small value of this constant increases the propensity to update the
## pivot header more often. This is so because each new peer negoiates a
## pivot block number at least the current one.
##
## * A large value keeps the current pivot more stable but some experiments
## suggest that the `snap/1` protocol is answered only for later block
## numbers (aka pivot blocks.) So a large value tends to keep the pivot
## farther away from the chain head.
##
## Note that 128 is the magic distance for snapshots used by *Geth*.
pivotEnvStopChangingIfComplete* = true
## If set `true`, new peers will not change the pivot even if the
## negotiated pivot would be newer. This should be the default.
# --------------
snapRequestBytesLimit* = 2 * 1024 * 1024
## Soft bytes limit to request in `snap` protocol calls.
snapStoragesSlotsFetchMax* = 2 * 1024
## Maximal number of storage tries to fetch with a single message.
snapTrieNodeFetchMax* = 1024
## Informal maximal number of trie nodes to fetch at once. This is not
## an official limit but found on several implementations (e.g. Geth.)
##
## Resticting the fetch list length early allows to better paralellise
## healing.
snapAccountsHealBatchFetchMax* = 5 * snapTrieNodeFetchMax
## Keap on gloing in healing task up until this many nodes have been
## fetched from the network or some error contition therminates the task.
# --------------
healAccountsTrigger* = 0.95
## Apply accounts healing if the global snap download coverage factor
## exceeds this setting. The global coverage factor is derived by merging
## all account ranges retrieved for all pivot state roots (see
## `coveredAccounts` in `CtxData`.)
##
## A small value of this constant leads to early healing. This produces
## stray leaf account records so fragmenting larger intervals of missing
## account ranges. This in turn leads to smaller but more range requests
## over the network. More requests might be a disadvantage if peers only
## serve a maximum number requests (rather than data.)
healSlorageSlotsTrigger* = 0.70
## Consider per account storage slost healing if a per-account hexary
## sub-trie has reached this factor of completeness.
healStoragesSlotsBatchMax* = 32
## Maximal number of storage tries to to heal in a single batch run. Only
## this many items will be removed from the batch queue. These items will
## then be processed one by one.
# --------------
comErrorsTimeoutMax* = 4
## Maximal number of non-resonses accepted in a row. If there are more than
## `comErrorsTimeoutMax` consecutive errors, the worker will be degraded
## as zombie.
comErrorsTimeoutSleepMSecs* = 5000
## Wait/suspend for this many seconds after a timeout error if there are
## not more than `comErrorsTimeoutMax` errors in a row (maybe some other
## network or no-data errors mixed in.) Set 0 to disable.
comErrorsNetworkMax* = 5
## Similar to `comErrorsTimeoutMax` but for network errors.
comErrorsNetworkSleepMSecs* = 5000
## Similar to `comErrorsTimeoutSleepSecs` but for network errors.
## Set 0 to disable.
comErrorsNoDataMax* = 3
## Similar to `comErrorsTimeoutMax` but for missing data errors.
comErrorsNoDataSleepMSecs* = 0
## Similar to `comErrorsTimeoutSleepSecs` but for missing data errors.
## Set 0 to disable.
static:
doAssert healAccountsTrigger < 1.0 # larger values make no sense
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -12,20 +12,20 @@ import
std/[hashes, math, options, sets, strutils],
chronicles,
chronos,
eth/[common/eth_types, p2p],
eth/[common, p2p],
stew/[interval_set, keyed_queue],
../../db/select_backend,
".."/[handlers, misc/best_pivot, protocol, sync_desc],
./worker/[heal_accounts, heal_storage_slots,
range_fetch_accounts, range_fetch_storage_slots, ticker],
./worker/com/com_error,
./worker/db/snapdb_desc,
"."/[range_desc, worker_desc]
./worker/db/[snapdb_check, snapdb_desc],
"."/[constants, range_desc, worker_desc]
{.push raises: [Defect].}
logScope:
topics = "snap-sync"
topics = "snap-buddy"
const
extraTraceMessages = false or true
@ -121,7 +121,7 @@ proc appendPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) =
ctx = buddy.ctx
minNumber = block:
let rc = ctx.data.pivotTable.lastValue
if rc.isOk: rc.value.stateHeader.blockNumber + minPivotBlockDistance
if rc.isOk: rc.value.stateHeader.blockNumber + pivotBlockDistanceMin
else: 1.toBlockNumber
# Check whether the new header follows minimum depth requirement. This is
@ -151,19 +151,20 @@ proc updateSinglePivot(buddy: SnapBuddyRef): Future[bool] {.async.} =
var header = buddy.pivot.pivotHeader.value
# Check whether there is no environment change needed
when noPivotEnvChangeIfComplete:
when pivotEnvStopChangingIfComplete:
let rc = ctx.data.pivotTable.lastValue
if rc.isOk and rc.value.serialSync:
if rc.isOk and rc.value.storageDone:
# No neede to change
if extraTraceMessages:
trace "No need to change snap pivot", peer,
pivot=("#" & $rc.value.stateHeader.blockNumber),
stateRoot=rc.value.stateHeader.stateRoot,
multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state
return true
buddy.appendPivotEnv(header)
trace "Snap pivot initialised", peer, pivot=("#" & $header.blockNumber),
info "Snap pivot initialised", peer, pivot=("#" & $header.blockNumber),
multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state
return true
@ -222,6 +223,7 @@ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db)
else: SnapDbRef.init(ctx.data.dbBackend)
ctx.pivot = BestPivotCtxRef.init(ctx.data.rng)
ctx.pivot.pivotRelaxedMode(enable = true)
if tickerOK:
ctx.data.ticker = TickerRef.init(ctx.tickerUpdate)
else:
@ -309,23 +311,31 @@ proc runPool*(buddy: SnapBuddyRef, last: bool) =
let rc = ctx.data.pivotTable.lastValue
if rc.isOk:
# Check whether accounts and storage might be complete.
let env = rc.value
if not env.serialSync:
# Check whether last pivot accounts and storage are complete.
let
env = rc.value
peer = buddy.peer
pivot = "#" & $env.stateHeader.blockNumber # for logging
if not env.storageDone:
# Check whether accounts download is complete
block checkAccountsComplete:
for ivSet in env.fetchAccounts.unprocessed:
if ivSet.chunks != 0:
break checkAccountsComplete
env.accountsDone = true
if env.fetchAccounts.unprocessed.isEmpty():
# FIXME: This check might not be needed. It will visit *every* node
# in the hexary trie for checking the account leaves.
if buddy.checkAccountsTrieIsComplete(env):
env.accountsState = HealerDone
# Check whether storage slots are complete
if env.fetchStorage.len == 0:
env.serialSync = true
env.storageDone = true
if extraTraceMessages:
trace "Checked for pivot DB completeness",
nAccounts=env.nAccounts, accountsDone=env.accountsDone,
nSlotLists=env.nSlotLists, storageDone=env.serialSync
trace "Checked for pivot DB completeness", peer, pivot,
nAccounts=env.nAccounts, accountsState=env.accountsState,
nSlotLists=env.nSlotLists, storageDone=env.storageDone
proc runMulti*(buddy: SnapBuddyRef) {.async.} =
@ -338,43 +348,84 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
peer = buddy.peer
# Set up current state root environment for accounts snapshot
let env = block:
let
env = block:
let rc = ctx.data.pivotTable.lastValue
if rc.isErr:
return # nothing to do
rc.value
pivot = "#" & $env.stateHeader.blockNumber # for logging
buddy.data.pivotEnv = env
if env.serialSync:
trace "Snap serial sync -- not implemented yet", peer
# Full sync processsing based on current snapshot
# -----------------------------------------------
if env.storageDone:
if not buddy.checkAccountsTrieIsComplete(env):
error "Ooops, all accounts fetched but DvnB still incomplete", peer, pivot
if not buddy.checkStorageSlotsTrieIsComplete(env):
error "Ooops, all storages fetched but DB still incomplete", peer, pivot
trace "Snap full sync -- not implemented yet", peer, pivot
await sleepAsync(5.seconds)
return
else:
# Snapshot sync processing. Note that *serialSync => accountsDone*.
await buddy.rangeFetchAccounts()
if buddy.ctrl.stopped: return
# Snapshot sync processing
# ------------------------
await buddy.rangeFetchStorageSlots()
if buddy.ctrl.stopped: return
template runAsync(code: untyped) =
await code
if buddy.ctrl.stopped:
# To be disconnected from peer.
return
if env != ctx.data.pivotTable.lastValue.value:
# Pivot has changed, so restart with the latest one
return
# Pivot might have changed, so restart with the latest one
if env != ctx.data.pivotTable.lastValue.value: return
# If this is a new pivot, the previous one can be partially cleaned up.
# There is no point in keeping some older space consuming state data any
# longer.
block:
let rc = ctx.data.pivotTable.beforeLastValue
if rc.isOk:
let nFetchStorage = rc.value.fetchStorage.len
if 0 < nFetchStorage:
trace "Cleaning up previous pivot", peer, pivot, nFetchStorage
rc.value.fetchStorage.clear()
rc.value.fetchAccounts.checkNodes.setLen(0)
rc.value.fetchAccounts.missingNodes.setLen(0)
# If the current database is not complete yet
if 0 < env.fetchAccounts.unprocessed[0].chunks or
0 < env.fetchAccounts.unprocessed[1].chunks:
if env.accountsState != HealerDone:
runAsync buddy.rangeFetchAccounts()
runAsync buddy.rangeFetchStorageSlots()
await buddy.healAccounts()
if buddy.ctrl.stopped: return
# Can only run a single accounts healer instance at a time. This instance
# will clear the batch queue so there is nothing to do for another process.
if env.accountsState == HealerIdle:
env.accountsState = HealerRunning
runAsync buddy.healAccounts()
env.accountsState = HealerIdle
await buddy.healStorageSlots()
if buddy.ctrl.stopped: return
# Some additional storage slots might have been popped up
runAsync buddy.rangeFetchStorageSlots()
# Check whether accounts might be complete.
runAsync buddy.healStorageSlots()
# Debugging log: analyse pivot against database
discard buddy.checkAccountsListOk(env)
discard buddy.checkStorageSlotsTrieIsComplete(env)
# Check whether there are more accounts to fetch.
#
# Note that some other process might have temporarily borrowed from the
# `fetchAccounts.unprocessed` list. Whether we are done can only be decided
# if only a single buddy is active. S be it.
if env.fetchAccounts.unprocessed.isEmpty():
# Check whether pivot download is complete.
if env.fetchStorage.len == 0:
# Possibly done but some buddies might wait for an account range to be
# received from the network. So we need to sync.
trace "Running pool mode for verifying completeness", peer, pivot
buddy.ctx.poolMode = true
# ------------------------------------------------------------------------------

View File

@ -10,19 +10,15 @@
import
chronos,
../../../sync_desc
const
comErrorsTimeoutMax* = 2
## Maximal number of non-resonses accepted in a row. If there are more than
## `comErrorsTimeoutMax` consecutive errors, the worker will be degraded
## as zombie.
../../../sync_desc,
../../constants
type
ComErrorStatsRef* = ref object
## particular error counters so connections will not be cut immediately
## after a particular error.
nTimeouts*: uint
nNoData*: uint
nNetwork*: uint
ComError* = enum
@ -37,7 +33,7 @@ type
ComNoAccountsForStateRoot
ComNoByteCodesAvailable
ComNoDataForProof
ComNoHeaderAvailable
#ComNoHeaderAvailable -- unused, see get_block_header.nim
ComNoStorageForAccounts
ComNoTrieNodesAvailable
ComResponseTimeout
@ -46,10 +42,10 @@ type
ComTooManyStorageSlots
ComTooManyTrieNodes
# Other errors not directly related to communication
ComInspectDbFailed
ComImportAccountsFailed
proc resetComError*(stats: ComErrorStatsRef) =
## Reset error counts after successful network operation
stats[].reset
proc stopAfterSeriousComError*(
ctrl: BuddyCtrlRef;
@ -57,42 +53,47 @@ proc stopAfterSeriousComError*(
stats: ComErrorStatsRef;
): Future[bool]
{.async.} =
## Error handling after data protocol failed.
## Error handling after data protocol failed. Returns `true` if the current
## worker should be terminated as *zombie*.
case error:
of ComResponseTimeout:
stats.nTimeouts.inc
if comErrorsTimeoutMax < stats.nTimeouts:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
ctrl.zombie = true
else:
# Otherwise try again some time later. Nevertheless, stop the
# current action.
await sleepAsync(5.seconds)
return true
of ComNetworkProblem,
ComMissingProof,
ComAccountsMinTooSmall,
ComAccountsMaxTooLarge:
when 0 < comErrorsTimeoutSleepMSecs:
# Otherwise try again some time later.
await sleepAsync(comErrorsTimeoutSleepMSecs.milliseconds)
of ComNetworkProblem:
stats.nNetwork.inc
# Mark this peer dead, i.e. avoid fetching from this peer for a while
if comErrorsNetworkMax < stats.nNetwork:
ctrl.zombie = true
return true
of ComEmptyAccountsArguments,
ComEmptyRequestArguments,
ComEmptyPartialRange,
ComInspectDbFailed,
ComImportAccountsFailed,
ComNoDataForProof,
ComNothingSerious:
discard
when 0 < comErrorsNetworkSleepMSecs:
# Otherwise try again some time later.
await sleepAsync(comErrorsNetworkSleepMSecs.milliseconds)
of ComNoAccountsForStateRoot,
ComNoStorageForAccounts,
ComNoByteCodesAvailable,
ComNoHeaderAvailable,
ComNoTrieNodesAvailable,
ComNoStorageForAccounts,
#ComNoHeaderAvailable,
ComNoTrieNodesAvailable:
stats.nNoData.inc
if comErrorsNoDataMax < stats.nNoData:
ctrl.zombie = true
return true
when 0 < comErrorsNoDataSleepMSecs:
# Otherwise try again some time later.
await sleepAsync(comErrorsNoDataSleepMSecs.milliseconds)
of ComMissingProof,
ComAccountsMinTooSmall,
ComAccountsMaxTooLarge,
ComTooManyByteCodes,
ComTooManyHeaders,
ComTooManyStorageSlots,
@ -101,4 +102,11 @@ proc stopAfterSeriousComError*(
ctrl.zombie = true
return true
of ComEmptyAccountsArguments,
ComEmptyRequestArguments,
ComEmptyPartialRange,
ComNoDataForProof,
ComNothingSerious:
discard
# End

View File

@ -18,7 +18,7 @@ import
eth/[common, p2p, trie/trie_defs],
stew/interval_set,
"../../.."/[protocol, protocol/trace_config],
"../.."/[range_desc, worker_desc],
"../.."/[constants, range_desc, worker_desc],
./com_error
{.push raises: [Defect].}
@ -40,6 +40,7 @@ proc getAccountRangeReq(
buddy: SnapBuddyRef;
root: Hash256;
iv: NodeTagRange;
pivot: string;
): Future[Result[Option[SnapAccountRange],void]] {.async.} =
let
peer = buddy.peer
@ -48,7 +49,7 @@ proc getAccountRangeReq(
root, iv.minPt.to(Hash256), iv.maxPt.to(Hash256), snapRequestBytesLimit)
return ok(reply)
except CatchableError as e:
trace trSnapRecvError & "waiting for GetAccountRange reply", peer,
trace trSnapRecvError & "waiting for GetAccountRange reply", peer, pivot,
error=e.msg
return err()
@ -58,22 +59,23 @@ proc getAccountRangeReq(
proc getAccountRange*(
buddy: SnapBuddyRef;
stateRoot: Hash256;
iv: NodeTagRange;
stateRoot: Hash256; ## Current DB base (see `pivot` for logging)
iv: NodeTagRange; ## Range to be fetched
pivot: string; ## For logging, instead of `stateRoot`
): Future[Result[GetAccountRange,ComError]] {.async.} =
## Fetch data using the `snap#` protocol, returns the range covered.
let
peer = buddy.peer
if trSnapTracePacketsOk:
trace trSnapSendSending & "GetAccountRange", peer,
accRange=iv, stateRoot, bytesLimit=snapRequestBytesLimit
trace trSnapSendSending & "GetAccountRange", peer, pivot,
accRange=iv, bytesLimit=snapRequestBytesLimit
var dd = block:
let rc = await buddy.getAccountRangeReq(stateRoot, iv)
let rc = await buddy.getAccountRangeReq(stateRoot, iv, pivot)
if rc.isErr:
return err(ComNetworkProblem)
if rc.value.isNone:
trace trSnapRecvTimeoutWaiting & "for reply to GetAccountRange", peer
trace trSnapRecvTimeoutWaiting & "for AccountRange", peer, pivot
return err(ComResponseTimeout)
let snAccRange = rc.value.get
GetAccountRange(
@ -108,15 +110,15 @@ proc getAccountRange*(
# any) account after limitHash must be provided.
if nProof == 0:
# Maybe try another peer
trace trSnapRecvReceived & "empty AccountRange", peer,
nAccounts, nProof, accRange="n/a", reqRange=iv, stateRoot
trace trSnapRecvReceived & "empty AccountRange", peer, pivot,
nAccounts, nProof, accRange="n/a", reqRange=iv
return err(ComNoAccountsForStateRoot)
# So there is no data, otherwise an account beyond the interval end
# `iv.maxPt` would have been returned.
dd.consumed = NodeTagRange.new(iv.minPt, high(NodeTag))
trace trSnapRecvReceived & "terminal AccountRange", peer,
nAccounts, nProof, accRange=dd.consumed, reqRange=iv, stateRoot
trace trSnapRecvReceived & "terminal AccountRange", peer, pivot,
nAccounts, nProof, accRange=dd.consumed, reqRange=iv
return ok(dd)
let (accMinPt, accMaxPt) = (
@ -133,15 +135,15 @@ proc getAccountRange*(
# across both.
if 0.to(NodeTag) < iv.minPt:
trace trSnapRecvProtocolViolation & "proof-less AccountRange", peer,
nAccounts, nProof, accRange=NodeTagRange.new(iv.minPt, accMaxPt),
reqRange=iv, stateRoot
pivot, nAccounts, nProof, accRange=NodeTagRange.new(iv.minPt, accMaxPt),
reqRange=iv
return err(ComMissingProof)
if accMinPt < iv.minPt:
# Not allowed
trace trSnapRecvProtocolViolation & "min too small in AccountRange", peer,
nAccounts, nProof, accRange=NodeTagRange.new(accMinPt, accMaxPt),
reqRange=iv, stateRoot
pivot, nAccounts, nProof, accRange=NodeTagRange.new(accMinPt, accMaxPt),
reqRange=iv
return err(ComAccountsMinTooSmall)
if iv.maxPt < accMaxPt:
@ -157,13 +159,13 @@ proc getAccountRange*(
if iv.maxPt < dd.data.accounts[^2].accKey.to(NodeTag):
# The segcond largest should not excceed the top one requested.
trace trSnapRecvProtocolViolation & "AccountRange top exceeded", peer,
nAccounts, nProof, accRange=NodeTagRange.new(iv.minPt, accMaxPt),
reqRange=iv, stateRoot
pivot, nAccounts, nProof,
accRange=NodeTagRange.new(iv.minPt, accMaxPt), reqRange=iv
return err(ComAccountsMaxTooLarge)
dd.consumed = NodeTagRange.new(iv.minPt, max(iv.maxPt,accMaxPt))
trace trSnapRecvReceived & "AccountRange", peer,
nAccounts, nProof, accRange=dd.consumed, reqRange=iv, stateRoot
trace trSnapRecvReceived & "AccountRange", peer, pivot,
nAccounts, nProof, accRange=dd.consumed, reqRange=iv
return ok(dd)

View File

@ -15,7 +15,7 @@ import
eth/[common, p2p],
stew/interval_set,
"../../.."/[protocol, protocol/trace_config],
"../.."/[range_desc, worker_desc],
"../.."/[constants, range_desc, worker_desc],
./com_error
{.push raises: [Defect].}
@ -46,8 +46,9 @@ const
proc getStorageRangesReq(
buddy: SnapBuddyRef;
root: Hash256;
accounts: seq[Hash256],
iv: Option[NodeTagRange]
accounts: seq[Hash256];
iv: Option[NodeTagRange];
pivot: string;
): Future[Result[Option[SnapStorageRanges],void]]
{.async.} =
let
@ -70,7 +71,7 @@ proc getStorageRangesReq(
return ok(reply)
except CatchableError as e:
trace trSnapRecvError & "waiting for GetStorageRanges reply", peer,
trace trSnapRecvError & "waiting for GetStorageRanges reply", peer, pivot,
error=e.msg
return err()
@ -80,8 +81,9 @@ proc getStorageRangesReq(
proc getStorageRanges*(
buddy: SnapBuddyRef;
stateRoot: Hash256;
accounts: seq[AccountSlotsHeader],
stateRoot: Hash256; ## Current DB base (`pivot` for logging)
accounts: seq[AccountSlotsHeader]; ## List of per-account storage slots
pivot: string; ## For logging, instead of `stateRoot`
): Future[Result[GetStorageRanges,ComError]]
{.async.} =
## Fetch data using the `snap#` protocol, returns the range covered.
@ -99,16 +101,16 @@ proc getStorageRanges*(
return err(ComEmptyAccountsArguments)
if trSnapTracePacketsOk:
trace trSnapSendSending & "GetStorageRanges", peer,
nAccounts, stateRoot, bytesLimit=snapRequestBytesLimit
trace trSnapSendSending & "GetStorageRanges", peer, pivot,
nAccounts, bytesLimit=snapRequestBytesLimit
let snStoRanges = block:
let rc = await buddy.getStorageRangesReq(
stateRoot, accounts.mapIt(it.accKey.to(Hash256)), accounts[0].subRange)
let rc = await buddy.getStorageRangesReq(stateRoot,
accounts.mapIt(it.accKey.to(Hash256)), accounts[0].subRange, pivot)
if rc.isErr:
return err(ComNetworkProblem)
if rc.value.isNone:
trace trSnapRecvTimeoutWaiting & "for reply to GetStorageRanges", peer,
trace trSnapRecvTimeoutWaiting & "for StorageRanges", peer, pivot,
nAccounts
return err(ComResponseTimeout)
if nAccounts < rc.value.get.slotLists.len:
@ -129,8 +131,8 @@ proc getStorageRanges*(
# for any requested account hash, it must return an empty reply. It is
# the responsibility of the caller to query an state not older than 128
# blocks; and the caller is expected to only ever query existing accounts.
trace trSnapRecvReceived & "empty StorageRanges", peer,
nAccounts, nSlotLists, nProof, stateRoot, firstAccount=accounts[0].accKey
trace trSnapRecvReceived & "empty StorageRanges", peer, pivot,
nAccounts, nSlotLists, nProof, firstAccount=accounts[0].accKey
return err(ComNoStorageForAccounts)
# Assemble return structure for given peer response
@ -171,8 +173,8 @@ proc getStorageRanges*(
# assigning empty slice isa ok
dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts]
trace trSnapRecvReceived & "StorageRanges", peer, nAccounts, nSlotLists,
nProof, nLeftOver=dd.leftOver.len, stateRoot
trace trSnapRecvReceived & "StorageRanges", peer, pivot, nAccounts,
nSlotLists, nProof, nLeftOver=dd.leftOver.len
return ok(dd)

View File

@ -13,7 +13,7 @@ import
chronos,
eth/[common, p2p],
"../../.."/[protocol, protocol/trace_config],
"../.."/[range_desc, worker_desc],
"../.."/[constants, range_desc, worker_desc],
./com_error
{.push raises: [Defect].}
@ -37,6 +37,7 @@ proc getTrieNodesReq(
buddy: SnapBuddyRef;
stateRoot: Hash256;
paths: seq[seq[Blob]];
pivot: string;
): Future[Result[Option[SnapTrieNodes],void]]
{.async.} =
let
@ -46,7 +47,7 @@ proc getTrieNodesReq(
return ok(reply)
except CatchableError as e:
trace trSnapRecvError & "waiting for GetByteCodes reply", peer,
trace trSnapRecvError & "waiting for GetByteCodes reply", peer, pivot,
error=e.msg
return err()
@ -56,8 +57,9 @@ proc getTrieNodesReq(
proc getTrieNodes*(
buddy: SnapBuddyRef;
stateRoot: Hash256;
paths: seq[seq[Blob]],
stateRoot: Hash256; ## Current DB base (see `pivot` for logging)
paths: seq[seq[Blob]]; ## Nodes to fetch
pivot: string; ## For logging, instead of `stateRoot`
): Future[Result[GetTrieNodes,ComError]]
{.async.} =
## Fetch data using the `snap#` protocol, returns the trie nodes requested
@ -72,15 +74,15 @@ proc getTrieNodes*(
let nTotal = paths.mapIt(it.len).foldl(a+b, 0)
if trSnapTracePacketsOk:
trace trSnapSendSending & "GetTrieNodes", peer,
trace trSnapSendSending & "GetTrieNodes", peer, pivot,
nPaths, nTotal, bytesLimit=snapRequestBytesLimit
let trieNodes = block:
let rc = await buddy.getTrieNodesReq(stateRoot, paths)
let rc = await buddy.getTrieNodesReq(stateRoot, paths, pivot)
if rc.isErr:
return err(ComNetworkProblem)
if rc.value.isNone:
trace trSnapRecvTimeoutWaiting & "for reply to GetTrieNodes", peer, nPaths
trace trSnapRecvTimeoutWaiting & "for TrieNodes", peer, pivot, nPaths
return err(ComResponseTimeout)
let blobs = rc.value.get.nodes
if nTotal < blobs.len:
@ -104,7 +106,7 @@ proc getTrieNodes*(
# nodes.
# * The responding node is allowed to return less data than requested
# (serving QoS limits), but the node must return at least one trie node.
trace trSnapRecvReceived & "empty TrieNodes", peer, nPaths, nNodes
trace trSnapRecvReceived & "empty TrieNodes", peer, pivot, nPaths, nNodes
return err(ComNoByteCodesAvailable)
# Assemble return value
@ -153,7 +155,7 @@ proc getTrieNodes*(
if 0 < pushBack.len:
dd.leftOver.add paths[n][0] & pushBack
trace trSnapRecvReceived & "TrieNodes", peer,
trace trSnapRecvReceived & "TrieNodes", peer, pivot,
nPaths, nNodes, nLeftOver=dd.leftOver.len
return ok(dd)

View File

@ -216,10 +216,10 @@ proc importAccounts*(
error "Import Accounts exception", peer=ps.peer, name=($e.name), msg=e.msg
return err(OSErrorException)
when extraTraceMessages:
trace "Accounts and proofs ok", peer=ps.peer,
root=ps.root.ByteArray32.toHex,
proof=data.proof.len, base, accounts=data.accounts.len
#when extraTraceMessages:
# trace "Accounts imported", peer=ps.peer,
# root=ps.root.ByteArray32.toHex,
# proof=data.proof.len, base, accounts=data.accounts.len
ok()
proc importAccounts*(

View File

@ -0,0 +1,266 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Check/analyse DB completeness
## =============================
import
chronicles,
eth/[common, p2p, trie/trie_defs],
stew/keyed_queue,
../../../../utils/prettify,
../../../sync_desc,
"../.."/[range_desc, worker_desc],
"."/[hexary_desc, hexary_error, snapdb_accounts, snapdb_storage_slots]
{.push raises: [Defect].}
logScope:
topics = "snap-db"
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Check DB " & info
proc accountsCtx(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): string =
let
ctx = buddy.ctx
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"nAccounts=" & $env.nAccounts & "," &
("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" &
ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," &
"nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," &
"nMissingNodes=" & $env.fetchAccounts.missingNodes.len & "}"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc storageSlotsCtx(
buddy: SnapBuddyRef;
storageRoot: Hash256;
env: SnapPivotRef;
): string =
let
ctx = buddy.ctx
rc = env.fetchStorage.eq(storageRoot)
if rc.isErr:
return "n/a"
let
data = rc.value
slots = data.slots
result = "{" &
"inherit=" & (if data.inherit: "t" else: "f") & ","
if not slots.isNil:
result &= "" &
"covered=" & slots.unprocessed.emptyFactor.toPC(0) &
"nCheckNodes=" & $slots.checkNodes.len & "," &
"nMissingNodes=" & $slots.missingNodes.len
result &= "}"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc checkStorageSlotsTrie(
buddy: SnapBuddyRef;
accKey: NodeKey;
storageRoot: Hash256;
env: SnapPivotRef;
): Result[bool,HexaryDbError] =
## Check whether a storage slots hexary trie is complete.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
rc = db.inspectStorageSlotsTrie(peer, accKey, storageRoot)
if rc.isErr:
return err(rc.error)
ok(rc.value.dangling.len == 0)
iterator accountsWalk(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): (NodeKey,Account,HexaryDbError) =
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
stateRoot = env.stateHeader.stateRoot
walk = SnapDbAccountsRef.init(db, stateRoot, peer)
var
accKey = NodeKey.default
count = 0
runOk = true
while runOk:
count.inc
let nextKey = block:
let rc = walk.nextAccountsChainDbKey(accKey)
if rc.isErr:
if rc.error != AccountNotFound:
error logTxt "accounts walk stopped", peer,
account=accKey.to(NodeTag),
ctx=buddy.accountsCtx(env), count, reason=rc.error
runOk = false
continue
rc.value
accKey = nextKey
let accData = block:
let rc = walk.getAccountsData(accKey, persistent = true)
if rc.isErr:
error logTxt "accounts walk error", peer, account=accKey,
ctx=buddy.accountsCtx(env), count, error=rc.error
runOk = false
continue
rc.value
yield (accKey, accData, NothingSerious)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc checkAccountsTrieIsComplete*(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): bool =
## Check whether accounts hexary trie is complete
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
stateRoot = env.stateHeader.stateRoot
rc = db.inspectAccountsTrie(peer, stateRoot)
if rc.isErr:
error logTxt "accounts health check failed", peer,
ctx=buddy.accountsCtx(env), error=rc.error
return false
rc.value.dangling.len == 0
proc checkAccountsListOk*(
buddy: SnapBuddyRef;
env: SnapPivotRef;
noisy = false;
): bool =
## Loop over accounts, returns `false` for some error.
let
ctx = buddy.ctx
peer = buddy.peer
var
accounts = 0
storage = 0
nextMsgThresh = 1
for (key,accData,error) in buddy.accountsWalk(env):
if error != NothingSerious:
error logTxt "accounts loop stopped", peer, ctx=buddy.accountsCtx(env),
accounts, storage, error
return false
accounts.inc
if accData.storageRoot != emptyRlpHash:
storage.inc
when extraTraceMessages:
if noisy and nextMsgThresh <= accounts:
debug logTxt "accounts loop check point", peer,
ctx=buddy.accountsCtx(env), accounts, storage
nextMsgThresh *= 2
when extraTraceMessages:
let isComplete = buddy.checkAccountsTrieIsComplete(env)
debug logTxt "accounts list report", peer, ctx=buddy.accountsCtx(env),
accounts, storage, isComplete
true
proc checkStorageSlotsTrieIsComplete*(
buddy: SnapBuddyRef;
accKey: NodeKey;
storageRoot: Hash256;
env: SnapPivotRef;
): bool =
## Check whether a storage slots hexary trie is complete.
let
peer = buddy.peer
rc = buddy.checkStorageSlotsTrie(accKey, storageRoot, env)
if rc.isOk:
return rc.value
when extraTraceMessages:
debug logTxt "atorage slots health check failed", peer,
nStoQueue=env.fetchStorage.len,
ctx=buddy.storageSlotsCtx(storageRoot, env), error=rc.error
proc checkStorageSlotsTrieIsComplete*(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): bool =
## Check for all accounts thye whether storage slot hexary tries are complete.
let
ctx = buddy.ctx
peer = buddy.peer
var
accounts = 0
incomplete = 0
complete = 0
for (accKey,accData,error) in buddy.accountsWalk(env):
if error != NothingSerious:
error logTxt "atorage slots accounts loop stopped", peer,
nStoQueue=env.fetchStorage.len, accounts, incomplete, complete, error
return false
accounts.inc
let storageRoot = accData.storageRoot
if storageRoot == emptyRlpHash:
continue
let rc = buddy.checkStorageSlotsTrie(accKey, storageRoot, env)
if rc.isOk and rc.value:
complete.inc
else:
incomplete.inc
when extraTraceMessages:
debug logTxt "storage slots report", peer, ctx=buddy.accountsCtx(env),
nStoQueue=env.fetchStorage.len, accounts, incomplete, complete
0 < accounts and incomplete == 0
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -251,10 +251,10 @@ proc importStorageSlots*(
error "Import storage slots exception", peer, itemInx, nItems,
name=($e.name), msg=e.msg, nErrors=result.len
when extraTraceMessages:
if result.len == 0:
trace "Storage slots imported", peer, nItems,
nSlotLists=data.storages.len, proofs=data.proof.len
#when extraTraceMessages:
# if result.len == 0:
# trace "Storage slots imported", peer, nItems,
# nSlotLists=data.storages.len, proofs=data.proof.len
proc importStorageSlots*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`

View File

@ -112,7 +112,7 @@ import
stew/[interval_set, keyed_queue],
../../../utils/prettify,
../../sync_desc,
".."/[range_desc, worker_desc],
".."/[constants, range_desc, worker_desc],
./com/[com_error, get_trie_nodes],
./db/[hexary_desc, hexary_error, snapdb_accounts]
@ -137,6 +137,7 @@ proc healingCtx(buddy: SnapBuddyRef): string =
ctx = buddy.ctx
env = buddy.data.pivotEnv
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"nAccounts=" & $env.nAccounts & "," &
("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" &
ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," &
@ -212,9 +213,10 @@ proc getMissingNodesFromNetwork(
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
nMissingNodes = env.fetchAccounts.missingNodes.len
inxLeft = max(0, nMissingNodes - maxTrieNodeFetch)
inxLeft = max(0, nMissingNodes - snapTrieNodeFetchMax)
# There is no point in processing too many nodes at the same time. So leave
# the rest on the `missingNodes` queue to be handled later.
@ -231,8 +233,11 @@ proc getMissingNodesFromNetwork(
# Fetch nodes from the network. Note that the remainder of the `missingNodes`
# list might be used by another process that runs semi-parallel.
let rc = await buddy.getTrieNodes(stateRoot, pathList)
let rc = await buddy.getTrieNodes(stateRoot, pathList, pivot)
if rc.isOk:
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.data.errors.resetComError()
# Register unfetched missing nodes for the next pass
for w in rc.value.leftOver:
env.fetchAccounts.missingNodes.add NodeSpecs(
@ -316,35 +321,18 @@ proc registerAccountLeaf(
storageRoot: acc.storageRoot)
# ------------------------------------------------------------------------------
# Public functions
# Private functions: do the healing for one round
# ------------------------------------------------------------------------------
proc healAccounts*(buddy: SnapBuddyRef) {.async.} =
## Fetching and merging missing account trie database nodes.
proc accountsHealingImpl(buddy: SnapBuddyRef): Future[int] {.async.} =
## Fetching and merging missing account trie database nodes. It returns the
## number of nodes fetched from the network, and -1 upon error.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
# Only start healing if there is some completion level, already.
#
# We check against the global coverage factor, i.e. a measure for how
# much of the total of all accounts have been processed. Even if the trie
# database for the current pivot state root is sparsely filled, there
# is a good chance that it can inherit some unchanged sub-trie from an
# earlier pivor state root download. The healing process then works like
# sort of glue.
#
if env.nAccounts == 0 or
ctx.data.coveredAccounts.fullFactor < healAccountsTrigger:
#when extraTraceMessages:
# trace logTxt "postponed", peer, ctx=buddy.healingCtx()
return
when extraTraceMessages:
trace logTxt "started", peer, ctx=buddy.healingCtx()
# Update for changes since last visit
buddy.updateMissingNodesList()
@ -353,17 +341,17 @@ proc healAccounts*(buddy: SnapBuddyRef) {.async.} =
if env.fetchAccounts.checkNodes.len != 0 or
env.fetchAccounts.missingNodes.len == 0:
if not buddy.appendMoreDanglingNodesToMissingNodesList():
return
return 0
# Check whether the trie is complete.
if env.fetchAccounts.missingNodes.len == 0:
trace logTxt "complete", peer, ctx=buddy.healingCtx()
return # nothing to do
return 0 # nothing to do
# Get next batch of nodes that need to be merged it into the database
let nodeSpecs = await buddy.getMissingNodesFromNetwork()
if nodeSpecs.len == 0:
return
return 0
# Store nodes onto disk
let report = db.importRawAccountsNodes(peer, nodeSpecs)
@ -372,11 +360,7 @@ proc healAccounts*(buddy: SnapBuddyRef) {.async.} =
error logTxt "error updating persistent database", peer,
ctx=buddy.healingCtx(), nNodes=nodeSpecs.len, error=report[^1].error
env.fetchAccounts.missingNodes = env.fetchAccounts.missingNodes & nodeSpecs
return
when extraTraceMessages:
trace logTxt "merged into database", peer,
ctx=buddy.healingCtx(), nNodes=nodeSpecs.len
return -1
# Filter out error and leaf nodes
var nLeafNodes = 0 # for logging
@ -405,7 +389,54 @@ proc healAccounts*(buddy: SnapBuddyRef) {.async.} =
env.fetchAccounts.checkNodes.add nodePath
when extraTraceMessages:
trace logTxt "job done", peer, ctx=buddy.healingCtx(), nLeafNodes
trace logTxt "merged into database", peer,
ctx=buddy.healingCtx(), nNodes=nodeSpecs.len, nLeafNodes
return nodeSpecs.len
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc healAccounts*(buddy: SnapBuddyRef) {.async.} =
## Fetching and merging missing account trie database nodes.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
# Only start healing if there is some completion level, already.
#
# We check against the global coverage factor, i.e. a measure for how
# much of the total of all accounts have been processed. Even if the trie
# database for the current pivot state root is sparsely filled, there
# is a good chance that it can inherit some unchanged sub-trie from an
# earlier pivor state root download. The healing process then works like
# sort of glue.
#
if env.nAccounts == 0 or
ctx.data.coveredAccounts.fullFactor < healAccountsTrigger:
#when extraTraceMessages:
# trace logTxt "postponed", peer, ctx=buddy.healingCtx()
return
when extraTraceMessages:
trace logTxt "started", peer, ctx=buddy.healingCtx()
var
nNodesFetched = 0
nFetchLoop = 0
# Stop after `snapAccountsHealBatchFetchMax` nodes have been fetched
while nNodesFetched < snapAccountsHealBatchFetchMax:
var nNodes = await buddy.accountsHealingImpl()
if nNodes <= 0:
break
nNodesFetched.inc(nNodes)
nFetchLoop.inc
when extraTraceMessages:
trace logTxt "job done", peer, ctx=buddy.healingCtx(),
nNodesFetched, nFetchLoop
# ------------------------------------------------------------------------------
# End

View File

@ -11,8 +11,18 @@
## Heal storage DB:
## ================
##
## This module works similar to `heal_accounts` applied to each
## per-account storage slots hexary trie.
## This module works similar to `heal_accounts` applied to each per-account
## storage slots hexary trie. These per-account trie work items are stored in
## the list `env.fetchStorage`.
##
## There is one additional short cut for speeding up processing. If a
## per-account storage slots hexary trie is marked inheritable, it will be
## checked whether it is complete and can be used wholesale.
##
## Inheritable tries appear after a pivot state root change. Typically, not all
## account data have changed and so the same per-account storage slots are
## valid.
##
import
std/sequtils,
@ -22,7 +32,7 @@ import
stew/[interval_set, keyed_queue],
../../../utils/prettify,
../../sync_desc,
".."/[range_desc, worker_desc],
".."/[constants, range_desc, worker_desc],
./com/[com_error, get_trie_nodes],
./db/[hexary_desc, hexary_error, snapdb_storage_slots]
@ -47,8 +57,10 @@ proc healingCtx(
kvp: SnapSlotsQueuePair;
): string =
let
env = buddy.data.pivotEnv
slots = kvp.data.slots
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"covered=" & slots.unprocessed.emptyFactor.toPC(0) & "," &
"nCheckNodes=" & $slots.checkNodes.len & "," &
"nMissingNodes=" & $slots.missingNodes.len & "}"
@ -158,10 +170,11 @@ proc getMissingNodesFromNetwork(
env = buddy.data.pivotEnv
accKey = kvp.data.accKey
storageRoot = kvp.key
pivot = "#" & $env.stateHeader.blockNumber # for logging
slots = kvp.data.slots
nMissingNodes = slots.missingNodes.len
inxLeft = max(0, nMissingNodes - maxTrieNodeFetch)
inxLeft = max(0, nMissingNodes - snapTrieNodeFetchMax)
# There is no point in processing too many nodes at the same time. So leave
# the rest on the `missingNodes` queue to be handled later.
@ -180,8 +193,11 @@ proc getMissingNodesFromNetwork(
# list might be used by another process that runs semi-parallel.
let
req = @[accKey.to(Blob)] & fetchNodes.mapIt(it.partialPath)
rc = await buddy.getTrieNodes(storageRoot, @[req])
rc = await buddy.getTrieNodes(storageRoot, @[req], pivot)
if rc.isOk:
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.data.errors.resetComError()
# Register unfetched missing nodes for the next pass
for w in rc.value.leftOver:
for n in 1 ..< w.len:
@ -418,7 +434,7 @@ proc healStorageSlots*(buddy: SnapBuddyRef) {.async.} =
for kvp in env.fetchStorage.nextPairs:
# Marked items indicate that a partial sub-trie existsts which might have
# been inherited from an earlier state root.
# been inherited from an earlier storage root.
if not kvp.data.inherit:
let slots = kvp.data.slots
@ -442,7 +458,7 @@ proc healStorageSlots*(buddy: SnapBuddyRef) {.async.} =
# Add to local batch to be processed, below
toBeHealed.add kvp
if maxStoragesHeal <= toBeHealed.len:
if healStoragesSlotsBatchMax <= toBeHealed.len:
break
# Run against local batch

View File

@ -8,24 +8,26 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Fetch accounts stapshot
## =======================
## Fetch account ranges
## ====================
##
## Worker items state diagram:
## ::
## unprocessed | peer workers + |
## account ranges | account database update | unprocessed storage slots
## ========================================================================
## Acccount ranges not on the database yet are organised in the set
## `env.fetchAccounts.unprocessed` of intervals (of account hashes.)
##
## +---------------------------------------+
## | |
## v |
## <unprocessed> -----+------> <worker-0> ------+-----> OUTPUT
## | |
## +------> <worker-1> ------+
## | |
## +------> <worker-2> ------+
## : :
## When processing, the followin happens.
##
## * Some interval `iv` is removed from the `env.fetchAccounts.unprocessed`
## set. This interval set might then be safely accessed and manipulated by
## other worker instances.
##
## * The data points in the interval `iv` (aka ccount hashes) are fetched from
## another peer over the network.
##
## * The received data points of the interval `iv` are verified and merged
## into the persistent database.
##
## * Data points in `iv` that were invalid or not recevied from the network
## are merged back it the set `env.fetchAccounts.unprocessed`.
##
import
@ -48,6 +50,12 @@ const
extraTraceMessages = false or true
## Enabled additional logging noise
numChunksMax = 2000
## Bound for `numChunks()` (some fancy number)
addToFetchLoopMax = 4
## Add some extra when calculating number of fetch/store rounds
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
@ -59,6 +67,12 @@ template logTxt(info: static[string]): static[string] =
# Private helpers
# ------------------------------------------------------------------------------
proc numChunks(buddy: SnapBuddyRef): int =
var total = 0u64
for ivSet in buddy.data.pivotEnv.fetchAccounts.unprocessed:
total += ivSet.chunks.uint64
min(numChunksMax.uint64, total).int
proc withMaxLen(
buddy: SnapBuddyRef;
iv: NodeTagRange;
@ -102,48 +116,50 @@ proc markGloballyProcessed(buddy: SnapBuddyRef; iv: NodeTagRange) =
discard buddy.ctx.data.coveredAccounts.merge(iv)
# ------------------------------------------------------------------------------
# Public functions
# Private functions: do the account fetching for one round
# ------------------------------------------------------------------------------
proc rangeFetchAccounts*(buddy: SnapBuddyRef) {.async.} =
## Fetch accounts and store them in the database.
proc accountsRagefetchImpl(buddy: SnapBuddyRef): Future[bool] {.async.} =
## Fetch accounts and store them in the database. Returns true while more
## data can probably be fetched.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
# Get a range of accounts to fetch from
let iv = block:
let rc = buddy.getUnprocessed()
if rc.isErr:
when extraTraceMessages:
trace logTxt "currently all processed", peer, stateRoot
trace logTxt "currently all processed", peer, pivot
return
rc.value
# Process received accounts and stash storage slots to fetch later
let dd = block:
let rc = await buddy.getAccountRange(stateRoot, iv)
let rc = await buddy.getAccountRange(stateRoot, iv, pivot)
if rc.isErr:
buddy.putUnprocessed(iv) # fail => interval back to pool
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
when extraTraceMessages:
trace logTxt "fetch error => stop", peer,
stateRoot, req=iv.len, error
trace logTxt "fetch error => stop", peer, pivot, reqLen=iv.len, error
return
# Reset error counts for detecting repeated timeouts
buddy.data.errors.nTimeouts = 0
rc.value
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.data.errors.resetComError()
let
gotAccounts = dd.data.accounts.len
gotStorage = dd.withStorage.len
when extraTraceMessages:
trace logTxt "fetched", peer, gotAccounts, gotStorage,
stateRoot, req=iv.len, got=dd.consumed
#when extraTraceMessages:
# trace logTxt "fetched", peer, gotAccounts, gotStorage,
# pivot, reqLen=iv.len, gotLen=dd.consumed.len
block:
let rc = ctx.data.snapDb.importAccounts(peer, stateRoot, iv.minPt, dd.data)
@ -153,7 +169,7 @@ proc rangeFetchAccounts*(buddy: SnapBuddyRef) {.async.} =
buddy.ctrl.zombie = true
when extraTraceMessages:
trace logTxt "import failed => stop", peer, gotAccounts, gotStorage,
stateRoot, req=iv.len, got=dd.consumed, error=rc.error
pivot, reqLen=iv.len, gotLen=dd.consumed.len, error=rc.error
return
# Statistics
@ -184,9 +200,35 @@ proc rangeFetchAccounts*(buddy: SnapBuddyRef) {.async.} =
# Store accounts on the storage TODO list.
env.fetchStorage.merge dd.withStorage
return true
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc rangeFetchAccounts*(buddy: SnapBuddyRef) {.async.} =
## Fetch accounts and store them in the database.
let numChunks = buddy.numChunks()
if 0 < numChunks:
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
pivot = "#" & $env.stateHeader.blockNumber # for logging
nFetchLoopMax = max(ctx.buddiesMax + 1, numChunks) + addToFetchLoopMax
when extraTraceMessages:
trace logTxt "done", peer, gotAccounts, gotStorage,
stateRoot, req=iv.len, got=dd.consumed
trace logTxt "start", peer, pivot, nFetchLoopMax
var nFetchAccounts = 0
while nFetchAccounts < nFetchLoopMax:
if not await buddy.accountsRagefetchImpl():
break
nFetchAccounts.inc
when extraTraceMessages:
trace logTxt "done", peer, pivot, nFetchAccounts, nFetchLoopMax
# ------------------------------------------------------------------------------
# End

View File

@ -8,8 +8,8 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Fetch accounts stapshot
## =======================
## Fetch storage slots
## ===================
##
## Flow chart for storage slots download
## -------------------------------------
@ -39,6 +39,17 @@
## * `{completed}`: list is optimised out
## * `{partial}`: list is optimised out
##
## Discussion
## ----------
## Handling storage slots can be seen as an generalisation of handling account
## ranges (see `range_fetch_accounts` module.) Contrary to the situation with
## accounts, storage slots are typically downloaded in the size of a full list
## that can be expanded to a full hexary trie for the given storage root.
##
## Only in rare cases a storage slots list is incomplete, a partial hexary
## trie. In that case, the list of storage slots is processed as described
## for accounts (see `range_fetch_accounts` module.)
##
import
chronicles,
@ -47,7 +58,7 @@ import
stew/[interval_set, keyed_queue],
stint,
../../sync_desc,
".."/[range_desc, worker_desc],
".."/[constants, range_desc, worker_desc],
./com/[com_error, get_storage_ranges],
./db/snapdb_storage_slots
@ -81,10 +92,10 @@ proc getNextSlotItems(
## item. An explicit list of slots is only calculated if there was a queue
## item with a partially completed slots download.
##
## * Otherwise, a list of at most `maxStoragesFetch` work items is returned.
## These work items were checked for that there was no trace of a previously
## installed (probably partial) storage trie on the database (e.g. inherited
## from an earlier state root pivot.)
## * Otherwise, a list of at most `snapStoragesSlotsFetchMax` work items is
## returned. These work items were checked for that there was no trace of a
## previously installed (probably partial) storage trie on the database
## (e.g. inherited from an earlier state root pivot.)
##
## If there is an indication that the storage trie may have some data
## already it is ignored here and marked `inherit` so that it will be
@ -152,7 +163,7 @@ proc getNextSlotItems(
env.fetchStorage.del(kvp.key) # ok to delete this item from batch queue
# Maximal number of items to fetch
if maxStoragesFetch <= result.len:
if snapStoragesSlotsFetchMax <= result.len:
break
when extraTraceMessages:
@ -170,6 +181,7 @@ proc storeStoragesSingleBatch(
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
# Fetch storage data and save it on disk. Storage requests are managed by
# a request queue for handling partioal replies and re-fetch issues. For
@ -182,26 +194,26 @@ proc storeStoragesSingleBatch(
# Get storages slots data from the network
var stoRange = block:
let rc = await buddy.getStorageRanges(stateRoot, req)
let rc = await buddy.getStorageRanges(stateRoot, req, pivot)
if rc.isErr:
env.fetchStorage.merge req
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
discard
trace logTxt "fetch error => stop", peer,
trace logTxt "fetch error => stop", peer, pivot,
nSlotLists=env.nSlotLists, nReq=req.len,
nStorageQueue=env.fetchStorage.len, error
return
rc.value
# Reset error counts for detecting repeated timeouts
buddy.data.errors.nTimeouts = 0
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.data.errors.resetComError()
var gotSlotLists = stoRange.data.storages.len
#when extraTraceMessages:
# trace logTxt "fetched", peer,
# trace logTxt "fetched", peer, pivot,
# nSlotLists=env.nSlotLists, nSlotLists=gotSlotLists, nReq=req.len,
# nStorageQueue=env.fetchStorage.len, nLeftOvers=stoRange.leftOver.len
@ -217,7 +229,7 @@ proc storeStoragesSingleBatch(
env.fetchStorage.merge req
gotSlotLists.dec(report.len - 1) # for logging only
error logTxt "import failed", peer,
error logTxt "import failed", peer, pivot,
nSlotLists=env.nSlotLists, nSlotLists=gotSlotLists, nReq=req.len,
nStorageQueue=env.fetchStorage.len, error=report[^1].error
return
@ -246,7 +258,7 @@ proc storeStoragesSingleBatch(
# Update local statistics counter for `nSlotLists` counter update
gotSlotLists.dec
trace logTxt "processing error", peer, nSlotLists=env.nSlotLists,
trace logTxt "processing error", peer, pivot, nSlotLists=env.nSlotLists,
nSlotLists=gotSlotLists, nReqInx=inx, nReq=req.len,
nStorageQueue=env.fetchStorage.len, error=report[inx].error
@ -279,7 +291,8 @@ proc rangeFetchStorageSlots*(buddy: SnapBuddyRef) {.async.} =
if 0 < env.fetchStorage.len:
# Run at most the minimum number of times to get the batch queue cleaned up.
var
fullRangeLoopCount = 1 + (env.fetchStorage.len - 1) div maxStoragesFetch
fullRangeLoopCount =
1 + (env.fetchStorage.len - 1) div snapStoragesSlotsFetchMax
subRangeLoopCount = 0
# Add additional counts for partial slot range items

View File

@ -39,15 +39,15 @@ type
## Account fetching state that is shared among all peers.
nBuddies: int
lastStats: TickerStats
lastTick: uint64
statsCb: TickerStatsUpdater
logTicker: TimerCallback
tick: uint64 # more than 5*10^11y before wrap when ticking every sec
started: Time
visited: Time
const
tickerStartDelay = chronos.milliseconds(100)
tickerLogInterval = chronos.seconds(1)
tickerLogSuppressMax = 100
tickerLogSuppressMax = initDuration(seconds = 100)
# ------------------------------------------------------------------------------
# Private functions: pretty printing
@ -105,11 +105,13 @@ template noFmtError(info: static[string]; code: untyped) =
proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.}
proc runLogTicker(t: TickerRef) {.gcsafe.} =
let data = t.statsCb()
let
data = t.statsCb()
now = getTime().utc.toTime
if data != t.lastStats or t.lastTick + tickerLogSuppressMax < t.tick:
if data != t.lastStats or tickerLogSuppressMax < (now - t.visited):
t.lastStats = data
t.lastTick = t.tick
t.visited = now
var
nAcc, nSto, bulk: string
pivot = "n/a"
@ -119,7 +121,9 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
"(" & data.accountsFill[1].toPC(0) & ")" &
"/" & data.accountsFill[2].toPC(0)
buddies = t.nBuddies
tick = t.tick.toSI
# With `int64`, there are more than 29*10^10 years range for seconds
up = (now - t.started).inSeconds.uint64.toSI
mem = getTotalMem().uint.toSI
noFmtError("runLogTicker"):
@ -134,9 +138,8 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
nStoQue = $data.nStorageQueue.unsafeGet
info "Snap sync statistics",
tick, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem
up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem
t.tick.inc
t.setLogTicker(Moment.fromNow(tickerLogInterval))
@ -156,6 +159,8 @@ proc start*(t: TickerRef) =
## Re/start ticker unconditionally
#debug "Started ticker"
t.logTicker = safeSetTimer(Moment.fromNow(tickerStartDelay), runLogTicker, t)
if t.started == Time.default:
t.started = getTime().utc.toTime
proc stop*(t: TickerRef) =
## Stop ticker unconditionally

View File

@ -19,60 +19,6 @@ import
{.push raises: [Defect].}
const
snapRequestBytesLimit* = 2 * 1024 * 1024
## Soft bytes limit to request in `snap` protocol calls.
minPivotBlockDistance* = 128
## The minimal depth of two block headers needed to activate a new state
## root pivot.
##
## Effects on assembling the state via `snap/1` protocol:
##
## * A small value of this constant increases the propensity to update the
## pivot header more often. This is so because each new peer negoiates a
## pivot block number at least the current one.
##
## * A large value keeps the current pivot more stable but some experiments
## suggest that the `snap/1` protocol is answered only for later block
## numbers (aka pivot blocks.) So a large value tends to keep the pivot
## farther away from the chain head.
##
## Note that 128 is the magic distance for snapshots used by *Geth*.
healAccountsTrigger* = 0.95
## Apply accounts healing if the global snap download coverage factor
## exceeds this setting. The global coverage factor is derived by merging
## all account ranges retrieved for all pivot state roots (see
## `coveredAccounts` in `CtxData`.)
healSlorageSlotsTrigger* = 0.70
## Consider per account storage slost healing if this particular sub-trie
## has reached this factor of completeness
maxStoragesFetch* = 5 * 1024
## Maximal number of storage tries to fetch with a single message.
maxStoragesHeal* = 32
## Maximal number of storage tries to to heal in a single batch run.
maxTrieNodeFetch* = 1024
## Informal maximal number of trie nodes to fetch at once. This is nor
## an official limit but found on several implementations (e.g. geth.)
##
## Resticting the fetch list length early allows to better paralellise
## healing.
maxHealingLeafPaths* = 1024
## Retrieve this many leave nodes with proper 32 bytes path when inspecting
## for dangling nodes. This allows to run healing paralell to accounts or
## storage download without requestinng an account/storage slot found by
## healing again with the download.
noPivotEnvChangeIfComplete* = true
## If set `true`, new peers will not change the pivot even if the
## negotiated pivot would be newer. This should be the default.
type
SnapSlotsQueue* = KeyedQueue[Hash256,SnapSlotsQueueItemRef]
## Handles list of storage slots data for fetch indexed by storage root.
@ -110,6 +56,13 @@ type
## Referenced object, so it can be made optional for the storage
## batch list
SnapHealingState* = enum
## State of healing process. The `HealerRunning` state indicates that
## dangling and/or missing nodes have been temprarily removed from the
## batch queue while processing.
HealerIdle
HealerRunning
HealerDone
SnapPivotRef* = ref object
## Per-state root cache for particular snap data environment
@ -117,11 +70,11 @@ type
# Accounts download
fetchAccounts*: SnapTrieRangeBatch ## Set of accounts ranges to fetch
accountsDone*: bool ## All accounts have been processed
accountsState*: SnapHealingState ## All accounts have been processed
# Storage slots download
fetchStorage*: SnapSlotsQueue ## Fetch storage for these accounts
serialSync*: bool ## Done with storage, block sync next
storageDone*: bool ## Done with storage, block sync next
# Info
nAccounts*: uint64 ## Imported # of accounts
@ -155,9 +108,6 @@ type
SnapCtxRef* = CtxRef[CtxData]
## Extended global descriptor
static:
doAssert healAccountsTrigger < 1.0 # larger values make no sense
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------

View File

@ -161,7 +161,7 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
discard dsc.buddies.lruFetch(peer.hash)
# Allow task switch
await sleepAsync(50.milliseconds)
await sleepAsync(1.milliseconds)
if worker.ctrl.stopped:
break