nimbus-eth1/nimbus/sync/snap/worker/store_storages.nim

# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Fetch storage slots snapshot
## ============================
##
## Worker items state diagram:
## ::
##  unprocessed slot requests | peer workers + storages database update
##  ===================================================================
##
##       +-------------------------------------------------+
##       |                                                 |
##       v                                                 |
##  <unprocessed> -----------+--------> <worker-0> --------+
##                           |                             |
##                           +--------> <worker-1> --------+
##                           |                             |
##                           +--------> <worker-2> --------+
##                           :                             :
##
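## A minimal sketch of a driver loop (hypothetical, not part of this module)
## consuming the queue: `storeStorages()` below pulls work items off the
## <unprocessed> queue via `getNextSlotItem()` and returns once the queue
## is exhausted.
## ::
##   while buddy.ctrl.running:      # scheduler keeps this peer worker alive
##     await buddy.storeStorages()  # one pass; returns when queue runs dry
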
import
chronicles,
chronos,
eth/[common/eth_types, p2p],
stew/keyed_queue,
stint,
../../sync_desc,
".."/[range_desc, worker_desc],
./com/[com_error, get_storage_ranges],
./db/snap_db

{.push raises: [Defect].}

logScope:
  topics = "snap-fetch"

const
  extraTraceMessages = false or true
    ## Enable additional logging noise

# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------

proc getNextSlotItem(buddy: SnapBuddyRef): Result[SnapSlotQueueItemRef,void] =
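  ## Pull the next work item from the batch queue, skipping items that this
  ## peer has already tried and rejected earlier. A returned item is removed
  ## from the queue.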
let env = buddy.data.pivotEnv
for w in env.fetchStorage.nextKeys:
# Make sure that this item was not fetched and rejected earlier
if w notin buddy.data.vetoSlots:
env.fetchStorage.del(w)
return ok(w)
  err()

proc fetchAndImportStorageSlots(
buddy: SnapBuddyRef;
reqSpecs: seq[AccountSlotsHeader];
): Future[Result[seq[SnapSlotQueueItemRef],ComError]]
{.async.} =
## Fetch storage slots data from the network, store it on disk and
## return data to process in the next cycle.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
# Get storage slots
var stoRange = block:
let rc = await buddy.getStorageRanges(stateRoot, reqSpecs)
if rc.isErr:
return err(rc.error)
rc.value
if 0 < stoRange.data.storages.len:
# Verify/process data and save to disk
block:
let rc = ctx.data.snapDb.importStorages(peer, stoRange.data)
if rc.isErr:
# Push back parts of the error item
var once = false
for w in rc.error:
if 0 <= w[0]:
# Reset any partial requests by not copying the `firstSlot` field.
# So all the storage slots are re-fetched completely for this
# account.
stoRange.addLeftOver(
@[AccountSlotsHeader(
accHash: stoRange.data.storages[w[0]].account.accHash,
storageRoot: stoRange.data.storages[w[0]].account.storageRoot)],
forceNew = not once)
once = true
# Do not ask for the same entries again on this `peer`
if once:
buddy.data.vetoSlots.incl stoRange.leftOver[^1]
if rc.error[^1][0] < 0:
discard
          # TODO: disk storage failed or something else happened, so what?
# Return the remaining part to be processed later
return ok(stoRange.leftOver)
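
# Note: a left-over item handed back after an import error has its
# `firstSlot` field left unset (see `addLeftOver()` above), so that
# account's storage slots are re-fetched from scratch in a later cycle
# rather than resumed mid-range.
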
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------

proc storeStorages*(buddy: SnapBuddyRef) {.async.} =
## Fetch account storage slots and store them in the database.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
var
once = true # for logging

  # Fetch storage data and save it on disk. Storage requests are managed by
  # a request queue for handling partial replies and re-fetch issues. For
  # all practical purposes, this request queue should mostly be empty.
while true:
# Pull out the next request item from the queue
let req = block:
let rc = buddy.getNextSlotItem()
if rc.isErr:
return # currently nothing to do
rc.value
when extraTraceMessages:
if once:
once = false
let nAccounts = 1 + env.fetchStorage.len
trace "Start fetching storage slotss", peer,
nAccounts, nVetoSlots=buddy.data.vetoSlots.len
block:
# Fetch and store account storage slots. On success, the `rc` value will
# contain a list of left-over items to be re-processed.
let rc = await buddy.fetchAndImportStorageSlots(req.q)
if rc.isErr:
# Save accounts/storage list to be processed later, then stop
discard env.fetchStorage.append req
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
trace "Error fetching storage slots => stop", peer, error
discard
return
# Reset error counts for detecting repeated timeouts
buddy.data.errors.nTimeouts = 0
for qLo in rc.value:
# Handle queue left-overs for processing in the next cycle
if qLo.q[0].firstSlot == Hash256() and 0 < env.fetchStorage.len:
# Appending to last queue item is preferred over adding new item
let item = env.fetchStorage.first.value
item.q = item.q & qLo.q
else:
# Put back as-is.
discard env.fetchStorage.append qLo
# End while
when extraTraceMessages:
trace "Done fetching storage slots", peer, nAccounts=env.fetchStorage.len

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------