mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-02-12 03:57:05 +00:00
* Update logging * Fix node hash associated with partial path for missing nodes why: Healing uses the partial paths for fetching nodes from the network. The node hash (or key) is used to verify the node data retrieved. The trie inspector function returned the parent hash instead of the node hash with the partial path when a missing node was detected. So all nodes for healing were rejected. * Must not modify sequence while looping over it
194 lines
6.3 KiB
Nim
194 lines
6.3 KiB
Nim
# Nimbus
|
|
# Copyright (c) 2021 Status Research & Development GmbH
|
|
# Licensed under either of
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
|
# http://www.apache.org/licenses/LICENSE-2.0)
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
|
# http://opensource.org/licenses/MIT)
|
|
# at your option. This file may not be copied, modified, or distributed
|
|
# except according to those terms.
|
|
|
|
## Fetch accounts stapshot
|
|
## =======================
|
|
##
|
|
## Worker items state diagram:
|
|
## ::
|
|
## unprocessed | peer workers + |
|
|
## account ranges | account database update | unprocessed storage slots
|
|
## ========================================================================
|
|
##
|
|
## +---------------------------------------+
|
|
## | |
|
|
## v |
|
|
## <unprocessed> -----+------> <worker-0> ------+-----> OUTPUT
|
|
## | |
|
|
## +------> <worker-1> ------+
|
|
## | |
|
|
## +------> <worker-2> ------+
|
|
## : :
|
|
##
|
|
|
|
import
|
|
chronicles,
|
|
chronos,
|
|
eth/[common, p2p],
|
|
stew/[interval_set, keyed_queue],
|
|
stint,
|
|
../../sync_desc,
|
|
".."/[range_desc, worker_desc],
|
|
./com/[com_error, get_account_range],
|
|
./db/snapdb_accounts
|
|
|
|
{.push raises: [Defect].}
|
|
|
|
logScope:
|
|
topics = "snap-range"
|
|
|
|
const
|
|
extraTraceMessages = false or true
|
|
## Enabled additional logging noise
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private logging helpers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
template logTxt(info: static[string]): static[string] =
|
|
"Accounts range " & info
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private helpers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc withMaxLen(
|
|
buddy: SnapBuddyRef;
|
|
iv: NodeTagRange;
|
|
maxlen: UInt256;
|
|
): NodeTagRange =
|
|
## Reduce accounts interval to maximal size
|
|
if 0 < iv.len and iv.len <= maxLen:
|
|
iv
|
|
else:
|
|
NodeTagRange.new(iv.minPt, iv.minPt + (maxLen - 1.u256))
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private functions
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc getUnprocessed(buddy: SnapBuddyRef): Result[NodeTagRange,void] =
|
|
## Fetch an interval from one of the account range lists.
|
|
let
|
|
env = buddy.data.pivotEnv
|
|
accountRangeMax = high(UInt256) div buddy.ctx.buddiesMax.u256
|
|
|
|
for ivSet in env.fetchAccounts.unprocessed:
|
|
let rc = ivSet.ge()
|
|
if rc.isOk:
|
|
let iv = buddy.withMaxLen(rc.value, accountRangeMax)
|
|
discard ivSet.reduce(iv)
|
|
return ok(iv)
|
|
|
|
err()
|
|
|
|
proc putUnprocessed(buddy: SnapBuddyRef; iv: NodeTagRange) =
|
|
## Shortcut
|
|
discard buddy.data.pivotEnv.fetchAccounts.unprocessed[1].merge(iv)
|
|
|
|
proc delUnprocessed(buddy: SnapBuddyRef; iv: NodeTagRange) =
|
|
## Shortcut
|
|
discard buddy.data.pivotEnv.fetchAccounts.unprocessed[1].reduce(iv)
|
|
|
|
proc markGloballyProcessed(buddy: SnapBuddyRef; iv: NodeTagRange) =
|
|
## Shortcut
|
|
discard buddy.ctx.data.coveredAccounts.merge(iv)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc rangeFetchAccounts*(buddy: SnapBuddyRef) {.async.} =
|
|
## Fetch accounts and store them in the database.
|
|
let
|
|
ctx = buddy.ctx
|
|
peer = buddy.peer
|
|
env = buddy.data.pivotEnv
|
|
stateRoot = env.stateHeader.stateRoot
|
|
|
|
# Get a range of accounts to fetch from
|
|
let iv = block:
|
|
let rc = buddy.getUnprocessed()
|
|
if rc.isErr:
|
|
when extraTraceMessages:
|
|
trace logTxt "currently all processed", peer, stateRoot
|
|
return
|
|
rc.value
|
|
|
|
# Process received accounts and stash storage slots to fetch later
|
|
let dd = block:
|
|
let rc = await buddy.getAccountRange(stateRoot, iv)
|
|
if rc.isErr:
|
|
buddy.putUnprocessed(iv) # fail => interval back to pool
|
|
let error = rc.error
|
|
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
|
|
when extraTraceMessages:
|
|
trace logTxt "fetch error => stop", peer,
|
|
stateRoot, req=iv.len, error
|
|
return
|
|
# Reset error counts for detecting repeated timeouts
|
|
buddy.data.errors.nTimeouts = 0
|
|
rc.value
|
|
|
|
let
|
|
gotAccounts = dd.data.accounts.len
|
|
gotStorage = dd.withStorage.len
|
|
|
|
when extraTraceMessages:
|
|
trace logTxt "fetched", peer, gotAccounts, gotStorage,
|
|
stateRoot, req=iv.len, got=dd.consumed
|
|
|
|
block:
|
|
let rc = ctx.data.snapDb.importAccounts(peer, stateRoot, iv.minPt, dd.data)
|
|
if rc.isErr:
|
|
# Bad data, just try another peer
|
|
buddy.putUnprocessed(iv)
|
|
buddy.ctrl.zombie = true
|
|
when extraTraceMessages:
|
|
trace logTxt "import failed => stop", peer, gotAccounts, gotStorage,
|
|
stateRoot, req=iv.len, got=dd.consumed, error=rc.error
|
|
return
|
|
|
|
# Statistics
|
|
env.nAccounts.inc(gotAccounts)
|
|
|
|
# Register consumed intervals on the accumulator over all state roots
|
|
buddy.markGloballyProcessed(dd.consumed)
|
|
|
|
# Register consumed and bulk-imported (well, not yet) accounts range
|
|
block registerConsumed:
|
|
block:
|
|
# Both intervals `min(iv)` and `min(dd.consumed)` are equal
|
|
let rc = iv - dd.consumed
|
|
if rc.isOk:
|
|
# Now, `dd.consumed` < `iv`, return some unused range
|
|
buddy.putUnprocessed(rc.value)
|
|
break registerConsumed
|
|
block:
|
|
# The processed interval might be a bit larger
|
|
let rc = dd.consumed - iv
|
|
if rc.isOk:
|
|
# Remove from unprocessed data. If it is not unprocessed, anymore
|
|
# then it was doubly processed which is ok.
|
|
buddy.delUnprocessed(rc.value)
|
|
break registerConsumed
|
|
# End registerConsumed
|
|
|
|
# Store accounts on the storage TODO list.
|
|
env.fetchStorage.merge dd.withStorage
|
|
|
|
when extraTraceMessages:
|
|
trace logTxt "done", peer, gotAccounts, gotStorage,
|
|
stateRoot, req=iv.len, got=dd.consumed
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# End
|
|
# ------------------------------------------------------------------------------
|