# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
#    http://www.apache.org/licenses/LICENSE-2.0)
#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
#    http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.

## Fetch account ranges
## ====================
##
## Account ranges not on the database yet are organised in the set
## `env.fetchAccounts.unprocessed` of intervals (of account hashes).
##
## When processing, the following happens.
##
## * Some interval `iv` is removed from the `env.fetchAccounts.unprocessed`
##   set. This interval set might then be safely accessed and manipulated by
##   other worker instances.
##
## * The data points in the interval `iv` (aka account hashes) are fetched
##   from another peer over the network.
##
## * The received data points of the interval `iv` are verified and merged
##   into the persistent database.
##
## * Data points in `iv` that were invalid or not received from the network
##   are merged back into the set `env.fetchAccounts.unprocessed`.
##
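## A minimal sketch of that check-out/merge-back cycle (illustrative only:
## `maxLen` and `storedRange` are placeholder names; peer communication,
## verification, and the database import are elided):
##
## .. code-block:: nim
##
##   let rc = env.fetchAccounts.unprocessed.fetch maxLen
##   if rc.isOk:
##     let iv = rc.value          # checked out, owned by this worker now
##     # ... fetch, verify, and store the data points of `iv` ...
##     env.fetchAccounts.unprocessed.merge iv   # put the interval back, then
##     env.fetchAccounts.unprocessed.reduce storedRange # re-remove stored part
##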

import
  chronicles,
  chronos,
  eth/[common, p2p],
  stew/[interval_set, keyed_queue],
  stint,
  ../../sync_desc,
  ".."/[constants, range_desc, worker_desc],
  ./com/[com_error, get_account_range],
  ./db/snapdb_accounts

{.push raises: [Defect].}

logScope:
  topics = "snap-range"

const
  extraTraceMessages = false or true
    ## Enables additional logging noise
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------

template logTxt(info: static[string]): static[string] =
  "Accounts range " & info

proc dumpUnprocessed(
    buddy: SnapBuddyRef;
    env: SnapPivotRef;
      ): string =
  ## Debugging ...
  let
    peer = buddy.peer
    pivot = "#" & $env.stateHeader.blockNumber # for logging
    moan = proc(overlap: UInt256; iv: NodeTagRange) =
      trace logTxt "unprocessed => overlap", peer, pivot, overlap, iv

  env.fetchAccounts.unprocessed.dump(moan, 5)

# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------

proc getUnprocessed(
    buddy: SnapBuddyRef;
    env: SnapPivotRef;
      ): Result[NodeTagRange,void] =
  ## Fetch an interval from one of the account range lists.
  let accountRangeMax = high(UInt256) div buddy.ctx.buddiesMax.u256
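  # The cap is an even share of the whole hash range relative to the maximal
  # number of workers, so no single fetch can check out (almost) the entire
  # range at once.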
  env.fetchAccounts.unprocessed.fetch accountRangeMax

# ------------------------------------------------------------------------------
# Private functions: do the account fetching for one round
# ------------------------------------------------------------------------------

proc accountsRangefetchImpl(
    buddy: SnapBuddyRef;
    env: SnapPivotRef;
      ): Future[bool] {.async.} =
  ## Fetch accounts and store them in the database. Returns true while more
  ## data can probably be fetched.
  let
    ctx = buddy.ctx
    peer = buddy.peer
    stateRoot = env.stateHeader.stateRoot
    pivot = "#" & $env.stateHeader.blockNumber # for logging

  # Get a range of accounts to fetch from
  let iv = block:
    let rc = buddy.getUnprocessed(env)
    if rc.isErr:
      when extraTraceMessages:
        trace logTxt "currently all processed", peer, pivot
      return
    rc.value

  # Process received accounts and stash storage slots to fetch later
  let dd = block:
    let rc = await buddy.getAccountRange(stateRoot, iv, pivot)
    if rc.isErr:
      env.fetchAccounts.unprocessed.merge iv # fail => interval back to pool
      let error = rc.error
      if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
        when extraTraceMessages:
          trace logTxt "fetch error => stop", peer, pivot, reqLen=iv.len, error
      return
    rc.value

  # Reset error counts for detecting repeated timeouts, network errors, etc.
  buddy.data.errors.resetComError()

  let
    gotAccounts = dd.data.accounts.len
    gotStorage = dd.withStorage.len

  #when extraTraceMessages:
  #  trace logTxt "fetched", peer, gotAccounts, gotStorage,
  #    pivot, reqLen=iv.len, gotLen=dd.consumed.len

  # Now, as we fully own the scheduler, the original interval can safely be
  # placed back for a moment -- to be corrected below.
  env.fetchAccounts.unprocessed.merge iv

  # Processed account hashes are set up as a set of intervals which is needed
  # if the data range returned from the network contains holes.
  let processed = NodeTagRangeSet.init()
  if 0 < dd.data.accounts.len:
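    # Accounts are returned in ascending key order, so the interval from the
    # requested `iv.minPt` up to the last key received covers everything the
    # peer sent back.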
    discard processed.merge(iv.minPt, dd.data.accounts[^1].accKey.to(NodeTag))
  else:
    discard processed.merge iv

  let dangling = block:
    # No left boundary check needed. If there is a gap, the partial path for
    # that gap is returned by the import function to be registered, below.
    let rc = ctx.data.snapDb.importAccounts(
      peer, stateRoot, iv.minPt, dd.data, noBaseBoundCheck = true)
    if rc.isErr:
      # Bad data, just try another peer
      buddy.ctrl.zombie = true
      when extraTraceMessages:
        trace logTxt "import failed => stop", peer, gotAccounts, gotStorage,
          pivot, reqLen=iv.len, gotLen=processed.total, error=rc.error
      return
    rc.value

  # Statistics
  env.nAccounts.inc(gotAccounts)

  # Punch holes into the range reported back from the network if it contains
  # gaps.
  for w in dangling:
    discard processed.reduce(
      w.partialPath.min(NodeKey).to(NodeTag),
      w.partialPath.max(NodeKey).to(NodeTag))
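  # At this point `processed` holds exactly the sub-ranges that ended up in
  # the database. The book keeping below removes these from the `unprocessed`
  # pool, which implicitly re-queues the holes for another attempt.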

  # Update book keeping
  for w in processed.increasing:
    # Remove the processed range from the batch of unprocessed ones.
    env.fetchAccounts.unprocessed.reduce w
    # Register consumed intervals on the accumulator over all state roots.
    discard buddy.ctx.data.coveredAccounts.merge w

  # Register accounts with storage slots on the storage TODO list.
  env.fetchStorageFull.merge dd.withStorage

  when extraTraceMessages:
    trace logTxt "request done", peer, pivot,
      nCheckNodes=env.fetchAccounts.checkNodes.len,
      nMissingNodes=env.fetchAccounts.missingNodes.len,
      imported=processed.dump(), unprocessed=buddy.dumpUnprocessed(env)

  return true

# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------

proc rangeFetchAccounts*(buddy: SnapBuddyRef) {.async.} =
  ## Fetch accounts and store them in the database.
  let env = buddy.data.pivotEnv
  if not env.fetchAccounts.unprocessed.isEmpty():
    let
      ctx = buddy.ctx
      peer = buddy.peer
      pivot = "#" & $env.stateHeader.blockNumber # for logging

    when extraTraceMessages:
      trace logTxt "start", peer, pivot

    var nFetchAccounts = 0
    while not env.fetchAccounts.unprocessed.isEmpty() and
          buddy.ctrl.running and
          env == buddy.data.pivotEnv:
      nFetchAccounts.inc
      if not await buddy.accountsRangefetchImpl(env):
        break

      # Clean up the storage slots queue first if it becomes too large
      let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
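      # This implements the storage slots priority: with a large backlog,
      # account processing is stopped here in favour of storage processing.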
      if snapAccountsBuddyStoragesSlotsQuPrioThresh < nStoQu:
        break

    when extraTraceMessages:
      trace logTxt "done", peer, pivot, nFetchAccounts,
        runState=buddy.ctrl.state

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------