Prep for full sync after snap make 3 (#1270)

* For snap sync, publish `EthWireRef` in sync descriptor

why:
  currently used for noise control

* Detect and reuse existing storage slots

* Provide healing module for storage slots

* Update statistic ticker (adding range factor for unprocessed storage)

* Complete merge function for work item ranges

why:
  Merging an interval into an existing partial item was missing

* Show average storage queue lengths in ticker

detail:
  The previous attempt showed average completeness, which did not tell much

* Correct the meaning of the storage counter (per pivot)

detail:
  It is the number of accounts that have their storage saved
Jordan Hrycaj 2022-10-19 11:04:06 +01:00 committed by GitHub
parent a48cc04ea7
commit 85fdb61699
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1049 additions and 208 deletions

View File

@ -3,6 +3,7 @@ import
eth/[common, p2p, trie/db],
./types,
./protocol/eth/eth_types,
./protocol/trace_config, # gossip noise control
../db/db_chain,
../p2p/chain,
../utils/tx_pool
@ -12,6 +13,7 @@ type
db: BaseChainDB
chain: Chain
txPool: TxPoolRef
disablePool: bool
proc new*(_: type EthWireRef, chain: Chain, txPool: TxPoolRef): EthWireRef =
EthWireRef(
@ -20,9 +22,15 @@ proc new*(_: type EthWireRef, chain: Chain, txPool: TxPoolRef): EthWireRef =
txPool: txPool
)
proc notEnabled(name: string) =
debug "Wire handler method is disabled", meth = name
proc notImplemented(name: string) =
debug "Wire handler method not implemented", meth = name
method poolEnabled*(ctx: EthWireRef; ena: bool) =
ctx.disablePool = not ena
method getStatus*(ctx: EthWireRef): EthState {.gcsafe.} =
let
db = ctx.db
@ -113,10 +121,15 @@ method getBlockHeaders*(ctx: EthWireRef, req: BlocksRequest): seq[BlockHeader] {
result.add foundBlock
method handleAnnouncedTxs*(ctx: EthWireRef, peer: Peer, txs: openArray[Transaction]) {.gcsafe.} =
ctx.txPool.jobAddTxs(txs)
if ctx.disablePool:
when trMissingOrDisabledGossipOk:
notEnabled("handleAnnouncedTxs")
else:
ctx.txPool.jobAddTxs(txs)
method handleAnnouncedTxsHashes*(ctx: EthWireRef, peer: Peer, txHashes: openArray[Hash256]) {.gcsafe.} =
notImplemented("handleAnnouncedTxsHashes")
when trMissingOrDisabledGossipOk:
notImplemented("handleAnnouncedTxsHashes")
method handleNewBlock*(ctx: EthWireRef, peer: Peer, blk: EthBlock, totalDifficulty: DifficultyInt) {.gcsafe.} =
notImplemented("handleNewBlock")

View File

@ -13,7 +13,7 @@ const
# Some static noisy settings for `eth` debugging
trEthTracePacketsOk* = true
## `trace` log each sync network message.
trEthTraceGossipOk* = true
trEthTraceGossipOk* = true and false
## `trace` log each sync network message.
trEthTraceHandshakesOk* = true
## `trace` log each network handshake message.
@ -24,7 +24,8 @@ const
trSnapTracePacketsOk* = true
## `trace` log each sync network message.
# The files and lines clutter differently when sync tracing is enabled.
# publicLogScope: chroniclesLineNumbers=false
# Shut up particular eth context handler gossip
trMissingOrDisabledGossipOk* = true and false
## Control `handleAnnouncedTxs`, `handleAnnouncedTxsHashes`, etc.
# End
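The `true and false` spelling keeps a switch visibly "off" while leaving the intended "on" value in place; enabling a flag is a matter of deleting the trailing `and false`. A tiny illustration with a local, invented constant (not part of `trace_config`):

const exampleTraceOk = true and false     # off; drop `and false` to enable

proc noisyStep() =
  when exampleTraceOk:
    echo "tracing a sync message"         # compiled out while the flag is off
  echo "doing the actual work"

noisyStep()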

View File

@ -15,7 +15,7 @@ import
../db/select_backend,
../p2p/chain,
./snap/[worker, worker_desc],
"."/[sync_desc, sync_sched, protocol]
"."/[protocol, sync_desc, sync_sched]
{.push raises: [Defect].}
@ -67,6 +67,8 @@ proc init*(
result.ctx.chain = chain # explicitely override
result.ctx.data.rng = rng
result.ctx.data.dbBackend = dbBackend
# Required to have been initialised via `addCapability()`
doAssert not result.ctx.ethWireCtx.isNil
proc start*(ctx: SnapSyncRef) =
doAssert ctx.startSync()

View File

@ -23,14 +23,13 @@ type
ByteArray32* = array[32,byte]
## Used for 32 byte database keys
NodeTag* = ##\
## Trie leaf item, account hash etc.
distinct UInt256
NodeKey* = distinct ByteArray32
## Hash key without the hash wrapper (as opposed to `NodeTag` which is a
## number)
NodeTag* = distinct UInt256
## Trie leaf item, account hash etc.
NodeTagRange* = Interval[NodeTag,UInt256]
## Interval `[minPt,maxPt]` of` NodeTag` elements, can be managed in an
## `IntervalSet` data type.
@ -93,6 +92,10 @@ proc to*(hash: Hash256; T: type NodeKey): T =
## Syntactic sugar
hash.data.NodeKey
proc to*(key: NodeKey; T: type Hash256): T =
## Syntactic sugar
T(data: key.ByteArray32)
proc to*(key: NodeKey; T: type Blob): T =
## Syntactic sugar
key.ByteArray32.toSeq
@ -101,6 +104,15 @@ proc to*(n: SomeUnsignedInt|UInt256; T: type NodeTag): T =
## Syntactic sugar
n.u256.T
proc hash*(a: NodeKey): Hash =
## Table/KeyedQueue mixin
a.ByteArray32.hash
proc `==`*(a, b: NodeKey): bool =
## Table/KeyedQueue mixin
a.ByteArray32 == b.ByteArray32
# ------------------------------------------------------------------------------
# Public constructors
# ------------------------------------------------------------------------------
@ -194,7 +206,7 @@ proc emptyFactor*(lrs: openArray[NodeTagRangeSet]): float =
## Variant of `emptyFactor()` where intervals are distributed across several
## sets. This function makes sense only if the interval sets are mutually
## disjunct.
var accu: Nodetag
var accu: NodeTag
for ivSet in lrs:
if 0 < ivSet.total:
if high(NodeTag) - ivSet.total < accu:
@ -208,6 +220,7 @@ proc emptyFactor*(lrs: openArray[NodeTagRangeSet]): float =
return 1.0
((high(NodeTag) - accu).u256 + 1).to(float) / (2.0^256)
proc fullFactor*(lrs: NodeTagRangeSet): float =
## Relative covered total, i.e. `#points-covered / 2^256` to be used
## in statistics or triggers
@ -218,6 +231,24 @@ proc fullFactor*(lrs: NodeTagRangeSet): float =
else:
1.0 # number of points in `lrs` is `2^256 + 1`
proc fullFactor*(lrs: openArray[NodeTagRangeSet]): float =
## Variant of `fullFactor()` where intervals are distributed across several
## sets. This function makes sense only if the interval sets are mutually
## disjunct.
var accu: NodeTag
for ivSet in lrs:
if 0 < ivSet.total:
if high(NodeTag) - ivSet.total < accu:
return 1.0
accu = accu + ivSet.total
elif ivSet.chunks == 0:
discard
else: # number of points in `ivSet` is `2^256 + 1`
return 1.0
if accu == 0.to(NodeTag):
return 0.0
accu.u256.to(float) / (2.0^256)
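Both factor variants reduce to one accumulation over the per-set totals, saturated at 1.0 so that the `high(NodeTag)` overflow guard never reports more than full coverage. A simplified, self-contained analogue with plain floats standing in for the 2^256-wide interval sets (names invented for illustration):

proc fullFactorSketch(totals: openArray[float]): float =
  ## `totals[i]` is the fraction of the key space covered by set `i`; the sets
  ## are assumed to be mutually disjoint, so their fractions simply add up
  for t in totals:
    result += t
    if 1.0 <= result:
      return 1.0      # saturate, like the `high(NodeTag)` overflow check above

doAssert fullFactorSketch([0.25, 0.5]) == 0.75
doAssert fullFactorSketch([0.75, 0.75]) == 1.0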
# ------------------------------------------------------------------------------
# Public functions: printing & pretty printing
# ------------------------------------------------------------------------------

View File

@ -9,14 +9,15 @@
# except according to those terms.
import
std/[hashes, math, options, sets],
std/[hashes, math, options, sets, strutils],
chronicles,
chronos,
eth/[common/eth_types, p2p],
stew/[interval_set, keyed_queue],
../../db/select_backend,
".."/[protocol, sync_desc],
./worker/[heal_accounts, store_accounts, store_storages, ticker],
".."/[handlers, protocol, sync_desc],
./worker/[heal_accounts, heal_storages, store_accounts, store_storages,
ticker],
./worker/com/[com_error, get_block_header],
./worker/db/snapdb_desc,
"."/[range_desc, worker_desc]
@ -167,14 +168,6 @@ proc appendPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) =
# Append per-state root environment to LRU queue
discard ctx.data.pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax)
# Debugging, will go away
block:
let ivSet = env.fetchAccounts.unprocessed[0].clone
for iv in env.fetchAccounts.unprocessed[1].increasing:
doAssert ivSet.merge(iv) == iv.len
doAssert ivSet.chunks == 1
doAssert ivSet.total == 0
proc updatePivotImpl(buddy: SnapBuddyRef): Future[bool] {.async.} =
## Helper, negotiate pivot unless present
@ -233,7 +226,7 @@ else:
proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
result = proc: TickerStats =
var
aSum, aSqSum, uSum, uSqSum, sSum, sSqSum: float
aSum, aSqSum, uSum, uSqSum, sSum, sSqSum, wSum, wSqSum: float
count = 0
for kvp in ctx.data.pivotTable.nextPairs:
@ -253,6 +246,16 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
sSum += sLen
sSqSum += sLen * sLen
# Storage queue size for that account
var stoFill: float
for stoKvp in kvp.data.fetchStorage.nextPairs:
if stoKvp.data.slots.isNil:
stoFill += 1.0
else:
stoFill += stoKvp.data.slots.unprocessed.fullFactor
wSum += stoFill
wSqSum += stoFill * stoFill
let
env = ctx.data.pivotTable.lastValue.get(otherwise = nil)
pivotBlock = if env.isNil: none(BlockNumber)
@ -265,7 +268,8 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
nQueues: ctx.data.pivotTable.len,
nAccounts: meanStdDev(aSum, aSqSum, count),
nStorage: meanStdDev(sSum, sSqSum, count),
accountsFill: (accFill[0], accFill[1], accCoverage))
accountsFill: (accFill[0], accFill[1], accCoverage),
storageQueue: meanStdDev(wSum, wSqSum, count))
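The ticker keeps running sums and sums of squares (`wSum`, `wSqSum`, etc.) so that mean and standard deviation come out of a single pass over the pivot table. A hedged, stand-alone sketch of that accumulator idea (an illustration only, not the project's `meanStdDev` implementation):

import std/math

proc meanStdDevSketch(sum, sqSum: float; n: int): (float, float) =
  ## one-pass mean / standard deviation from a running sum and sum of squares
  if n == 0: return (0.0, 0.0)
  let mean = sum / n.float
  (mean, sqrt(max(0.0, sqSum / n.float - mean * mean)))

# two sample values 1.0 and 2.0 => sum 3.0, sum of squares 5.0
doAssert meanStdDevSketch(3.0, 5.0, 2) == (1.5, 0.5)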
# ------------------------------------------------------------------------------
# Public start/stop and admin functions
@ -273,6 +277,8 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
## Global set up
noExceptionOops("worker.setup()"):
ctx.ethWireCtx.poolEnabled(false)
ctx.data.coveredAccounts = NodeTagRangeSet.init()
ctx.data.snapDb =
if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db)
@ -412,18 +418,18 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
await buddy.storeStorages()
if buddy.ctrl.stopped: return
# Pivot might have changed, so restart with the latest one
if env != ctx.data.pivotTable.lastValue.value: return
# If the current database is not complete yet
if 0 < env.fetchAccounts.unprocessed[0].chunks or
0 < env.fetchAccounts.unprocessed[1].chunks:
# Healing applies to the latest pivot only. The pivot might have changed
# in the background (while networking) due to a new peer worker that has
# negotiated another, newer pivot.
if env == ctx.data.pivotTable.lastValue.value:
await buddy.healAccountsDb()
if buddy.ctrl.stopped: return
await buddy.healAccountsDb()
if buddy.ctrl.stopped: return
# TODO: use/apply storage healer
await buddy.healStoragesDb()
if buddy.ctrl.stopped: return
# Check whether accounts might be complete.
if env.fetchStorage.len == 0:

View File

@ -146,9 +146,10 @@ type
repairKeyGen*: uint64 ## Unique tmp key generator
keyPp*: HexaryPpFn ## For debugging, might go away
HexaryGetFn* = proc(key: Blob): Blob {.gcsafe.}
## Persistent database get() function. For read-only cacses, this function
## can be seen as the persistent alternative to `HexaryTreeDbRef`.
HexaryGetFn* = proc(key: openArray[byte]): Blob {.gcsafe.}
## Persistent database `get()` function. For read-only cases, this function
## can be seen as the persistent alternative to `tab[]` on a
## `HexaryTreeDbRef` descriptor.
HexaryNodeReport* = object
## Return code for single node operations
@ -357,18 +358,10 @@ proc newRepairKey*(db: HexaryTreeDbRef): RepairKey =
# Public functions
# ------------------------------------------------------------------------------
proc hash*(a: NodeKey): Hash =
## Tables mixin
a.ByteArray32.hash
proc hash*(a: RepairKey): Hash =
## Tables mixin
a.ByteArray33.hash
proc `==`*(a, b: NodeKey): bool =
## Tables mixin
a.ByteArray32 == b.ByteArray32
proc `==`*(a, b: RepairKey): bool =
## Tables mixin
a.ByteArray33 == b.ByteArray33

View File

@ -24,6 +24,7 @@ logScope:
type
SnapDbAccountsRef* = ref object of SnapDbBaseRef
peer: Peer ## For log messages
getFn: HexaryGetFn ## Persistent database `get()` closure
const
@ -140,8 +141,9 @@ proc init*(
## Constructor, starts a new accounts session.
let db = pv.kvDb
new result
result.init(pv, root.to(NodeKey), peer)
result.getFn = proc(key: Blob): Blob = db.get(key)
result.init(pv, root.to(NodeKey))
result.peer = peer
result.getFn = proc(key: openArray[byte]): Blob = db.get(key)
proc dup*(
ps: SnapDbAccountsRef;
@ -182,7 +184,7 @@ proc importAccounts*(
var accounts: seq[RLeafSpecs]
try:
if 0 < data.proof.len:
let rc = ps.mergeProofs(ps.root, data.proof)
let rc = ps.mergeProofs(ps.peer, ps.root, data.proof)
if rc.isErr:
return err(rc.error)
block:
@ -227,7 +229,7 @@ proc importAccounts*(
pv, root, peer).importAccounts(base, data, persistent=true)
proc importRawAccountNodes*(
proc importRawAccountsNodes*(
ps: SnapDbAccountsRef; ## Re-usable session descriptor
nodes: openArray[Blob]; ## Node records
reportNodes = {Leaf}; ## Additional node types to report
@ -293,7 +295,7 @@ proc importRawAccountNodes*(
if nErrors == 0:
trace "Raw account nodes imported", peer, slot, nItems, report=result.len
proc importRawAccountNodes*(
proc importRawAccountsNodes*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
nodes: openArray[Blob]; ## Node records
@ -301,7 +303,7 @@ proc importRawAccountNodes*(
): seq[HexaryNodeReport] =
## Variant of `importRawNodes()` for persistent storage.
SnapDbAccountsRef.init(
pv, Hash256(), peer).importRawAccountNodes(
pv, Hash256(), peer).importRawAccountsNodes(
nodes, reportNodes, persistent=true)
@ -334,6 +336,8 @@ proc inspectAccountsTrie*(
break checkForError
trace "Inspect account trie failed", peer, nPathList=pathList.len,
nDangling=stats.dangling.len, stoppedAt=stats.level, error
if ignoreError:
return ok(stats)
return err(error)
when extraTraceMessages:
@ -353,7 +357,7 @@ proc inspectAccountsTrie*(
pv, root, peer).inspectAccountsTrie(pathList, persistent=true, ignoreError)
proc getAccountNodeKey*(
proc getAccountsNodeKey*(
ps: SnapDbAccountsRef; ## Re-usable session descriptor
path: Blob; ## Partial node path
persistent = false; ## Read data from disk
@ -369,18 +373,18 @@ proc getAccountNodeKey*(
return ok(rc.value)
err(NodeNotFound)
proc getAccountNodeKey*(
proc getAccountsNodeKey*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer; ## For log messages, only
root: Hash256; ## state root
path: Blob; ## Partial node path
): Result[NodeKey,HexaryDbError] =
## Variant of `inspectAccountsPath()` for persistent storage.
## Variant of `getAccountsNodeKey()` for persistent storage.
SnapDbAccountsRef.init(
pv, root, peer).getAccountNodeKey(path, persistent=true)
pv, root, peer).getAccountsNodeKey(path, persistent=true)
proc getAccountData*(
proc getAccountsData*(
ps: SnapDbAccountsRef; ## Re-usable session descriptor
path: NodeKey; ## Account to visit
persistent = false; ## Read data from disk
@ -404,14 +408,15 @@ proc getAccountData*(
return ok(acc)
proc getAccountData*(
proc getAccountsData*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
root: Hash256; ## state root
peer: Peer; ## For log messages, only
root: Hash256; ## State root
path: NodeKey; ## Account to visit
): Result[Account,HexaryDbError] =
## Variant of `getAccount()` for persistent storage.
SnapDbAccountsRef.init(pv, root, peer).getAccountData(path, persistent=true)
## Variant of `getAccountsData()` for persistent storage.
SnapDbAccountsRef.init(
pv, root, peer).getAccountsData(path, persistent=true)
# ------------------------------------------------------------------------------
# Public functions: additional helpers
@ -439,16 +444,16 @@ proc sortMerge*(acc: openArray[seq[PackedAccount]]): seq[PackedAccount] =
accounts[item.accHash.to(NodeTag)] = item
result = toSeq(accounts.keys).sorted(cmp).mapIt(accounts[it])
proc getChainDbAccount*(
proc getAccountsChainDb*(
ps: SnapDbAccountsRef;
accHash: Hash256
accHash: Hash256;
): Result[Account,HexaryDbError] =
## Fetch account via `BaseChainDB`
ps.getAccountData(accHash.to(NodeKey),persistent=true)
ps.getAccountsData(accHash.to(NodeKey),persistent=true)
proc nextChainDbKey*(
proc nextAccountsChainDbKey*(
ps: SnapDbAccountsRef;
accHash: Hash256
accHash: Hash256;
): Result[Hash256,HexaryDbError] =
## Fetch the account path on the `BaseChainDB`, the one next to the
## argument account.
@ -462,9 +467,9 @@ proc nextChainDbKey*(
err(AccountNotFound)
proc prevChainDbKey*(
proc prevAccountsChainDbKey*(
ps: SnapDbAccountsRef;
accHash: Hash256
accHash: Hash256;
): Result[Hash256,HexaryDbError] =
## Fetch the account path on the `BaseChainDB`, the one before to the
## argument account.

View File

@ -33,7 +33,6 @@ type
## Session descriptor
xDb: HexaryTreeDbRef ## Hexary database
base: SnapDbRef ## Back reference to common parameters
peer*: Peer ## For log messages
root*: NodeKey ## Session DB root node key
# ------------------------------------------------------------------------------
@ -126,7 +125,6 @@ proc init*(
peer: Peer = nil) =
## Session base constructor
ps.base = pv
ps.peer = peer
ps.root = root
ps.xDb = HexaryTreeDbRef.init(pv)
@ -137,7 +135,7 @@ proc init*(
peer: Peer = nil): T =
## Variant of session base constructor
new result
result.init(ps.base, root, peer)
result.init(ps.base, root)
# ------------------------------------------------------------------------------
# Public getters
@ -173,6 +171,7 @@ proc dbBackendRocksDb*(ps: SnapDbBaseRef): bool =
proc mergeProofs*(
ps: SnapDbBaseRef; ## Session database
peer: Peer; ## For log messages
root: NodeKey; ## Root for checking nodes
proof: seq[Blob]; ## Node records
freeStandingOk = false; ## Remove freestanding nodes
@ -183,7 +182,6 @@ proc mergeProofs*(
## trie at a later stage and used for validating account data.
let
db = ps.hexaDb
peer = ps.peer
var
nodes: HashSet[RepairKey]
refs = @[root.to(RepairKey)].toHashSet

View File

@ -9,12 +9,13 @@
# except according to those terms.
import
std/[tables],
std/tables,
chronicles,
eth/[common/eth_types, p2p],
eth/[common, p2p, rlp, trie/db],
../../../protocol,
../../range_desc,
"."/[bulk_storage, hexary_desc, hexary_error, hexary_interpolate, snapdb_desc]
"."/[bulk_storage, hexary_desc, hexary_error, hexary_import, hexary_inspect,
hexary_interpolate, hexary_paths, snapdb_desc]
{.push raises: [Defect].}
@ -25,8 +26,13 @@ const
extraTraceMessages = false or true
type
GetAccFn = proc(accHash: Hash256, key: openArray[byte]): Blob {.gcsafe.}
## The `get()` function for the storage trie depends on the current account
SnapDbStorageSlotsRef* = ref object of SnapDbBaseRef
accHash*: Hash256 ## Accounts address hash (curr.unused)
peer: Peer ## For log messages
accHash: Hash256 ## Accounts address hash (curr.unused)
getAccFn: GetAccFn ## Persistent database `get()` closure
# ------------------------------------------------------------------------------
# Private helpers
@ -38,6 +44,11 @@ proc to(h: Hash256; T: type NodeKey): T =
proc convertTo(data: openArray[byte]; T: type Hash256): T =
discard result.data.NodeKey.init(data) # size error => zero
proc getAccCls(ps: SnapDbStorageSlotsRef; accHash: Hash256): HexaryGetFn =
## Fix `accHash` argument in `GetAccFn` closure => `HexaryGetFn`
result = proc(key: openArray[byte]): Blob = ps.getAccFn(accHash,key)
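`getAccCls` is plain partial application: it pins the `accHash` argument of the two-argument `GetAccFn` closure and yields the one-argument `HexaryGetFn` that the generic hexary code expects. A self-contained sketch of the same currying trick (types and names invented for illustration):

type
  TwoArgFn = proc(a: int; key: string): string
  OneArgFn = proc(key: string): string

proc fixFirstArg(f: TwoArgFn; a: int): OneArgFn =
  ## pin the first argument, analogous to fixing `accHash` above
  result = proc(key: string): string = f(a, key)

let twoArg: TwoArgFn = proc(a: int; key: string): string = $a & "/" & key
let oneArg = fixFirstArg(twoArg, 7)
doAssert oneArg("leaf") == "7/leaf"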
template noKeyError(info: static[string]; code: untyped) =
try:
code
@ -56,11 +67,21 @@ template noRlpExceptionOops(info: static[string]; code: untyped) =
except Exception as e:
raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
template noGenericExOrKeyError(info: static[string]; code: untyped) =
try:
code
except KeyError as e:
raiseAssert "Not possible (" & info & "): " & e.msg
except Defect as e:
raise e
except Exception as e:
raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc persistentStorages(
proc persistentStorageSlots(
db: HexaryTreeDbRef; ## Current table
ps: SnapDbStorageSlotsRef; ## For persistent database
): Result[void,HexaryDbError]
@ -116,7 +137,7 @@ proc importStorageSlots(
var
slots: seq[RLeafSpecs]
if 0 < proof.len:
let rc = tmpDb.mergeProofs(root, proof)
let rc = tmpDb.mergeProofs(ps.peer, root, proof)
if rc.isErr:
return err(rc.error)
block:
@ -145,20 +166,27 @@ proc importStorageSlots(
proc init*(
T: type SnapDbStorageSlotsRef;
pv: SnapDbRef;
account = Hash256();
root = Hash256();
account: Hash256;
root: Hash256;
peer: Peer = nil
): T =
## Constructor, starts a new accounts session.
let db = pv.kvDb
new result
result.init(pv, root.to(NodeKey), peer)
result.init(pv, root.to(NodeKey))
result.peer = peer
result.accHash = account
# At the moment, the resulting `getAccFn()` is independent of `accHash`
result.getAccFn = proc(accHash: Hash256, key: openArray[byte]): Blob =
db.get(key)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc importStorages*(
proc importStorageSlots*(
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
data: AccountStorageRange; ## Account storage reply from `snap/1` protocol
persistent = false; ## store data on disk
@ -210,7 +238,7 @@ proc importStorages*(
# Store to disk
if persistent and 0 < ps.hexaDb.tab.len:
slot = none(int)
let rc = ps.hexaDb.persistentStorages(ps)
let rc = ps.hexaDb.persistentStorageSlots(ps)
if rc.isErr:
result.add HexaryNodeReport(slot: slot, error: rc.error)
@ -231,14 +259,236 @@ proc importStorages*(
trace "Storage slots imported", peer, nItems,
slots=data.storages.len, proofs=data.proof.len
proc importStorages*(
proc importStorageSlots*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
peer: Peer; ## For log messages, only
data: AccountStorageRange; ## Account storage reply from `snap/1` protocol
): seq[HexaryNodeReport] =
## Variant of `importStorageSlots()`
SnapDbStorageSlotsRef.init(
pv, peer=peer).importStorages(data, persistent=true)
pv, Hash256(), Hash256(), peer).importStorageSlots(data, persistent=true)
proc importRawStorageSlotsNodes*(
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
nodes: openArray[Blob]; ## Node records
reportNodes = {Leaf}; ## Additional node types to report
persistent = false; ## store data on disk
): seq[HexaryNodeReport] =
## Store data nodes given as argument `nodes` on the persistent database.
##
## If there was an error when processing a particular argument `nodes` item,
## it will be reported with the return value providing argument slot/index,
## node type, and error code.
##
## If there was an error storing persistent data, the last report item will
## have an error code, only.
##
## Additional node items might be reported if the node type is in the
## argument set `reportNodes`. These reported items will have no error
## code set (i.e. `NothingSerious`.)
##
let
peer = ps.peer
db = HexaryTreeDbRef.init(ps)
nItems = nodes.len
var
nErrors = 0
slot: Option[int]
try:
# Import nodes
for n,rec in nodes:
if 0 < rec.len: # otherwise ignore empty placeholder
slot = some(n)
var rep = db.hexaryImport(rec)
if rep.error != NothingSerious:
rep.slot = slot
result.add rep
nErrors.inc
trace "Error importing storage slots nodes", peer, inx=n, nItems,
error=rep.error, nErrors
elif rep.kind.isSome and rep.kind.unsafeGet in reportNodes:
rep.slot = slot
result.add rep
# Store to disk
if persistent and 0 < db.tab.len:
slot = none(int)
let rc = db.persistentStorageSlots(ps)
if rc.isErr:
result.add HexaryNodeReport(slot: slot, error: rc.error)
except RlpError:
result.add HexaryNodeReport(slot: slot, error: RlpEncoding)
nErrors.inc
trace "Error importing storage slots nodes", peer, slot, nItems,
error=RlpEncoding, nErrors
except KeyError as e:
raiseAssert "Not possible @ importRawSorageSlotsNodes: " & e.msg
except OSError as e:
result.add HexaryNodeReport(slot: slot, error: OSErrorException)
nErrors.inc
trace "Import storage slots nodes exception", peer, slot, nItems,
name=($e.name), msg=e.msg, nErrors
when extraTraceMessages:
if nErrors == 0:
trace "Raw storage slots nodes imported", peer, slot, nItems,
report=result.len
proc importRawStorageSlotsNodes*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
accHash: Hash256; ## Account key
nodes: openArray[Blob]; ## Node records
reportNodes = {Leaf}; ## Additional node types to report
): seq[HexaryNodeReport] =
## Variant of `importRawNodes()` for persistent storage.
SnapDbStorageSlotsRef.init(
pv, accHash, Hash256(), peer).importRawStorageSlotsNodes(
nodes, reportNodes, persistent=true)
proc inspectStorageSlotsTrie*(
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
pathList = seq[Blob].default; ## Starting nodes for search
persistent = false; ## Read data from disk
ignoreError = false; ## Always return partial results if available
): Result[TrieNodeStat, HexaryDbError] =
## Starting with the argument list `pathList`, find all the non-leaf nodes in
## the hexary trie which have at least one node key reference missing in
## the trie database. Argument `pathList` entries that do not refer to a
## valid node are silently ignored.
##
let peer = ps.peer
var stats: TrieNodeStat
noRlpExceptionOops("inspectStorageSlotsTrie()"):
if persistent:
stats = ps.getAccCls(ps.accHash).hexaryInspectTrie(ps.root, pathList)
else:
stats = ps.hexaDb.hexaryInspectTrie(ps.root, pathList)
block checkForError:
let error = block:
if stats.stopped:
TrieLoopAlert
elif stats.level == 0:
TrieIsEmpty
else:
break checkForError
trace "Inspect storage slots trie failed", peer, nPathList=pathList.len,
nDangling=stats.dangling.len, stoppedAt=stats.level, error
if ignoreError:
return ok(stats)
return err(error)
when extraTraceMessages:
trace "Inspect storage slots trie ok", peer, nPathList=pathList.len,
nDangling=stats.dangling.len, level=stats.level
return ok(stats)
proc inspectStorageSlotsTrie*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer; ## For log messages, only
accHash: Hash256; ## Account key
root: Hash256; ## state root
pathList = seq[Blob].default; ## Starting paths for search
ignoreError = false; ## Always return partial results when avail.
): Result[TrieNodeStat, HexaryDbError] =
## Variant of `inspectStorageSlotsTrie()` for persistent storage.
SnapDbStorageSlotsRef.init(
pv, accHash, root, peer).inspectStorageSlotsTrie(
pathList, persistent=true, ignoreError)
proc getStorageSlotsNodeKey*(
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
path: Blob; ## Partial node path
persistent = false; ## Read data from disk
): Result[NodeKey,HexaryDbError] =
## For a partial node path argument `path`, return the raw node key.
var rc: Result[NodeKey,void]
noRlpExceptionOops("inspectAccountsPath()"):
if persistent:
rc = ps.getAccCls(ps.accHash).hexaryInspectPath(ps.root, path)
else:
rc = ps.hexaDb.hexaryInspectPath(ps.root, path)
if rc.isOk:
return ok(rc.value)
err(NodeNotFound)
proc getStorageSlotsNodeKey*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer; ## For log messages, only
accHash: Hash256; ## Account key
root: Hash256; ## state root
path: Blob; ## Partial node path
): Result[NodeKey,HexaryDbError] =
## Variant of `getStorageSlotsNodeKey()` for persistent storage.
SnapDbStorageSlotsRef.init(
pv, accHash, root, peer).getStorageSlotsNodeKey(path, persistent=true)
proc getStorageSlotsData*(
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
path: NodeKey; ## Account to visit
persistent = false; ## Read data from disk
): Result[Account,HexaryDbError] =
## Fetch storage slots data.
##
## Caveat: There is no unit test yet
let peer = ps.peer
var acc: Account
noRlpExceptionOops("getStorageSlotsData()"):
var leaf: Blob
if persistent:
leaf = path.hexaryPath(ps.root, ps.getAccCls(ps.accHash)).leafData
else:
leaf = path.hexaryPath(ps.root.to(RepairKey),ps.hexaDb).leafData
if leaf.len == 0:
return err(AccountNotFound)
acc = rlp.decode(leaf,Account)
return ok(acc)
proc getStorageSlotsData*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
accHash: Hash256; ## Account key
root: Hash256; ## state root
path: NodeKey; ## Account to visit
): Result[Account,HexaryDbError] =
## Variant of `getStorageSlotsData()` for persistent storage.
SnapDbStorageSlotsRef.init(
pv, accHash, root, peer).getStorageSlotsData(path, persistent=true)
proc haveStorageSlotsData*(
ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor
persistent = false; ## Read data from disk
): bool =
## Return `true` if there is at least one intermediate hexary node for this
## accounts storage slots trie.
##
## Caveat: There is no unit test yet
noGenericExOrKeyError("haveStorageSlotsData()"):
if persistent:
let getFn = ps.getAccCls(ps.accHash)
return 0 < ps.root.ByteArray32.getFn().len
else:
return ps.hexaDb.tab.hasKey(ps.root.to(RepairKey))
proc haveStorageSlotsData*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
accHash: Hash256; ## Account key
root: Hash256; ## state root
): bool =
## Variant of `haveStorageSlotsData()` for persistent storage.
SnapDbStorageSlotsRef.init(
pv, accHash, root, peer).haveStorageSlotsData(persistent=true)
# ------------------------------------------------------------------------------
# End

View File

@ -110,7 +110,7 @@ import
chronos,
eth/[common/eth_types, p2p, trie/nibbles, trie/trie_defs, rlp],
stew/[interval_set, keyed_queue],
../../../utils/prettify,
../../../utils/prettify,
../../sync_desc,
".."/[range_desc, worker_desc],
./com/[com_error, get_trie_nodes],
@ -150,6 +150,7 @@ proc updateMissingNodesList(buddy: SnapBuddyRef) =
## time. These nodes will be moved to `checkNodes` for further processing.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
@ -160,7 +161,7 @@ proc updateMissingNodesList(buddy: SnapBuddyRef) =
trace "Start accounts healing", peer, ctx=buddy.healingCtx()
for accKey in env.fetchAccounts.missingNodes:
let rc = ctx.data.snapDb.getAccountNodeKey(peer, stateRoot, accKey)
let rc = db.getAccountsNodeKey(peer, stateRoot, accKey)
if rc.isOk:
# Check nodes for dangling links
env.fetchAccounts.checkNodes.add accKey
@ -177,12 +178,12 @@ proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool =
## fed back to the very same list `checkNodes`
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
rc = ctx.data.snapDb.inspectAccountsTrie(
peer, stateRoot, env.fetchAccounts.checkNodes)
rc = db.inspectAccountsTrie(peer, stateRoot, env.fetchAccounts.checkNodes)
if rc.isErr:
when extraTraceMessages:
@ -190,7 +191,7 @@ proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool =
ctx=buddy.healingCtx(), error=rc.error
# Attempt to switch peers, there is not much else we can do here
buddy.ctrl.zombie = true
return
return false
# Global/env batch list to be replaced by `rc.value.leaves` return value
env.fetchAccounts.checkNodes.setLen(0)
@ -257,7 +258,6 @@ proc kvAccountLeaf(
## Read leaf node from persistent database (if any)
let
peer = buddy.peer
env = buddy.data.pivotEnv
nodeRlp = rlpFromBytes node
(_,prefix) = hexPrefixDecode partialPath
@ -277,7 +277,7 @@ proc registerAccountLeaf(
accKey: NodeKey;
acc: Account) =
## Process single account node as would be done with an interval by
## the `storeAccounts()` functoon
## the `storeAccounts()` function
let
peer = buddy.peer
env = buddy.data.pivotEnv
@ -304,7 +304,8 @@ proc registerAccountLeaf(
storageRoot: acc.storageRoot)
when extraTraceMessages:
trace "Isolated node for healing", peer, ctx=buddy.healingCtx(), accKey=pt
trace "Isolated account for healing", peer,
ctx=buddy.healingCtx(), accKey=pt
# ------------------------------------------------------------------------------
# Public functions
@ -314,6 +315,7 @@ proc healAccountsDb*(buddy: SnapBuddyRef) {.async.} =
## Fetching and merging missing account trie database nodes.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
@ -353,7 +355,7 @@ proc healAccountsDb*(buddy: SnapBuddyRef) {.async.} =
return
# Store nodes to disk
let report = ctx.data.snapDb.importRawAccountNodes(peer, nodesData)
let report = db.importRawAccountsNodes(peer, nodesData)
if 0 < report.len and report[^1].slot.isNone:
# Storage error, just run the next lap (not much else that can be done)
error "Accounts healing, error updating persistent database", peer,

View File

@ -0,0 +1,400 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Heal storage DB:
## ================
##
## This module works similar to `heal_accounts` applied to each
## per-account storage slots hexary trie.
import
std/sequtils,
chronicles,
chronos,
eth/[common/eth_types, p2p, trie/nibbles, trie/trie_defs, rlp],
stew/[interval_set, keyed_queue],
../../../utils/prettify,
../../sync_desc,
".."/[range_desc, worker_desc],
./com/[com_error, get_trie_nodes],
./db/[hexary_desc, hexary_error, snapdb_storage_slots]
{.push raises: [Defect].}
logScope:
topics = "snap-heal"
const
extraTraceMessages = false or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
proc healingCtx(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
): string =
let
slots = kvp.data.slots
"[" &
"covered=" & slots.unprocessed.emptyFactor.toPC(0) &
"nCheckNodes=" & $slots.checkNodes.len & "," &
"nMissingNodes=" & $slots.missingNodes.len & "]"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc updateMissingNodesList(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair) =
## Check whether previously missing nodes from the `missingNodes` list
## have been magically added to the database since it was checked last
## time. These nodes will be moved to `checkNodes` for further processing.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
accHash = kvp.data.accHash
storageRoot = kvp.key.to(Hash256)
slots = kvp.data.slots
var
nodes: seq[Blob]
when extraTraceMessages:
trace "Start storage slots healing", peer, ctx=buddy.healingCtx(kvp)
for slotKey in slots.missingNodes:
let rc = db.getStorageSlotsNodeKey(peer, accHash, storageRoot, slotKey)
if rc.isOk:
# Check nodes for dangling links
slots.checkNodes.add slotKey
else:
# Node is still missing
nodes.add slotKey
slots.missingNodes = nodes
proc appendMoreDanglingNodesToMissingNodesList(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
): bool =
## Starting with a given set of potentially dangling intermediate trie nodes
## `checkNodes`, this set is filtered and processed. The outcome is fed back
## to the very same list `checkNodes`
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
accHash = kvp.data.accHash
storageRoot = kvp.key.to(Hash256)
slots = kvp.data.slots
rc = db.inspectStorageSlotsTrie(
peer, accHash, storageRoot, slots.checkNodes)
if rc.isErr:
when extraTraceMessages:
error "Storage slots healing failed => stop", peer,
ctx=buddy.healingCtx(kvp), error=rc.error
# Attempt to switch peers, there is not much else we can do here
buddy.ctrl.zombie = true
return false
# Update batch lists
slots.checkNodes.setLen(0)
slots.missingNodes = slots.missingNodes & rc.value.dangling
true
proc getMissingNodesFromNetwork(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
): Future[seq[Blob]]
{.async.} =
## Extract from `missingNodes` the next batch of nodes that need
## to be merged into the database
let
ctx = buddy.ctx
peer = buddy.peer
accHash = kvp.data.accHash
storageRoot = kvp.key.to(Hash256)
slots = kvp.data.slots
nMissingNodes = slots.missingNodes.len
inxLeft = max(0, nMissingNodes - maxTrieNodeFetch)
# There is no point in processing too many nodes at the same time. So leave
# the rest on the `missingNodes` queue to be handled later.
let fetchNodes = slots.missingNodes[inxLeft ..< nMissingNodes]
slots.missingNodes.setLen(inxLeft)
# Fetch nodes from the network. Note that the remainder of the `missingNodes`
# list might be used by another process that runs semi-parallel.
let
req = @[accHash.data.toSeq] & fetchNodes.mapIt(@[it])
rc = await buddy.getTrieNodes(storageRoot, req)
if rc.isOk:
# Register unfetched missing nodes for the next pass
slots.missingNodes = slots.missingNodes & rc.value.leftOver.mapIt(it[0])
return rc.value.nodes
# Restore missing nodes list now so that a task switch in the error checker
# allows other processes to access the full `missingNodes` list.
slots.missingNodes = slots.missingNodes & fetchNodes
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
discard
when extraTraceMessages:
trace "Error fetching storage slots nodes for healing => stop", peer,
ctx=buddy.healingCtx(kvp), error
else:
discard
when extraTraceMessages:
trace "Error fetching storage slots nodes for healing", peer,
ctx=buddy.healingCtx(kvp), error
return @[]
proc kvStorageSlotsLeaf(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
partialPath: Blob;
node: Blob;
): (bool,NodeKey)
{.gcsafe, raises: [Defect,RlpError]} =
## Read leaf node from persistent database (if any)
let
peer = buddy.peer
nodeRlp = rlpFromBytes node
(_,prefix) = hexPrefixDecode partialPath
(_,segment) = hexPrefixDecode nodeRlp.listElem(0).toBytes
nibbles = prefix & segment
if nibbles.len == 64:
return (true, nibbles.getBytes.convertTo(NodeKey))
when extraTraceMessages:
trace "Isolated node path for healing => ignored", peer,
ctx=buddy.healingCtx(kvp)
proc registerStorageSlotsLeaf(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
slotKey: NodeKey) =
## Process single trie node as would be done with an interval by
## the `storeStorageSlots()` function
let
peer = buddy.peer
env = buddy.data.pivotEnv
slots = kvp.data.slots
pt = slotKey.to(NodeTag)
# Find range set (from list) containing `pt`
var ivSet: NodeTagRangeSet
block foundCoveringRange:
for w in slots.unprocessed:
if 0 < w.covered(pt,pt):
ivSet = w
break foundCoveringRange
return # already processed, forget this account leaf
# Register this isolated leaf node that was added
discard ivSet.reduce(pt,pt)
when extraTraceMessages:
trace "Isolated storage slot for healing",
peer, ctx=buddy.healingCtx(kvp), slotKey=pt
# ------------------------------------------------------------------------------
# Private functions: do the healing for one work item (sub-trie)
# ------------------------------------------------------------------------------
proc storageSlotsHealing(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
): Future[bool]
{.async.} =
## Returns `true` if the sub-trie is complete (probably inherited), and
## `false` if there are nodes left to be completed.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
accHash = kvp.data.accHash
slots = kvp.data.slots
# Update for changes since last visit
buddy.updateMissingNodesList(kvp)
# ???
if slots.checkNodes.len != 0:
if not buddy.appendMoreDanglingNodesToMissingNodesList(kvp):
return false
# Check whether the trie is complete.
if slots.missingNodes.len == 0:
trace "Storage slots healing complete", peer, ctx=buddy.healingCtx(kvp)
return true
# Get next batch of nodes that need to be merged into the database
let nodesData = await buddy.getMissingNodesFromNetwork(kvp)
if nodesData.len == 0:
return
# Store nodes to disk
let report = db.importRawStorageSlotsNodes(peer, accHash, nodesData)
if 0 < report.len and report[^1].slot.isNone:
# Storage error, just run the next lap (not much else that can be done)
error "Storage slots healing, error updating persistent database", peer,
ctx=buddy.healingCtx(kvp), nNodes=nodesData.len, error=report[^1].error
slots.missingNodes = slots.missingNodes & nodesData
return false
when extraTraceMessages:
trace "Storage slots healing, nodes merged into database", peer,
ctx=buddy.healingCtx(kvp), nNodes=nodesData.len
# Filter out error and leaf nodes
for w in report:
if w.slot.isSome: # non-indexed entries appear typically at the end, though
let
inx = w.slot.unsafeGet
nodePath = nodesData[inx]
if w.error != NothingSerious or w.kind.isNone:
# error, try downloading again
slots.missingNodes.add nodePath
elif w.kind.unsafeGet != Leaf:
# re-check this node
slots.checkNodes.add nodePath
else:
# Node has been stored, double check
let (isLeaf, slotKey) =
buddy.kvStorageSlotsLeaf(kvp, nodePath, nodesData[inx])
if isLeaf:
# Update `unprocessed` registry, collect storage roots (if any)
buddy.registerStorageSlotsLeaf(kvp, slotKey)
else:
slots.checkNodes.add nodePath
when extraTraceMessages:
trace "Storage slots healing job done", peer, ctx=buddy.healingCtx(kvp)
proc healingIsComplete(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
): Future[bool]
{.async.} =
## Check whether the storage trie can be completely inherited and prepare for
## healing if not.
##
## Returns `true` if the sub-trie is complete (probably inherited), and
## `false` if there are nodes left to be completed.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
accHash = kvp.data.accHash
storageRoot = kvp.key.to(Hash256)
# Check whether this work item can be completely inherited
if kvp.data.inherit:
let rc = db.inspectStorageSlotsTrie(peer, accHash, storageRoot)
if rc.isErr:
# Oops, not much we can do here (looping trie?)
error "Problem inspecting storage trie", peer, storageRoot, error=rc.error
return false
# Check whether the hexary trie can be inherited as-is.
if rc.value.dangling.len == 0:
return true # done
# Set up healing structure for this work item
let slots = SnapTrieRangeBatchRef(
missingNodes: rc.value.dangling)
kvp.data.slots = slots
# Full range covered by unprocessed items
for n in 0 ..< kvp.data.slots.unprocessed.len:
slots.unprocessed[n] = NodeTagRangeSet.init()
discard slots.unprocessed[0].merge(
NodeTagRange.new(low(NodeTag),high(NodeTag)))
# Proceed with healing
return await buddy.storageSlotsHealing(kvp)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc healStoragesDb*(buddy: SnapBuddyRef) {.async.} =
## Fetching and merging missing storage slots trie database nodes.
let
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
var
toBeHealed: seq[SnapSlotsQueuePair]
# Search the current slot item batch list for items to complete via healing
for kvp in env.fetchStorage.nextPairs:
# Marked items indicate that a partial sub-trie exists which might have
# been inherited from an earlier state root.
if not kvp.data.inherit:
let slots = kvp.data.slots
# Otherwise check partially fetched sub-tries only if they have a certain
# degree of completeness.
if slots.isNil or slots.unprocessed.emptyFactor < healSlorageSlotsTrigger:
continue
# Add to local batch to be processed, below
env.fetchStorage.del(kvp.key) # ok to delete this item from batch queue
toBeHealed.add kvp # to be held in local queue
if maxStoragesHeal <= toBeHealed.len:
break
when extraTraceMessages:
let nToBeHealed = toBeHealed.len
if 0 < nToBeHealed:
trace "Processing storage healing items", peer, nToBeHealed
# Run against local batch
for n in 0 ..< toBeHealed.len:
let
kvp = toBeHealed[n]
isComplete = await buddy.healingIsComplete(kvp)
if isComplete:
env.nStorage.inc
else:
env.fetchStorage.merge kvp
if buddy.ctrl.stopped:
# Oops, peer has gone
env.fetchStorage.merge toBeHealed[n+1 ..< toBeHealed.len]
break
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -115,9 +115,6 @@ proc storeAccounts*(buddy: SnapBuddyRef) {.async.} =
return
rc.value
when extraTraceMessages:
trace "Start fetching accounts", peer, stateRoot, iv
# Process received accounts and stash storage slots to fetch later
let dd = block:
let rc = await buddy.getAccountRange(stateRoot, iv)
@ -126,15 +123,20 @@ proc storeAccounts*(buddy: SnapBuddyRef) {.async.} =
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
when extraTraceMessages:
trace "Error fetching accounts => stop", peer, error
trace "Error fetching accounts => stop", peer,
stateRoot, req=iv.len, error
return
# Reset error counts for detecting repeated timeouts
buddy.data.errors.nTimeouts = 0
rc.value
let
nAccounts = dd.data.accounts.len
nStorage = dd.withStorage.len
gotAccounts = dd.data.accounts.len
gotStorage = dd.withStorage.len
when extraTraceMessages:
trace "Fetched accounts", peer, gotAccounts, gotStorage,
stateRoot, req=iv.len, got=dd.consumed
block:
let rc = ctx.data.snapDb.importAccounts(peer, stateRoot, iv.minPt, dd.data)
@ -144,13 +146,12 @@ proc storeAccounts*(buddy: SnapBuddyRef) {.async.} =
buddy.ctrl.zombie = true
when extraTraceMessages:
let error = ComImportAccountsFailed
trace "Accounts import failed => stop", peer, stateRoot,
range=dd.consumed, nAccounts, nStorage, error
trace "Accounts import failed => stop", peer, gotAccounts, gotStorage,
stateRoot, req=iv.len, got=dd.consumed, error
return
# Statistics
env.nAccounts.inc(nAccounts)
env.nStorage.inc(nStorage)
env.nAccounts.inc(gotAccounts)
# Register consumed intervals on the accumulator over all state roots
buddy.markGloballyProcessed(dd.consumed)
@ -178,8 +179,8 @@ proc storeAccounts*(buddy: SnapBuddyRef) {.async.} =
env.fetchStorage.merge dd.withStorage
when extraTraceMessages:
trace "Done fetching accounts", peer, stateRoot, nAccounts,
withStorage=dd.withStorage.len, iv
trace "Done fetching accounts", peer, gotAccounts, gotStorage,
stateRoot, req=iv.len, got=dd.consumed
# ------------------------------------------------------------------------------
# End

View File

@ -11,20 +11,33 @@
## Fetch accounts snapshot
## =======================
##
## Worker items state diagram:
## Flow chart for storage slots download
## -------------------------------------
## ::
## unprocessed slot requests | peer workers + storages database update
## ===================================================================
## {missing-storage-slots} <-----------------+
## | |
## v |
## <fetch-storage-slots-from-network> |
## | |
## v |
## {storage-slots} |
## | |
## v |
## <merge-to-persistent-database> |
## | | |
## v v |
## {completed} {partial} |
## | | |
## | +------------------------+
## v
## <done-for-this-account>
##
## +-----------------------------------------------+
## | |
## v |
## <unprocessed> ------------+-------> <worker-0> ------+
## | |
## +-------> <worker-1> ------+
## | |
## +-------> <worker-2> ------+
## : :
## Legend:
## * `<..>`: some action, process, etc.
## * `{missing-storage-slots}`: list implemented as `env.fetchStorage`
## * `{storage-slots}`: list is optimised out
## * `{completed}`: list is optimised out
## * `{partial}`: list is optimised out
##
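A minimal, runnable model of the loop in the chart above, with invented stand-in types rather than the worker descriptors:

type StorageJob = object
  account: string            # stands in for the account hash / storage root pair
  attempts: int

proc fetchAndStore(job: var StorageJob): bool =
  ## stand-in for <fetch-storage-slots-from-network> followed by
  ## <merge-to-persistent-database>; true once the sub-trie is {completed}
  job.attempts.inc
  2 <= job.attempts          # pretend the first download came back {partial}

var missingStorageSlots = @[StorageJob(account: "0xaa"), StorageJob(account: "0xbb")]
while 0 < missingStorageSlots.len:
  var job = missingStorageSlots.pop()
  if not fetchAndStore(job):
    missingStorageSlots.add job    # {partial} goes back onto the queue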
import
@ -36,7 +49,7 @@ import
../../sync_desc,
".."/[range_desc, worker_desc],
./com/[com_error, get_storage_ranges],
./db/[hexary_error, snapdb_storage_slots]
./db/snapdb_storage_slots
{.push raises: [Defect].}
@ -51,43 +64,82 @@ const
# ------------------------------------------------------------------------------
proc getNextSlotItems(buddy: SnapBuddyRef): seq[AccountSlotsHeader] =
## Get a list of work items from the batch queue.
##
## * If the storage slots requested come with an explicit sub-range of slots
## (i.e. not an implied complete list), then the result has only one work
## item. An explicit list of slots is only calculated if there was a queue
## item with a partially completed slots download.
##
## * Otherwise, a list of at most `maxStoragesFetch` work items is returned.
## These work items have been checked to ensure that there is no trace of a previously
## installed (probably partial) storage trie on the database (e.g. inherited
## from an earlier state root pivot.)
##
## If there is an indication that the storage trie may have some data
## already it is ignored here and marked `inherit` so that it will be
## picked up by the healing process.
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
(reqKey, reqData) = block:
let rc = env.fetchStorage.shift
let rc = env.fetchStorage.first # peek
if rc.isErr:
return
(rc.value.key, rc.value.data)
# Assemble first request
result.add AccountSlotsHeader(
accHash: reqData.accHash,
storageRoot: Hash256(data: reqKey))
# Check whether it comes with a sub-range
if not reqData.slots.isNil:
# Extract some interval and return single item request queue
# Assemble first request which might come with a sub-range.
while not reqData.slots.isNil:
# Extract first interval and return single item request queue
for ivSet in reqData.slots.unprocessed:
let rc = ivSet.ge()
if rc.isOk:
# Extract interval => done
result[0].subRange = some rc.value
# Extract this interval from the range set
discard ivSet.reduce rc.value
# Push back on batch queue unless it becomes empty
if not reqData.slots.unprocessed.isEmpty:
discard env.fetchStorage.unshift(reqKey, reqData)
return
# Delete from batch queue if the range set becomes empty
if reqData.slots.unprocessed.isEmpty:
env.fetchStorage.del(reqKey)
# Append more full requests to returned list
while result.len < maxStoragesFetch:
let rc = env.fetchStorage.shift
if rc.isErr:
return
result.add AccountSlotsHeader(
accHash: rc.value.data.accHash,
storageRoot: Hash256(data: rc.value.key))
when extraTraceMessages:
trace "Prepare fetching partial storage slots", peer,
nStorageQueue=env.fetchStorage.len, subRange=rc.value,
account=reqData.accHash.to(NodeTag)
return @[AccountSlotsHeader(
accHash: reqData.accHash,
storageRoot: reqKey.to(Hash256),
subRange: some rc.value)]
# Oops, empty range set? Remove range and move item to the right end
reqData.slots = nil
discard env.fetchStorage.lruFetch(reqKey)
# So there are no partial ranges (aka `slots`) anymore. Assemble maximal
# request queue.
for kvp in env.fetchStorage.nextPairs:
let it = AccountSlotsHeader(
accHash: kvp.data.accHash,
storageRoot: kvp.key.to(Hash256))
# Verify whether a storage sub-trie exists, already
if kvp.data.inherit or
ctx.data.snapDb.haveStorageSlotsData(peer, it.accHash, it.storageRoot):
kvp.data.inherit = true
when extraTraceMessages:
trace "Inheriting storage slots", peer,
nStorageQueue=env.fetchStorage.len, account=it.accHash.to(NodeTag)
continue
result.add it
env.fetchStorage.del(kvp.key) # ok to delete this item from batch queue
# Maximal number of items to fetch
if maxStoragesFetch <= result.len:
break
# ------------------------------------------------------------------------------
# Public functions
@ -110,63 +162,90 @@ proc storeStorages*(buddy: SnapBuddyRef) {.async.} =
if req.len == 0:
return # currently nothing to do
when extraTraceMessages:
trace "Start fetching storage slots", peer,
nSlots=env.fetchStorage.len,
nReq=req.len
# Get storages slots data from the network
var stoRange = block:
let rc = await buddy.getStorageRanges(stateRoot, req)
if rc.isErr:
env.fetchStorage.merge req
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
trace "Error fetching storage slots => stop", peer,
nSlots=env.fetchStorage.len,
nReq=req.len,
error
discard
env.fetchStorage.merge req
trace "Error fetching storage slots => stop", peer,
nReq=req.len, nStorageQueue=env.fetchStorage.len, error
return
rc.value
# Reset error counts for detecting repeated timeouts
buddy.data.errors.nTimeouts = 0
if 0 < stoRange.data.storages.len:
var gotStorage = stoRange.data.storages.len
when extraTraceMessages:
trace "Fetched storage slots", peer, gotStorage,
nLeftOvers=stoRange.leftOver.len, nReq=req.len,
nStorageQueue=env.fetchStorage.len
if 0 < gotStorage:
# Verify/process storages data and save it to disk
let report = ctx.data.snapDb.importStorages(peer, stoRange.data)
let report = ctx.data.snapDb.importStorageSlots(peer, stoRange.data)
if 0 < report.len:
let topStoRange = stoRange.data.storages.len - 1
if report[^1].slot.isNone:
# Failed to store on database, not much that can be done here
trace "Error writing storage slots to database", peer,
nSlots=env.fetchStorage.len,
nReq=req.len,
error=report[^1].error
env.fetchStorage.merge req
gotStorage.dec(report.len - 1) # for logging only
error "Error writing storage slots to database", peer, gotStorage,
nReq=req.len, nStorageQueue=env.fetchStorage.len,
error=report[^1].error
return
# Push back error entries to be processed later
for w in report:
if w.slot.isSome:
let n = w.slot.unsafeGet
# if w.error in {RootNodeMismatch, RightBoundaryProofFailed}:
# ???
trace "Error processing storage slots", peer,
nSlots=env.fetchStorage.len,
nReq=req.len,
nReqInx=n,
error=report[n].error
# Reset any partial requests to requesting the full interval. So
# all the storage slots are re-fetched completely for this account.
env.fetchStorage.merge AccountSlotsHeader(
accHash: stoRange.data.storages[n].account.accHash,
storageRoot: stoRange.data.storages[n].account.storageRoot)
# All except the last item always index to a node argument. The last
# item has been checked for, already.
let inx = w.slot.get
# if w.error in {RootNodeMismatch, RightBoundaryProofFailed}:
# ???
# Reset any partial result (which would be the last entry) to
# requesting the full interval. So all the storage slots are
# re-fetched completely for this account.
env.fetchStorage.merge AccountSlotsHeader(
accHash: stoRange.data.storages[inx].account.accHash,
storageRoot: stoRange.data.storages[inx].account.storageRoot)
# Last entry might be partial
if inx == topStoRange:
# No partial result processing anymore to consider
stoRange.data.proof = @[]
# Update the local counter that feeds the `nStorage` statistics below
gotStorage.dec
trace "Error processing storage slots", peer, gotStorage,
nReqInx=inx, nReq=req.len, nStorageQueue=env.fetchStorage.len,
error=report[inx].error
# Update statistics
if gotStorage == 1 and
req[0].subRange.isSome and
env.fetchStorage.hasKey req[0].storageRoot.to(NodeKey):
# Successful partial request, but not completely done yet.
gotStorage = 0
env.nStorage.inc(gotStorage)
# Return unprocessed left overs to batch queue
env.fetchStorage.merge stoRange.leftOver
when extraTraceMessages:
trace "Done fetching storage slots", peer,
nSlots=env.fetchStorage.len
trace "Done fetching storage slots", peer, gotStorage,
nStorageQueue=env.fetchStorage.len
# ------------------------------------------------------------------------------
# End

View File

@ -27,9 +27,9 @@ type
TickerStats* = object
pivotBlock*: Option[BlockNumber]
nAccounts*: (float,float) ## mean and standard deviation
nStorage*: (float,float) ## mean and standard deviation
accountsFill*: (float,float,float) ## mean, standard deviation, merged total
accCoverage*: float ## as factor
nStorage*: (float,float) ## mean and standard deviation
storageQueue*: (float,float) ## mean and standard deviation
nQueues*: int
TickerStatsUpdater* =
@ -107,17 +107,18 @@ proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.}
proc runLogTicker(t: TickerRef) {.gcsafe.} =
let data = t.statsCb()
if data != t.lastStats or
t.lastTick + tickerLogSuppressMax < t.tick:
if data != t.lastStats or t.lastTick + tickerLogSuppressMax < t.tick:
t.lastStats = data
t.lastTick = t.tick
var
nAcc, nStore, bulk: string
nAcc, nSto, bulk: string
pivot = "n/a"
let
accCov = data.accountsFill[0].toPC(1) &
"(" & data.accountsFill[1].toPC(1) & ")" &
"/" & data.accountsFill[2].toPC(0)
stoQue = data.storageQueue[0].uint64.toSI &
"(" & data.storageQueue[1].uint64.toSI & ")"
buddies = t.nBuddies
tick = t.tick.toSI
mem = getTotalMem().uint.toSI
@ -126,10 +127,10 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
if data.pivotBlock.isSome:
pivot = &"#{data.pivotBlock.get}/{data.nQueues}"
nAcc = &"{(data.nAccounts[0]+0.5).int64}({(data.nAccounts[1]+0.5).int64})"
nStore = &"{(data.nStorage[0]+0.5).int64}({(data.nStorage[1]+0.5).int64})"
nSto = &"{(data.nStorage[0]+0.5).int64}({(data.nStorage[1]+0.5).int64})"
info "Snap sync statistics",
tick, buddies, pivot, nAcc, accCov, nStore, mem
tick, buddies, pivot, nAcc, accCov, nSto, stoQue, mem
t.tick.inc
t.setLogTicker(Moment.fromNow(tickerLogInterval))

View File

@ -58,8 +58,15 @@ const
## all account ranges retrieved for all pivot state roots (see
## `coveredAccounts` in `CtxData`.)
maxStoragesFetch* = 128
## Maximal number of storage tries to fetch with a signe message.
healSlorageSlotsTrigger* = 0.70
## Consider per-account storage slots healing if this particular sub-trie
## has reached this factor of completeness
maxStoragesFetch* = 5 * 1024
## Maximal number of storage tries to fetch with a single message.
maxStoragesHeal* = 32
## Maximal number of storage tries to heal in a single batch run.
maxTrieNodeFetch* = 1024
## Informal maximal number of trie nodes to fetch at once. This is nor
@ -84,22 +91,26 @@ const
## Internal size of LRU cache (for debugging)
type
WorkerSeenBlocks = KeyedQueue[ByteArray32,BlockNumber]
WorkerSeenBlocks = KeyedQueue[NodeKey,BlockNumber]
## Temporary for pretty debugging, `BlockHash` keyed lru cache
SnapSlotsQueue* = KeyedQueue[ByteArray32,SnapSlotQueueItemRef]
SnapSlotsQueue* = KeyedQueue[NodeKey,SnapSlotQueueItemRef]
## Handles list of storage slots data for fetch indexed by storage root.
##
## Typically, storage data requests cover the full storage slots trie. If
## there is only a partial list of slots to fetch, the queue entry is
## stored left-most for easy access.
SnapSlotsQueuePair* = KeyedQueuePair[NodeKey,SnapSlotQueueItemRef]
## Key-value return code from `SnapSlotsQueue` handler
SnapSlotQueueItemRef* = ref object
## Storage slots request data. This entry is similar to `AccountSlotsHeader`
## where the optional `subRange` interval has been replaced by an interval
## range + healing support.
accHash*: Hash256 ## Owner account, maybe unnecessary
slots*: SnapTrieRangeBatchRef ## slots to fetch, nil => all slots
inherit*: bool ## mark this trie seen already
SnapSlotsSet* = HashSet[SnapSlotQueueItemRef]
## Ditto but without order, to be used as veto set
@ -126,7 +137,6 @@ type
# Accounts download
fetchAccounts*: SnapTrieRangeBatch ## Set of accounts ranges to fetch
# vetoSlots*: SnapSlotsSet ## Do not ask for these slots, again
accountsDone*: bool ## All accounts have been processed
# Storage slots download
@ -134,8 +144,8 @@ type
serialSync*: bool ## Done with storage, block sync next
# Info
nAccounts*: uint64 ## Number of accounts imported
nStorage*: uint64 ## Number of storage spaces imported
nAccounts*: uint64 ## Imported # of accounts
nStorage*: uint64 ## Imported # of account storage tries
SnapPivotTable* = ##\
## LRU table, indexed by state root
@ -186,18 +196,61 @@ proc hash*(a: Hash256): Hash =
# Public helpers
# ------------------------------------------------------------------------------
proc merge*(q: var SnapSlotsQueue; kvp: SnapSlotsQueuePair) =
## Append/prepend a queue item record into the batch queue.
let
reqKey = kvp.key
rc = q.eq(reqKey)
if rc.isOk:
# Entry exists already
let qData = rc.value
if not qData.slots.isNil:
# So this entry is not maximal and can be extended
if kvp.data.slots.isNil:
# Remove restriction for this entry and move it to the right end
qData.slots = nil
discard q.lruFetch(reqKey)
else:
# Merge argument intervals into target set
for ivSet in kvp.data.slots.unprocessed:
for iv in ivSet.increasing:
discard qData.slots.unprocessed[0].reduce(iv)
discard qData.slots.unprocessed[1].merge(iv)
else:
# Only add non-existing entries
if kvp.data.slots.isNil:
# Append full range to the right of the list
discard q.append(reqKey, kvp.data)
else:
# Partial range, add healing support and interval
discard q.unshift(reqKey, kvp.data)
proc merge*(q: var SnapSlotsQueue; fetchReq: AccountSlotsHeader) =
## Append/prepend a slot header record into the batch queue.
let reqKey = fetchReq.storageRoot.data
if not q.hasKey(reqKey):
let
reqKey = fetchReq.storageRoot.to(NodeKey)
rc = q.eq(reqKey)
if rc.isOk:
# Entry exists already
let qData = rc.value
if not qData.slots.isNil:
# So this entry is not maximal and can be extended
if fetchReq.subRange.isNone:
# Remove restriction for this entry and move it to the right end
qData.slots = nil
discard q.lruFetch(reqKey)
else:
# Merge argument interval into target set
let iv = fetchReq.subRange.unsafeGet
discard qData.slots.unprocessed[0].reduce(iv)
discard qData.slots.unprocessed[1].merge(iv)
else:
let reqData = SnapSlotQueueItemRef(accHash: fetchReq.accHash)
# Only add non-existing entries
if fetchReq.subRange.isNone:
# Append full range to the right of the list
discard q.append(reqKey, reqData)
else:
# Partial range, add healing support and interval
reqData.slots = SnapTrieRangeBatchRef()
@ -206,7 +259,9 @@ proc merge*(q: var SnapSlotsQueue; fetchReq: AccountSlotsHeader) =
discard reqData.slots.unprocessed[0].merge(fetchReq.subRange.unsafeGet)
discard q.unshift(reqKey, reqData)
proc merge*(q: var SnapSlotsQueue; reqList: openArray[AccountSlotsHeader]) =
proc merge*(
q: var SnapSlotsQueue;
reqList: openArray[SnapSlotsQueuePair|AccountSlotsHeader]) =
## Variant of `merge()` for a list argument
for w in reqList:
q.merge w
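The queue discipline implemented by these `merge()` variants: a request for the full slots trie is appended at the right end, while a partial request (one carrying a `subRange`) is unshifted to the left so it is picked up first. A toy model of just that ordering rule, not using the real `KeyedQueue` and omitting the merge-into-existing-entry and healing bookkeeping:

import std/[options, deques]

type
  ToyReq = object
    root: string                     # stands in for the storage root key
    subRange: Option[(int, int)]     # none() => fetch the full slots trie

proc toyMerge(q: var Deque[ToyReq]; req: ToyReq) =
  ## Full-trie requests go to the right end, partial ones to the left.
  if req.subRange.isNone:
    q.addLast req
  else:
    q.addFirst req

when isMainModule:
  var q = initDeque[ToyReq]()
  q.toyMerge ToyReq(root: "0xaa")                            # full trie
  q.toyMerge ToyReq(root: "0xbb", subRange: some((0, 99)))   # partial range
  doAssert q.peekFirst.root == "0xbb"    # partial request sits left-most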
@ -217,30 +272,31 @@ proc merge*(q: var SnapSlotsQueue; reqList: openArray[AccountSlotsHeader]) =
proc pp*(ctx: SnapCtxRef; bh: BlockHash): string =
## Pretty printer for debugging
let rc = ctx.data.seenBlock.lruFetch(bh.to(Hash256).data)
let rc = ctx.data.seenBlock.lruFetch(bh.Hash256.to(NodeKey))
if rc.isOk:
return "#" & $rc.value
"%" & $bh.to(Hash256).data.toHex
proc pp*(ctx: SnapCtxRef; bh: BlockHash; bn: BlockNumber): string =
## Pretty printer for debugging
let rc = ctx.data.seenBlock.lruFetch(bh.to(Hash256).data)
let rc = ctx.data.seenBlock.lruFetch(bh.Hash256.to(NodeKey))
if rc.isOk:
return "#" & $rc.value
"#" & $ctx.data.seenBlock.lruAppend(bh.to(Hash256).data, bn, seenBlocksMax)
"#" & $ctx.data.seenBlock.lruAppend(bh.Hash256.to(NodeKey), bn, seenBlocksMax)
proc pp*(ctx: SnapCtxRef; bhn: HashOrNum): string =
if not bhn.isHash:
return "#" & $bhn.number
let rc = ctx.data.seenBlock.lruFetch(bhn.hash.data)
let rc = ctx.data.seenBlock.lruFetch(bhn.hash.to(NodeKey))
if rc.isOk:
return "%" & $rc.value
return "%" & $bhn.hash.data.toHex
proc seen*(ctx: SnapCtxRef; bh: BlockHash; bn: BlockNumber) =
## Register for pretty printing
if not ctx.data.seenBlock.lruFetch(bh.to(Hash256).data).isOk:
discard ctx.data.seenBlock.lruAppend(bh.to(Hash256).data, bn, seenBlocksMax)
if not ctx.data.seenBlock.lruFetch(bh.Hash256.to(NodeKey)).isOk:
discard ctx.data.seenBlock.lruAppend(
bh.Hash256.to(NodeKey), bn, seenBlocksMax)
proc pp*(a: MDigest[256]; collapse = true): string =
if not collapse:

View File

@ -16,7 +16,8 @@
import
eth/[common, p2p],
../p2p/chain,
../db/db_chain
../db/db_chain,
./handlers
export
chain,
@ -46,6 +47,7 @@ type
CtxRef*[S] = ref object
## Shared state among all syncing peer workers (aka buddies.)
buddiesMax*: int ## Max number of buddies
ethWireCtx*: EthWireRef ## Eth protocol wire context (if available)
chain*: Chain ## Block chain database (no need for `Peer`)
poolMode*: bool ## Activate `runPool()` workers if set `true`
data*: S ## Shared context for all worker peers

View File

@ -74,7 +74,7 @@ import
chronos,
eth/[common/eth_types, p2p, p2p/peer_pool, p2p/private/p2p_types],
stew/keyed_queue,
./sync_desc
"."/[handlers, sync_desc]
{.push raises: [Defect].}
@ -266,6 +266,7 @@ proc initSync*[S,W](
# are full. The effect is that a re-connect on the latest zombie will be
# rejected as long as its worker descriptor is registered.
dsc.ctx = CtxRef[S](
ethWireCtx: cast[EthWireRef](node.protocolState protocol.eth),
buddiesMax: max(1, slots + 1),
chain: chain)
dsc.pool = node.peerPool
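Storing the `eth` protocol state as `ethWireCtx` gives sync workers a direct handle on the wire handler, e.g. for the gossip noise control this change prepares. A hedged sketch of such a call site; the `poolEnabled(false)` toggle and the surrounding module layout are assumptions for illustration, not part of this hunk:

# Assumed context: `CtxRef` and `EthWireRef` as declared in sync_desc/handlers.
proc muteTxGossip*[S](ctx: CtxRef[S]) =
  ## Hypothetical helper: silence tx-pool gossip while snap sync runs.
  if not ctx.ethWireCtx.isNil:
    ctx.ethWireCtx.poolEnabled(false)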

View File

@ -321,16 +321,16 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
# need to check for additional records only on either end of a range.
var keySet = packed.accounts.mapIt(it.accHash).toHashSet
for w in accountsList:
var key = desc.prevChainDbKey(w.data.accounts[0].accHash)
var key = desc.prevAccountsChainDbKey(w.data.accounts[0].accHash)
while key.isOk and key.value notin keySet:
keySet.incl key.value
let newKey = desc.prevChainDbKey(key.value)
let newKey = desc.prevAccountsChainDbKey(key.value)
check newKey != key
key = newKey
key = desc.nextChainDbKey(w.data.accounts[^1].accHash)
key = desc.nextAccountsChainDbKey(w.data.accounts[^1].accHash)
while key.isOk and key.value notin keySet:
keySet.incl key.value
let newKey = desc.nextChainDbKey(key.value)
let newKey = desc.nextAccountsChainDbKey(key.value)
check newKey != key
key = newKey
accKeys = toSeq(keySet).mapIt(it.to(NodeTag)).sorted(cmp)
@ -346,9 +346,9 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
count.inc
let
pfx = $count & "#"
byChainDB = desc.getChainDbAccount(accHash)
byNextKey = desc.nextChainDbKey(accHash)
byPrevKey = desc.prevChainDbKey(accHash)
byChainDB = desc.getAccountsChainDb(accHash)
byNextKey = desc.nextAccountsChainDbKey(accHash)
byPrevKey = desc.prevAccountsChainDbKey(accHash)
noisy.say "*** find",
"<", count, "> byChainDb=", byChainDB.pp
check byChainDB.isOk
@ -431,7 +431,7 @@ proc storagesRunner(
test &"Merging {storagesList.len} storages lists":
let
dbDesc = SnapDbStorageSlotsRef.init(dbBase, peer=peer)
dbDesc = SnapDbStorageSlotsRef.init(dbBase,Hash256(),Hash256(),peer)
ignore = knownFailures.toTable
for n,w in storagesList:
let
@ -440,7 +440,7 @@ proc storagesRunner(
Result[void,seq[(int,HexaryDbError)]].err(ignore[testId])
else:
OkStoDb
check dbDesc.importStorages(w.data, persistent).toStoDbRc == expRc
check dbDesc.importStorageSlots(w.data, persistent).toStoDbRc == expRc
proc inspectionRunner(
noisy = true;