Snap sync accounts healing (#1225)

* Added inspect module

why:
  Find dangling references for trie healing support.

details:
 + This patch set provides only the inspect module and some unit tests; a
   usage sketch follows this list.
 + There are also extensive unit tests which need bulk data from the
   `nimbus-eth1-blob` module.
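
  A minimal usage sketch (editor's illustration, not part of the patch set),
  assuming the `hexaryInspectTrie()` entry point and the `TrieNodeStat`
  record as introduced in the diff further down; the module import paths are
  guessed:

    import ./db/[hexary_desc, hexary_inspect]     # paths assumed

    proc findDanglingLinks(db: HexaryTreeDbRef; rootKey: NodeKey): seq[Blob] =
      ## Collect the hex-prefix encoded partial paths whose child references
      ## are missing from the in-memory trie database.
      let stats = db.hexaryInspectTrie(rootKey, seq[Blob].default)
      if stats.stoppedAt != 0:
        debugEcho "loop alert at nesting level ", stats.stoppedAt
      stats.dangling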

* Alternative pivot finder

why:
  Attempt to be faster on start-up. Also trying to decouple the pivot finder
  by providing different mechanisms (this one runs in `single` mode.)
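
  For now the decoupling is a compile-time switch rather than a runtime
  plug-in. The sketch below merely condenses the dispatch added to
  `worker.nim` in this patch (see the diff further down); the wrapper name
  `pivotFinderStart()` is made up for illustration:

    const
      usePivot2ok = false or true    # flip to fall back to the default finder

    when usePivot2ok:
      import ./worker/pivot2         # alternative finder, runs in `single` mode
    else:
      import ./worker/pivot          # default finder, runs in `multi` mode

    proc pivotFinderStart(buddy: SnapBuddyRef) =
      ## Illustration only: start whichever finder was compiled in.
      when usePivot2ok:
        buddy.pivot2Start()
      else:
        buddy.pivotStart()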

* Use inspect module for healing

details:
 + After some progress with account and storage data, the inspect facility
   is used to find dangling links in the database to be filled node-wise
   (see the sketch after this list).
 + This is a crude attempt to cobble together functional elements. The
   set-up still needs to be honed.
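
  A hedged sketch of that flow, wired together from the entry points added
  in this patch (`inspectAccountsTrie()`, `getTrieNodes()` and
  `importRawNodes()`, signatures as in the diff below); `healAccountsOnce()`
  is a made-up name, and scheduling, retries and error handling are left out:

    proc healAccountsOnce(buddy: SnapBuddyRef; pv: AccountsDbRef;
                          stateRoot: Hash256) {.async.} =
      ## One healing round: find dangling links in the persistent accounts
      ## trie, fetch the missing nodes via snap/1 `GetTrieNodes`, then
      ## import whatever came back.
      let rcInspect = pv.inspectAccountsTrie(buddy.peer, stateRoot)
      if rcInspect.isErr or rcInspect.value.dangling.len == 0:
        return                               # nothing to heal (or loop alert)

      var pathGroups: seq[seq[Blob]]
      for path in rcInspect.value.dangling:
        pathGroups.add @[path]               # account paths are one-element groups

      let rcFetch = await buddy.getTrieNodes(stateRoot, pathGroups)
      if rcFetch.isOk:
        # Fill the gaps node-wise and persist them
        discard pv.importRawNodes(buddy.peer, rcFetch.value.nodes)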

* fix scheduler to avoid starting dead peers

why:
  Some peers drop out while the worker is parked in `sleepAsync()`, so extra
  `if` clauses make sure that this event is detected early.
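
  The guard boils down to re-checking the peer's control state right after
  every asynchronous pause, roughly as in this condensed sketch (not the
  literal scheduler code; `idlePause()` is a made-up name):

    proc idlePause(buddy: SnapBuddyRef) {.async.} =
      ## A peer may go down while this worker is parked in `sleepAsync()`,
      ## so its run state is re-checked before more work is scheduled for it.
      await sleepAsync(2.seconds)
      if buddy.ctrl.stopped:
        return          # peer vanished while sleeping, do not reschedule it
      # ... continue with the next work item for this peer ...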

* Bug fixes causing crashes

details:

+ prettify.toPC():
  int/intToStr() numeric range over/underflow

+ hexary_inspect.hexaryInspectPath():
  take care of a half-initialised path step that has a branch node but no
  index into the branch array yet (sketched below)
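
  For the second item, the crash came from indexing the branch-link array
  with a nibble that was still unset (negative). The guard has the shape of
  the `doStepLink()` helper in the new `hexary_inspect.nim` (see the diff
  below):

    proc doStepLink(step: RPathStep): Result[RepairKey,bool] =
      ## Follow one path step; a Branch node with `nibble < 0` is a half
      ## initialised step, so bail out instead of indexing `bLink[-1]`.
      case step.node.kind:
      of Branch:
        if step.nibble < 0:
          return err(false)      # caller should retry with the parent step
        return ok(step.node.bLink[step.nibble])
      of Extension:
        return ok(step.node.eLink)
      of Leaf:
        discard
      err(true)                  # fully fail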

* improve handling of dropped peers in alternative pivot finder

why:
  Strange things may happen while querying data from the network.
  Additional checks make sure that the state of other peers is updated
  immediately.
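
  In practice that means bracketing every network query with checks of the
  control state, roughly as sketched below; `queryOtherPeer()` is a
  hypothetical stand-in for whatever request the pivot finder issues, the
  pattern is what matters:

    proc checkedQuery(buddy: SnapBuddyRef; other: Peer): Future[bool] {.async.} =
      ## Hypothetical helper: issue a query and notice a dropped peer at once.
      if buddy.ctrl.stopped:
        return false             # already gone, do not even ask
      let answerOk = await buddy.queryOtherPeer(other)  # stand-in for the request
      if buddy.ctrl.stopped:
        return false             # dropped out while waiting for the reply
      return answerOk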

* Update trace messages

* reorganise snap fetch & store schedule
Jordan Hrycaj 2022-09-16 08:24:12 +01:00 committed by GitHub
parent 2503964149
commit 4ff0948fed
27 changed files with 2242 additions and 506 deletions


@ -449,10 +449,8 @@ proc initaliseWorker(buddy: FullBuddyRef): Future[bool] {.async.} =
let rc = buddy.getRandomTrustedPeer()
if rc.isOK:
if await buddy.agreesOnChain(rc.value):
# Beware of peer terminating the session
if not buddy.ctrl.stopped:
ctx.data.trusted.incl peer
return true
ctx.data.trusted.incl peer
return true
# If there are no trusted peers yet, assume this very peer is trusted,
# but do not finish initialisation until there are more peers.
@ -476,15 +474,13 @@ proc initaliseWorker(buddy: FullBuddyRef): Future[bool] {.async.} =
for p in ctx.data.trusted:
if peer == p:
inc agreeScore
else:
let agreedOk = await buddy.agreesOnChain(p)
elif await buddy.agreesOnChain(p):
inc agreeScore
elif buddy.ctrl.stopped:
# Beware of peer terminating the session
if buddy.ctrl.stopped:
return false
if agreedOk:
inc agreeScore
else:
otherPeer = p
return false
else:
otherPeer = p
# Check for the number of peers that disagree
case ctx.data.trusted.len - agreeScore


@ -33,6 +33,9 @@ type
SnapStorageRanges* = storageRangesObj
## Ditto
SnapByteCodes* = byteCodesObj
## Ditto
SnapTrieNodes* = trieNodesObj
## Ditto


@ -10,7 +10,7 @@
# distributed except according to those terms.
import
std/[math, hashes],
std/[math, sequtils, hashes],
eth/common/eth_types_rlp,
stew/[byteutils, interval_set],
stint,
@ -21,10 +21,17 @@ import
{.push raises: [Defect].}
type
ByteArray32* = array[32,byte]
## Used for 32 byte database keys
NodeTag* = ##\
## Trie leaf item, account hash etc.
distinct UInt256
NodeKey* = distinct ByteArray32
## Hash key without the hash wrapper (as opposed to `NodeTag` which is a
## number)
LeafRange* = ##\
## Interval `[minPt,maxPt]` of` NodeTag` elements, can be managed in an
## `IntervalSet` data type.
@ -69,21 +76,29 @@ type
# Public helpers
# ------------------------------------------------------------------------------
proc to*(nid: NodeTag; T: type Hash256): T =
proc to*(tag: NodeTag; T: type Hash256): T =
## Convert to serialised equivalent
result.data = nid.UInt256.toBytesBE
result.data = tag.UInt256.toBytesBE
proc to*(nid: NodeTag; T: type NodeHash): T =
## Syntactic sugar
nid.to(Hash256).T
proc to*(h: Hash256; T: type NodeTag): T =
proc to*(key: NodeKey; T: type NodeTag): T =
## Convert from serialised equivalent
UInt256.fromBytesBE(h.data).T
UInt256.fromBytesBE(key.ByteArray32).T
proc to*(nh: NodeHash; T: type NodeTag): T =
proc to*(key: Hash256; T: type NodeTag): T =
## Syntactic sugar
nh.Hash256.to(T)
key.data.NodeKey.to(T)
proc to*(tag: NodeTag; T: type NodeKey): T =
## Syntactic sugar
tag.UInt256.toBytesBE.T
proc to*(hash: Hash256; T: type NodeKey): T =
## Syntactic sugar
hash.data.NodeKey
proc to*(key: NodeKey; T: type Blob): T =
## Syntactic sugar
key.ByteArray32.toSeq
proc to*(n: SomeUnsignedInt|UInt256; T: type NodeTag): T =
## Syntactic sugar
@ -93,21 +108,21 @@ proc to*(n: SomeUnsignedInt|UInt256; T: type NodeTag): T =
# Public constructors
# ------------------------------------------------------------------------------
proc init*(nh: var NodeHash; data: openArray[byte]): bool =
## Import argument `data` into `nh` which must have length either `32` or `0`.
## The latter case is equivalent to an all zero byte array of size `32`.
proc init*(key: var NodeKey; data: openArray[byte]): bool =
## Import argument `data` into `key` which must have length either `32` or
## `0`. The latter case is equivalent to an all zero byte array of size `32`.
if data.len == 32:
(addr nh.Hash256.data[0]).copyMem(unsafeAddr data[0], 32)
(addr key.ByteArray32[0]).copyMem(unsafeAddr data[0], data.len)
return true
elif data.len == 0:
nh.reset
key.reset
return true
proc init*(nt: var NodeTag; data: openArray[byte]): bool =
## Similar to `init(nh: var NodeHash; .)`.
var h: NodeHash
if h.init(data):
nt = h.to(NodeTag)
proc init*(tag: var NodeTag; data: openArray[byte]): bool =
## Similar to `init(key: var NodeHash; .)`.
var key: NodeKey
if key.init(data):
tag = key.to(NodeTag)
return true
# ------------------------------------------------------------------------------


@ -16,11 +16,20 @@ import
eth/[common/eth_types, p2p],
stew/[interval_set, keyed_queue],
../../db/select_backend,
../../utils/prettify,
".."/[protocol, sync_desc],
./worker/[accounts_db, fetch_accounts, pivot, ticker],
./worker/[accounts_db, fetch_accounts, ticker],
"."/[range_desc, worker_desc]
const
usePivot2ok = false or true
when usePivot2ok:
import ./worker/pivot2
else:
import ./worker/pivot
{.push raises: [Defect].}
logScope:
topics = "snap-sync"
@ -81,30 +90,47 @@ proc setPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) =
# Statistics
ctx.data.pivotCount.inc
# Activate per-state root environment
# Activate per-state root environment (and hold previous one)
ctx.data.prevEnv = ctx.data.pivotEnv
ctx.data.pivotEnv = ctx.data.pivotTable.lruAppend(key, env, ctx.buddiesMax)
proc updatePivotEnv(buddy: SnapBuddyRef): bool =
## Update global state root environment from local `pivotHeader`. Choose the
## latest block number. Returns `true` if the environment was changed
if buddy.data.pivotHeader.isSome:
when usePivot2ok:
let maybeHeader = buddy.data.pivot2Header
else:
let maybeHeader = buddy.data.pivotHeader
if maybeHeader.isSome:
let
peer = buddy.peer
ctx = buddy.ctx
env = ctx.data.pivotEnv
newStateNumber = buddy.data.pivotHeader.unsafeGet.blockNumber
pivotHeader = maybeHeader.unsafeGet
newStateNumber = pivotHeader.blockNumber
stateNumber = if env.isNil: 0.toBlockNumber
else: env.stateHeader.blockNumber
stateWindow = stateNumber + maxPivotBlockWindow
when switchPivotAfterCoverage < 1.0:
if not env.isNil:
if stateNumber < newStateNumber and env.minCoverageReachedOk:
buddy.setPivotEnv(buddy.data.pivotHeader.get)
return true
block keepCurrent:
if env.isNil:
break keepCurrent # => new pivot
if stateNumber < newStateNumber:
when switchPivotAfterCoverage < 1.0:
if env.minCoverageReachedOk:
break keepCurrent # => new pivot
if stateWindow < newStateNumber:
break keepCurrent # => new pivot
if newStateNumber <= maxPivotBlockWindow:
break keepCurrent # => new pivot
# keep current
return false
if stateNumber + maxPivotBlockWindow < newStateNumber:
buddy.setPivotEnv(buddy.data.pivotHeader.get)
return true
# set new block
buddy.setPivotEnv(pivotHeader)
return true
proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
@ -151,6 +177,39 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
nStorage: meanStdDev(sSum, sSqSum, count),
accountsFill: (accFill[0], accFill[1], accCoverage))
proc havePivot(buddy: SnapBuddyRef): bool =
## ...
if buddy.data.pivotHeader.isSome and
buddy.data.pivotHeader.get.blockNumber != 0:
# So there is a `ctx.data.pivotEnv`
when 1.0 <= switchPivotAfterCoverage:
return true
else:
let
ctx = buddy.ctx
env = ctx.data.pivotEnv
# Force fetching new pivot if coverage reached by returning `false`
if not env.minCoverageReachedOk:
# Not sure yet, so check whether coverage has been reached at all
let cov = env.availAccounts.freeFactor
if switchPivotAfterCoverage <= cov:
trace " Snap accounts coverage reached", peer,
threshold=switchPivotAfterCoverage, coverage=cov.toPC
# Need to reset pivot handlers
buddy.ctx.poolMode = true
buddy.ctx.data.runPoolHook = proc(b: SnapBuddyRef) =
b.ctx.data.pivotEnv.minCoverageReachedOk = true
when usePivot2ok:
b.pivot2Restart
else:
b.pivotRestart
return true
# ------------------------------------------------------------------------------
# Public start/stop and admin functions
# ------------------------------------------------------------------------------
@ -187,7 +246,10 @@ proc start*(buddy: SnapBuddyRef): bool =
if peer.supports(protocol.snap) and
peer.supports(protocol.eth) and
peer.state(protocol.eth).initialized:
buddy.pivotStart()
when usePivot2ok:
buddy.pivot2Start()
else:
buddy.pivotStart()
if not ctx.data.ticker.isNil:
ctx.data.ticker.startBuddy()
return true
@ -198,7 +260,10 @@ proc stop*(buddy: SnapBuddyRef) =
ctx = buddy.ctx
peer = buddy.peer
buddy.ctrl.stopped = true
buddy.pivotStop()
when usePivot2ok:
buddy.pivot2Stop()
else:
buddy.pivotStop()
if not ctx.data.ticker.isNil:
ctx.data.ticker.stopBuddy()
@ -218,7 +283,31 @@ proc runSingle*(buddy: SnapBuddyRef) {.async.} =
##
## Note that this function runs in `async` mode.
##
buddy.ctrl.multiOk = true
when usePivot2ok:
#
# Run alternative pivot finder. This one harmonises difficulties of at
# least two peers. There can only be one active/unfinished instance of the
# `pivot2Exec()` function.
#
let peer = buddy.peer
if not buddy.havePivot:
if await buddy.pivot2Exec():
discard buddy.updatePivotEnv()
else:
if not buddy.ctrl.stopped:
await sleepAsync(2.seconds)
return
buddy.ctrl.multiOk = true
trace "Snap pivot initialised", peer,
multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state
else:
#
# The default pivot finder runs in multi mode. So there is nothing to do
# here.
#
buddy.ctrl.multiOk = true
proc runPool*(buddy: SnapBuddyRef, last: bool) =
@ -253,34 +342,12 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
let
ctx = buddy.ctx
peer = buddy.peer
var
havePivotOk = (buddy.data.pivotHeader.isSome and
buddy.data.pivotHeader.get.blockNumber != 0)
# Switch pivot state root if this much coverage has been achieved, already
when switchPivotAfterCoverage < 1.0:
if havePivotOk:
# So there is a `ctx.data.pivotEnv`
if ctx.data.pivotEnv.minCoverageReachedOk:
# Force fetching new pivot if coverage reached
havePivotOk = false
else:
# Not sure yet, so check whether coverage has been reached at all
let cov = ctx.data.pivotEnv.availAccounts.freeFactor
if switchPivotAfterCoverage <= cov:
trace " Snap accounts coverage reached",
threshold=switchPivotAfterCoverage, coverage=cov.toPC
# Need to reset pivot handlers
buddy.ctx.poolMode = true
buddy.ctx.data.runPoolHook = proc(b: SnapBuddyRef) =
b.ctx.data.pivotEnv.minCoverageReachedOk = true
b.pivotRestart
return
if not havePivotOk:
await buddy.pivotExec()
if not buddy.updatePivotEnv():
return
when not usePivot2ok:
if not buddy.havePivot:
await buddy.pivotExec()
if not buddy.updatePivotEnv():
return
# Ignore rest if the pivot is still acceptably covered
when switchPivotAfterCoverage < 1.0:
@ -288,7 +355,9 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
await sleepAsync(50.milliseconds)
return
if await buddy.fetchAccounts():
await buddy.fetchAccounts()
if ctx.data.pivotEnv.repairState == Done:
buddy.ctrl.multiOk = false
buddy.data.pivotHeader = none(BlockHeader)


@ -21,7 +21,7 @@ import
"../.."/[protocol, types],
../range_desc,
./db/[bulk_storage, hexary_defs, hexary_desc, hexary_import,
hexary_interpolate, hexary_paths, rocky_bulk_load]
hexary_interpolate, hexary_inspect, hexary_paths, rocky_bulk_load]
{.push raises: [Defect].}
@ -31,6 +31,9 @@ logScope:
export
HexaryDbError
const
extraTraceMessages = false # or true
type
AccountsDbRef* = ref object
db: TrieDatabaseRef ## General database
@ -41,21 +44,21 @@ type
base: AccountsDbRef ## Back reference to common parameters
peer: Peer ## For log messages
accRoot: NodeKey ## Current accounts root node
accDb: HexaryTreeDB ## Accounts database
stoDb: HexaryTreeDB ## Storage database
accDb: HexaryTreeDbRef ## Accounts database
stoDb: HexaryTreeDbRef ## Storage database
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc newHexaryTreeDb(ps: AccountsDbSessionRef): HexaryTreeDB =
result.keyPp = ps.stoDb.keyPp # for debugging, will go away
proc newHexaryTreeDbRef(ps: AccountsDbSessionRef): HexaryTreeDbRef =
HexaryTreeDbRef(keyPp: ps.stoDb.keyPp) # for debugging, will go away
proc to(h: Hash256; T: type NodeKey): T =
h.data.T
proc convertTo(data: openArray[byte]; T: type Hash256): T =
discard result.NodeHash.init(data) # error => zero
discard result.data.NodeKey.init(data) # size error => zero
template noKeyError(info: static[string]; code: untyped) =
try:
@ -68,6 +71,8 @@ template noRlpExceptionOops(info: static[string]; code: untyped) =
code
except RlpError:
return err(RlpEncoding)
except KeyError as e:
raiseAssert "Not possible (" & info & "): " & e.msg
except Defect as e:
raise e
except Exception as e:
@ -118,7 +123,7 @@ proc pp(a: NodeTag; ps: AccountsDbSessionRef): string =
proc mergeProofs(
peer: Peer, ## For log messages
db: var HexaryTreeDB; ## Database table
db: HexaryTreeDbRef; ## Database table
root: NodeKey; ## Root for checking nodes
proof: seq[Blob]; ## Node records
freeStandingOk = false; ## Remove freestanding nodes
@ -155,28 +160,30 @@ proc mergeProofs(
proc persistentAccounts(
ps: AccountsDbSessionRef
db: HexaryTreeDbRef; ## Current table
pv: AccountsDbRef; ## Persistent database
): Result[void,HexaryDbError]
{.gcsafe, raises: [Defect,OSError,KeyError].} =
## Store accounts trie table on database
if ps.base.rocky.isNil:
let rc = ps.accDb.bulkStorageAccounts(ps.base.db)
if pv.rocky.isNil:
let rc = db.bulkStorageAccounts(pv.db)
if rc.isErr: return rc
else:
let rc = ps.accDb.bulkStorageAccountsRocky(ps.base.rocky)
let rc = db.bulkStorageAccountsRocky(pv.rocky)
if rc.isErr: return rc
ok()
proc persistentStorages(
ps: AccountsDbSessionRef
db: HexaryTreeDbRef; ## Current table
pv: AccountsDbRef; ## Persistent database
): Result[void,HexaryDbError]
{.gcsafe, raises: [Defect,OSError,KeyError].} =
## Store accounts trie table on database
if ps.base.rocky.isNil:
let rc = ps.stoDb.bulkStorageStorages(ps.base.db)
if pv.rocky.isNil:
let rc = db.bulkStorageStorages(pv.db)
if rc.isErr: return rc
else:
let rc = ps.stoDb.bulkStorageStoragesRocky(ps.base.rocky)
let rc = db.bulkStorageStoragesRocky(pv.rocky)
if rc.isErr: return rc
ok()
@ -271,7 +278,7 @@ proc importStorageSlots*(
stoRoot = data.account.storageRoot.to(NodeKey)
var
slots: seq[RLeafSpecs]
db = ps.newHexaryTreeDB()
db = ps.newHexaryTreeDbRef()
if 0 < proof.len:
let rc = ps.peer.mergeProofs(db, stoRoot, proof)
@ -326,7 +333,9 @@ proc init*(
let desc = AccountsDbSessionRef(
base: pv,
peer: peer,
accRoot: root.to(NodeKey))
accRoot: root.to(NodeKey),
accDb: HexaryTreeDbRef(),
stoDb: HexaryTreeDbRef())
# Debugging, might go away one time ...
desc.accDb.keyPp = proc(key: RepairKey): string = key.pp(desc)
@ -334,6 +343,27 @@ proc init*(
return desc
proc dup*(
ps: AccountsDbSessionRef;
root: Hash256;
peer: Peer;
): AccountsDbSessionRef =
## Resume a session with different `root` key and `peer`. This new session
## will access the same memory database as the `ps` argument session.
AccountsDbSessionRef(
base: ps.base,
peer: peer,
accRoot: root.to(NodeKey),
accDb: ps.accDb,
stoDb: ps.stoDb)
proc dup*(
ps: AccountsDbSessionRef;
root: Hash256;
): AccountsDbSessionRef =
## Variant of `dup()` without the `peer` argument.
ps.dup(root, ps.peer)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -350,11 +380,11 @@ proc importAccounts*(
ps: AccountsDbSessionRef; ## Re-usable session descriptor
base: NodeTag; ## before or at first account entry in `data`
data: PackedAccountRange; ## re-packed `snap/1 ` reply data
storeOk = false; ## store data on disk
persistent = false; ## store data on disk
): Result[void,HexaryDbError] =
## Validate and import accounts (using proofs as received with the snap
## message `AccountRange`). This function accumulates data in a memory table
## which can be written to disk with the argument `storeOk` set `true`. The
## which can be written to disk with the argument `persistent` set `true`. The
## memory table is held in the descriptor argument`ps`.
##
## Note that the `peer` argument is for log messages, only.
@ -374,8 +404,8 @@ proc importAccounts*(
ps.accRoot, accounts, bootstrap = (data.proof.len == 0))
if rc.isErr:
return err(rc.error)
if storeOk:
let rc = ps.persistentAccounts()
if persistent:
let rc = ps.accDb.persistentAccounts(ps.base)
if rc.isErr:
return err(rc.error)
except RlpError:
@ -386,9 +416,10 @@ proc importAccounts*(
trace "Import Accounts exception", peer=ps.peer, name=($e.name), msg=e.msg
return err(OSErrorException)
trace "Accounts and proofs ok", peer=ps.peer,
root=ps.accRoot.ByteArray32.toHex,
proof=data.proof.len, base, accounts=data.accounts.len
when extraTraceMessages:
trace "Accounts and proofs ok", peer=ps.peer,
root=ps.accRoot.ByteArray32.toHex,
proof=data.proof.len, base, accounts=data.accounts.len
ok()
proc importAccounts*(
@ -397,21 +428,21 @@ proc importAccounts*(
root: Hash256; ## state root
base: NodeTag; ## before or at first account entry in `data`
data: PackedAccountRange; ## re-packed `snap/1 ` reply data
storeOk = false; ## store data on disk
): Result[void,HexaryDbError] =
## Variant of `importAccounts()`
AccountsDbSessionRef.init(pv, root, peer).importAccounts(base, data, storeOk)
AccountsDbSessionRef.init(
pv, root, peer).importAccounts(base, data, persistent=true)
proc importStorages*(
ps: AccountsDbSessionRef; ## Re-usable session descriptor
data: AccountStorageRange; ## Account storage reply from `snap/1` protocol
storeOk = false; ## store data on disk
persistent = false; ## store data on disk
): Result[void,seq[(int,HexaryDbError)]] =
## Validate and import storage slots (using proofs as received with the snap
## message `StorageRanges`). This function accumulates data in a memory table
## which can be written to disk with the argument `storeOk` set `true`. The
## which can be written to disk with the argument `persistent` set `true`. The
## memory table is held in the descriptor argument`ps`.
##
## Note that the `peer` argument is for log messages, only.
@ -447,9 +478,9 @@ proc importStorages*(
errors.add (slotID,rc.error)
# Store to disk
if storeOk:
if persistent:
slotID = -1
let rc = ps.persistentStorages()
let rc = ps.stoDb.persistentStorages(ps.base)
if rc.isErr:
errors.add (slotID,rc.error)
@ -465,19 +496,118 @@ proc importStorages*(
# So non-empty error list is guaranteed
return err(errors)
trace "Storage slots imported", peer=ps.peer,
slots=data.storages.len, proofs=data.proof.len
when extraTraceMessages:
trace "Storage slots imported", peer=ps.peer,
slots=data.storages.len, proofs=data.proof.len
ok()
proc importStorages*(
pv: AccountsDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
data: AccountStorageRange; ## Account storage reply from `snap/1` protocol
storeOk = false; ## store data on disk
): Result[void,seq[(int,HexaryDbError)]] =
## Variant of `importStorages()`
AccountsDbSessionRef.init(pv, Hash256(), peer).importStorages(data, storeOk)
AccountsDbSessionRef.init(
pv, Hash256(), peer).importStorages(data, persistent=true)
proc importRawNodes*(
ps: AccountsDbSessionRef; ## Re-usable session descriptor
nodes: openArray[Blob]; ## Node records
persistent = false; ## store data on disk
): Result[void,seq[(int,HexaryDbError)]] =
## ...
var
errors: seq[(int,HexaryDbError)]
nodeID = -1
let
db = ps.newHexaryTreeDbRef()
try:
# Import nodes
for n,rec in nodes:
nodeID = n
let rc = db.hexaryImport(rec)
if rc.isErr:
let error = rc.error
trace "importRawNodes()", peer=ps.peer, item=n, nodes=nodes.len, error
errors.add (nodeID,error)
# Store to disk
if persistent:
nodeID = -1
let rc = db.persistentAccounts(ps.base)
if rc.isErr:
errors.add (nodeID,rc.error)
except RlpError:
errors.add (nodeID,RlpEncoding)
except KeyError as e:
raiseAssert "Not possible @ importAccounts: " & e.msg
except OSError as e:
trace "Import Accounts exception", peer=ps.peer, name=($e.name), msg=e.msg
errors.add (nodeID,RlpEncoding)
if 0 < errors.len:
return err(errors)
trace "Raw nodes imported", peer=ps.peer, nodes=nodes.len
ok()
proc importRawNodes*(
pv: AccountsDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
nodes: openArray[Blob]; ## Node records
): Result[void,seq[(int,HexaryDbError)]] =
## Variant of `importRawNodes()` for persistent storage.
AccountsDbSessionRef.init(
pv, Hash256(), peer).importRawNodes(nodes, persistent=true)
proc inspectAccountsTrie*(
ps: AccountsDbSessionRef; ## Re-usable session descriptor
pathList = seq[Blob].default; ## Starting nodes for search
persistent = false; ## Read data from disk
ignoreError = false; ## Return partial results if available
): Result[TrieNodeStat, HexaryDbError] =
## Starting with the argument list `pathSet`, find all the non-leaf nodes in
## the hexary trie which have at least one node key reference missing in
## the trie database.
##
var stats: TrieNodeStat
noRlpExceptionOops("inspectAccountsTrie()"):
if persistent:
let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key)
stats = getFn.hexaryInspectTrie(ps.accRoot, pathList)
else:
stats = ps.accDb.hexaryInspectTrie(ps.accRoot, pathList)
let
peer = ps.peer
pathList = pathList.len
nDangling = stats.dangling.len
if stats.stoppedAt != 0:
let error = LoopAlert
trace "Inspect account trie loop", peer, pathList, nDangling,
stoppedAt=stats.stoppedAt, error
if ignoreError:
return ok(stats)
return err(error)
trace "Inspect account trie ok", peer, pathList, nDangling
return ok(stats)
proc inspectAccountsTrie*(
pv: AccountsDbRef; ## Base descriptor on `BaseChainDB`
peer: Peer, ## For log messages, only
root: Hash256; ## state root
pathList = seq[Blob].default; ## Starting paths for search
ignoreError = false; ## Return partial results if available
): Result[TrieNodeStat, HexaryDbError] =
## Variant of `inspectAccountsTrie()` for persistent storage.
AccountsDbSessionRef.init(
pv, root, peer).inspectAccountsTrie(pathList, persistent=true, ignoreError)
# ------------------------------------------------------------------------------
# Debugging (and playing with the hexary database)
@ -585,6 +715,10 @@ proc dumpAccDB*(ps: AccountsDbSessionRef; indent = 4): string =
## Dump the entries from the a generic accounts trie.
ps.accDb.pp(ps.accRoot,indent)
proc getAcc*(ps: AccountsDbSessionRef): HexaryTreeDbRef =
## Low level access to accounts DB
ps.accDb
proc hexaryPpFn*(ps: AccountsDbSessionRef): HexaryPpFn =
## Key mapping function used in `HexaryTreeDB`
ps.accDb.keyPp


@ -1,5 +1,4 @@
# Nimbus - Fetch account and storage states from peers by snapshot traversal
#
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
@ -18,8 +17,9 @@ import
chronos,
eth/[common/eth_types, p2p, trie/trie_defs],
stew/interval_set,
"../.."/[protocol, protocol/trace_config],
".."/[range_desc, worker_desc]
"../../.."/[protocol, protocol/trace_config],
"../.."/[range_desc, worker_desc],
./get_error
{.push raises: [Defect].}
@ -27,15 +27,6 @@ logScope:
topics = "snap-fetch"
type
GetAccountRangeError* = enum
GareNothingSerious
GareMissingProof
GareAccountsMinTooSmall
GareAccountsMaxTooLarge
GareNoAccountsForStateRoot
GareNetworkProblem
GareResponseTimeout
GetAccountRange* = object
consumed*: LeafRange ## Real accounts interval covered
data*: PackedAccountRange ## Re-packed reply data
@ -69,7 +60,7 @@ proc getAccountRange*(
buddy: SnapBuddyRef;
stateRoot: Hash256;
iv: LeafRange
): Future[Result[GetAccountRange,GetAccountRangeError]] {.async.} =
): Future[Result[GetAccountRange,ComError]] {.async.} =
## Fetch data using the `snap#` protocol, returns the range covered.
let
peer = buddy.peer
@ -80,10 +71,10 @@ proc getAccountRange*(
var dd = block:
let rc = await buddy.getAccountRangeReq(stateRoot, iv)
if rc.isErr:
return err(GareNetworkProblem)
return err(ComNetworkProblem)
if rc.value.isNone:
trace trSnapRecvTimeoutWaiting & "for reply to GetAccountRange", peer
return err(GareResponseTimeout)
return err(ComResponseTimeout)
let snAccRange = rc.value.get
GetAccountRange(
consumed: iv,
@ -119,7 +110,7 @@ proc getAccountRange*(
# Maybe try another peer
trace trSnapRecvReceived & "empty AccountRange", peer,
nAccounts, nProof, accRange="n/a", reqRange=iv, stateRoot
return err(GareNoAccountsForStateRoot)
return err(ComNoAccountsForStateRoot)
# So there is no data, otherwise an account beyond the interval end
# `iv.maxPt` would have been returned.
@ -144,14 +135,14 @@ proc getAccountRange*(
trace trSnapRecvProtocolViolation & "proof-less AccountRange", peer,
nAccounts, nProof, accRange=LeafRange.new(iv.minPt, accMaxPt),
reqRange=iv, stateRoot
return err(GareMissingProof)
return err(ComMissingProof)
if accMinPt < iv.minPt:
# Not allowed
trace trSnapRecvProtocolViolation & "min too small in AccountRange", peer,
nAccounts, nProof, accRange=LeafRange.new(accMinPt, accMaxPt),
reqRange=iv, stateRoot
return err(GareAccountsMinTooSmall)
return err(ComAccountsMinTooSmall)
if iv.maxPt < accMaxPt:
# github.com/ethereum/devp2p/blob/master/caps/snap.md#getaccountrange-0x00:
@ -168,7 +159,7 @@ proc getAccountRange*(
trace trSnapRecvProtocolViolation & "AccountRange top exceeded", peer,
nAccounts, nProof, accRange=LeafRange.new(iv.minPt, accMaxPt),
reqRange=iv, stateRoot
return err(GareAccountsMaxTooLarge)
return err(ComAccountsMaxTooLarge)
dd.consumed = LeafRange.new(iv.minPt, max(iv.maxPt,accMaxPt))
trace trSnapRecvReceived & "AccountRange", peer,


@ -0,0 +1,129 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Note: this module is currently unused
import
std/[options, sequtils],
chronos,
eth/[common/eth_types, p2p],
"../../.."/[protocol, protocol/trace_config],
"../.."/[range_desc, worker_desc],
./get_error
{.push raises: [Defect].}
logScope:
topics = "snap-fetch"
type
# SnapByteCodes* = object
# codes*: seq[Blob]
GetByteCodes* = object
leftOver*: seq[NodeKey]
kvPairs*: seq[(Nodekey,Blob)]
const
emptyBlob = seq[byte].default
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc getByteCodesReq(
buddy: SnapBuddyRef;
keys: seq[Hash256];
): Future[Result[Option[SnapByteCodes],void]]
{.async.} =
let
peer = buddy.peer
try:
let reply = await peer.getByteCodes(keys, snapRequestBytesLimit)
return ok(reply)
except CatchableError as e:
trace trSnapRecvError & "waiting for GetByteCodes reply", peer,
error=e.msg
return err()
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc getByteCodes*(
buddy: SnapBuddyRef;
keys: seq[NodeKey],
): Future[Result[GetByteCodes,ComError]]
{.async.} =
## Fetch data using the `snap#` protocol, returns the byte codes requested
## (if any.)
let
peer = buddy.peer
nKeys = keys.len
if nKeys == 0:
return err(ComEmptyRequestArguments)
if trSnapTracePacketsOk:
trace trSnapSendSending & "GetByteCodes", peer,
nkeys, bytesLimit=snapRequestBytesLimit
let byteCodes = block:
let rc = await buddy.getByteCodesReq(
keys.mapIt(Hash256(data: it.ByteArray32)))
if rc.isErr:
return err(ComNetworkProblem)
if rc.value.isNone:
trace trSnapRecvTimeoutWaiting & "for reply to GetByteCodes", peer, nKeys
return err(ComResponseTimeout)
let blobs = rc.value.get.codes
if nKeys < blobs.len:
# Ooops, makes no sense
return err(ComTooManyByteCodes)
blobs
let
nCodes = byteCodes.len
if nCodes == 0:
# github.com/ethereum/devp2p/blob/master/caps/snap.md#getbytecodes-0x04
#
# Notes:
# * Nodes must always respond to the query.
# * The returned codes must be in the request order.
# * The responding node is allowed to return less data than requested
# (serving QoS limits), but the node must return at least one bytecode,
# unless none requested are available, in which case it must answer with
# an empty response.
# * If a bytecode is unavailable, the node must skip that slot and proceed
# to the next one. The node must not return nil or other placeholders.
trace trSnapRecvReceived & "empty ByteCodes", peer, nKeys, nCodes
return err(ComNoByteCodesAvailable)
# Assemble return value
var dd: GetByteCodes
for n in 0 ..< nCodes:
if byteCodes[n].len == 0:
dd.leftOver.add keys[n]
else:
dd.kvPairs.add (keys[n], byteCodes[n])
dd.leftOver.add keys[byteCodes.len+1 ..< nKeys]
trace trSnapRecvReceived & "ByteCodes", peer,
nKeys, nCodes, nLeftOver=dd.leftOver.len
return ok(dd)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -0,0 +1,34 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
type
ComError* = enum
ComNothingSerious
ComAccountsMaxTooLarge
ComAccountsMinTooSmall
ComEmptyAccountsArguments
ComEmptyRequestArguments
ComMissingProof
ComNetworkProblem
ComNoAccountsForStateRoot
ComNoByteCodesAvailable
ComNoDataForProof
ComNoStorageForAccounts
ComNoTrieNodesAvailable
ComResponseTimeout
ComTooManyByteCodes
ComTooManyStorageSlots
ComTooManyTrieNodes
# Other errors not directly related to communication
ComInspectDbFailed
ComImportAccountsFailed
# End


@ -14,8 +14,9 @@ import
chronos,
eth/[common/eth_types, p2p],
stew/interval_set,
"../.."/[protocol, protocol/trace_config],
".."/[range_desc, worker_desc]
"../../.."/[protocol, protocol/trace_config],
"../.."/[range_desc, worker_desc],
./get_error
{.push raises: [Defect].}
@ -23,14 +24,6 @@ logScope:
topics = "snap-fetch"
type
GetStorageRangesError* = enum
GsreNothingSerious
GsreEmptyAccountsArguments
GsreNoStorageForAccounts
GsreTooManyStorageSlots
GsreNetworkProblem
GsreResponseTimeout
# SnapStorage* = object
# slotHash*: Hash256
# slotData*: Blob
@ -40,7 +33,7 @@ type
# proof*: SnapStorageProof
GetStorageRanges* = object
leftOver*: SnapSlotQueueItemRef
leftOver*: seq[SnapSlotQueueItemRef]
data*: AccountStorageRange
const
@ -85,11 +78,24 @@ proc getStorageRangesReq(
# Public functions
# ------------------------------------------------------------------------------
proc addLeftOver*(dd: var GetStorageRanges; accounts: seq[AccountSlotsHeader]) =
## Helper for maintaining the `leftOver` queue
if 0 < accounts.len:
if accounts[0].firstSlot != Hash256() or dd.leftOver.len == 0:
dd.leftOver.add SnapSlotQueueItemRef(q: accounts)
else:
dd.leftOver[^1].q = dd.leftOver[^1].q & accounts
proc addLeftOver*(dd: var GetStorageRanges; account: AccountSlotsHeader) =
## Variant of `addLeftOver()`
dd.addLeftOver @[account]
proc getStorageRanges*(
buddy: SnapBuddyRef;
stateRoot: Hash256;
accounts: seq[AccountSlotsHeader],
): Future[Result[GetStorageRanges,GetStorageRangesError]]
): Future[Result[GetStorageRanges,ComError]]
{.async.} =
## Fetch data using the `snap#` protocol, returns the range covered.
##
@ -104,8 +110,8 @@ proc getStorageRanges*(
maybeIv = none(LeafRange)
if nAccounts == 0:
return err(GsreEmptyAccountsArguments)
if accounts[0].firstSlot != Hash256.default:
return err(ComEmptyAccountsArguments)
if accounts[0].firstSlot != Hash256():
# Set up for range
maybeIv = some(LeafRange.new(
accounts[0].firstSlot.to(NodeTag), high(NodeTag)))
@ -114,29 +120,25 @@ proc getStorageRanges*(
trace trSnapSendSending & "GetStorageRanges", peer,
nAccounts, stateRoot, bytesLimit=snapRequestBytesLimit
var dd = block:
let snStoRanges = block:
let rc = await buddy.getStorageRangesReq(
stateRoot, accounts.mapIt(it.accHash), maybeIv)
if rc.isErr:
return err(GsreNetworkProblem)
return err(ComNetworkProblem)
if rc.value.isNone:
trace trSnapRecvTimeoutWaiting & "for reply to GetStorageRanges", peer
return err(GsreResponseTimeout)
let snStoRanges = rc.value.get
if nAccounts < snStoRanges.slots.len:
trace trSnapRecvTimeoutWaiting & "for reply to GetStorageRanges", peer,
nAccounts
return err(ComResponseTimeout)
if nAccounts < rc.value.get.slots.len:
# Ooops, makes no sense
return err(GsreTooManyStorageSlots)
GetStorageRanges(
data: AccountStorageRange(
proof: snStoRanges.proof,
storages: snStoRanges.slots.mapIt(
AccountSlots(
data: it))))
let
nStorages = dd.data.storages.len
nProof = dd.data.proof.len
return err(ComTooManyStorageSlots)
rc.value.get
if nStorages == 0:
let
nSlots = snStoRanges.slots.len
nProof = snStoRanges.proof.len
if nSlots == 0:
# github.com/ethereum/devp2p/blob/master/caps/snap.md#getstorageranges-0x02:
#
# Notes:
@ -146,31 +148,48 @@ proc getStorageRanges*(
# the responsibility of the caller to query an state not older than 128
# blocks; and the caller is expected to only ever query existing accounts.
trace trSnapRecvReceived & "empty StorageRanges", peer,
nAccounts, nStorages, nProof, stateRoot
return err(GsreNoStorageForAccounts)
nAccounts, nSlots, nProof, stateRoot, firstAccount=accounts[0].accHash
return err(ComNoStorageForAccounts)
# Complete response data
for n in 0 ..< nStorages:
dd.data.storages[n].account = accounts[n]
# Assemble return structure for given peer response
var dd = GetStorageRanges(data: AccountStorageRange(proof: snStoRanges.proof))
# Calculate what was not fetched
# Filter `slots` responses:
# * Accounts for empty ones go back to the `leftOver` list.
for n in 0 ..< nSlots:
# Empty data for a slot indicates missing data
if snStoRanges.slots[n].len == 0:
dd.addLeftOver accounts[n]
else:
dd.data.storages.add AccountSlots(
account: accounts[n], # known to be no fewer accounts than slots
data: snStoRanges.slots[n])
# Complete the part that was not answered by the peer
if nProof == 0:
dd.leftOver = SnapSlotQueueItemRef(q: accounts[nStorages ..< nAccounts])
dd.addLeftOver accounts[nSlots ..< nAccounts] # empty slice is ok
else:
if snStoRanges.slots[^1].len == 0:
# `dd.data.storages.len == 0` => `snStoRanges.slots[^1].len == 0` as
# it was confirmed that `0 < nSlots == snStoRanges.slots.len`
return err(ComNoDataForProof)
# If the storage data for the last account comes with a proof, then it is
# incomplete. So record the missing part on the `dd.leftOver` list.
let top = dd.data.storages[^1].data[^1].slotHash.to(NodeTag)
# Contrived situation with `top==high()`: any right proof will be useless
# so it is just ignored (i.e. `firstSlot` is zero in first slice.)
if top < high(NodeTag):
dd.leftOver = SnapSlotQueueItemRef(q: accounts[nStorages-1 ..< nAccounts])
dd.leftOver.q[0].firstSlot = (top + 1.u256).to(Hash256)
else:
# Contrived situation: the proof would be useless
dd.leftOver = SnapSlotQueueItemRef(q: accounts[nStorages ..< nAccounts])
# Notice that `dd.leftOver.len < nAccounts` as 0 < nStorages
dd.addLeftOver AccountSlotsHeader(
firstSlot: (top + 1.u256).to(Hash256),
accHash: accounts[nSlots-1].accHash,
storageRoot: accounts[nSlots-1].storageRoot)
dd.addLeftOver accounts[nSlots ..< nAccounts] # empty slice is ok
let nLeftOver = dd.leftOver.foldl(a + b.q.len, 0)
trace trSnapRecvReceived & "StorageRanges", peer,
nAccounts, nStorages, nProof, nLeftOver=dd.leftOver.q.len, stateRoot
nAccounts, nSlots, nProof, nLeftOver, stateRoot
return ok(dd)


@ -0,0 +1,152 @@
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/[options, sequtils],
chronos,
eth/[common/eth_types, p2p],
"../../.."/[protocol, protocol/trace_config],
../../worker_desc,
./get_error
{.push raises: [Defect].}
logScope:
topics = "snap-fetch"
type
# SnapTrieNodes = object
# nodes*: seq[Blob]
GetTrieNodes* = object
leftOver*: seq[seq[Blob]]
nodes*: seq[Blob]
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc getTrieNodesReq(
buddy: SnapBuddyRef;
stateRoot: Hash256;
paths: seq[seq[Blob]];
): Future[Result[Option[SnapTrieNodes],void]]
{.async.} =
let
peer = buddy.peer
try:
let reply = await peer.getTrieNodes(stateRoot, paths, snapRequestBytesLimit)
return ok(reply)
except CatchableError as e:
trace trSnapRecvError & "waiting for GetByteCodes reply", peer,
error=e.msg
return err()
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc getTrieNodes*(
buddy: SnapBuddyRef;
stateRoot: Hash256;
paths: seq[seq[Blob]],
): Future[Result[GetTrieNodes,ComError]]
{.async.} =
## Fetch data using the `snap#` protocol, returns the trie nodes requested
## (if any.)
let
peer = buddy.peer
nPaths = paths.len
if nPaths == 0:
return err(ComEmptyRequestArguments)
let nTotal = paths.mapIt(it.len).foldl(a+b, 0)
if trSnapTracePacketsOk:
trace trSnapSendSending & "GetTrieNodes", peer,
nPaths, nTotal, bytesLimit=snapRequestBytesLimit
let trieNodes = block:
let rc = await buddy.getTrieNodesReq(stateRoot, paths)
if rc.isErr:
return err(ComNetworkProblem)
if rc.value.isNone:
trace trSnapRecvTimeoutWaiting & "for reply to GetTrieNodes", peer, nPaths
return err(ComResponseTimeout)
let blobs = rc.value.get.nodes
if nTotal < blobs.len:
# Ooops, makes no sense
return err(ComTooManyTrieNodes)
blobs
let
nNodes = trieNodes.len
if nNodes == 0:
# github.com/ethereum/devp2p/blob/master/caps/snap.md#gettrienodes-0x06
#
# Notes:
# * Nodes must always respond to the query.
# * The returned nodes must be in the request order.
# * If the node does not have the state for the requested state root or for
# any requested account paths, it must return an empty reply. It is the
# responsibility of the caller to query an state not older than 128
# blocks; and the caller is expected to only ever query existing trie
# nodes.
# * The responding node is allowed to return less data than requested
# (serving QoS limits), but the node must return at least one trie node.
trace trSnapRecvReceived & "empty TrieNodes", peer, nPaths, nNodes
return err(ComNoByteCodesAvailable)
# Assemble return value
var dd = GetTrieNodes(nodes: trieNodes)
# For each request group/sub-sequence, analyse the results
var nInx = 0
block loop:
for n in 0 ..< nPaths:
let pathLen = paths[n].len
# Account node request
if pathLen < 2:
if trieNodes[nInx].len == 0:
dd.leftOver.add paths[n]
nInx.inc
if nInx < nNodes:
continue
# all the rest needs to be re-processed
dd.leftOver = dd.leftOver & paths[n+1 ..< nPaths]
break loop
# Storage request for account
if 1 < pathLen:
var pushBack: seq[Blob]
for i in 1 ..< pathLen:
if trieNodes[nInx].len == 0:
pushBack.add paths[n][i]
nInx.inc
if nInx < nNodes:
continue
# all the rest needs to be re-processed
#
# add: account & pushBack & rest ...
dd.leftOver.add paths[n][0] & pushBack & paths[n][i+1 ..< pathLen]
dd.leftOver = dd.leftOver & paths[n+1 ..< nPaths]
break loop
if 0 < pushBack.len:
dd.leftOver.add paths[n][0] & pushBack
trace trSnapRecvReceived & "TrieNodes", peer,
nPaths, nNodes, nLeftOver=dd.leftOver.len
return ok(dd)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -33,12 +33,12 @@ proc to(tag: NodeTag; T: type RepairKey): T =
tag.to(NodeKey).to(RepairKey)
proc convertTo(key: RepairKey; T: type NodeKey): T =
if key.isNodeKey:
discard result.init(key.ByteArray33[1 .. 32])
## Might be lossy, check before use
discard result.init(key.ByteArray33[1 .. 32])
proc convertTo(key: RepairKey; T: type NodeTag): T =
if key.isNodeKey:
result = UInt256.fromBytesBE(key.ByteArray33[1 .. 32]).T
## Might be lossy, check before use
UInt256.fromBytesBE(key.ByteArray33[1 .. 32]).T
# ------------------------------------------------------------------------------
# Private helpers for bulk load testing
@ -80,7 +80,7 @@ proc bulkStorageClearRockyCacheFile*(rocky: RocksStoreRef): bool =
# ------------------------------------------------------------------------------
proc bulkStorageAccounts*(
db: HexaryTreeDB;
db: HexaryTreeDbRef;
base: TrieDatabaseRef
): Result[void,HexaryDbError] =
## Bulk load using transactional `put()`
@ -96,7 +96,7 @@ proc bulkStorageAccounts*(
ok()
proc bulkStorageStorages*(
db: HexaryTreeDB;
db: HexaryTreeDbRef;
base: TrieDatabaseRef
): Result[void,HexaryDbError] =
## Bulk load using transactional `put()`
@ -113,7 +113,7 @@ proc bulkStorageStorages*(
proc bulkStorageAccountsRocky*(
db: HexaryTreeDB;
db: HexaryTreeDbRef;
rocky: RocksStoreRef
): Result[void,HexaryDbError]
{.gcsafe, raises: [Defect,OSError,KeyError].} =
@ -162,7 +162,7 @@ proc bulkStorageAccountsRocky*(
proc bulkStorageStoragesRocky*(
db: HexaryTreeDB;
db: HexaryTreeDbRef;
rocky: RocksStoreRef
): Result[void,HexaryDbError]
{.gcsafe, raises: [Defect,OSError,KeyError].} =


@ -16,19 +16,25 @@ type
AccountSmallerThanBase
AccountsNotSrictlyIncreasing
AccountRangesOverlap
AccountRepairBlocked
RlpEncoding
SlotsNotSrictlyIncreasing
LoopAlert
# import
DifferentNodeValueExists
InternalDbInconsistency
RightBoundaryProofFailed
ExpectedNodeKeyDiffers
Rlp2Or17ListEntries
RlpBlobExpected
RlpBranchLinkExpected
RlpEncoding
RlpExtPathEncoding
RlpNonEmptyBlobExpected
# interpolate
AccountRepairBlocked
InternalDbInconsistency
RightBoundaryProofFailed
RootNodeMismatch
RootNodeMissing
SlotsNotSrictlyIncreasing
# bulk storage
AddBulkItemFailed


@ -9,7 +9,7 @@
# except according to those terms.
import
std/[algorithm, hashes, sequtils, strutils, tables],
std/[algorithm, hashes, sequtils, sets, strutils, tables],
eth/[common/eth_types, p2p, trie/nibbles],
stint,
../../range_desc
@ -20,15 +20,9 @@ type
HexaryPpFn* = proc(key: RepairKey): string {.gcsafe.}
## For testing/debugging: key pretty printer function
ByteArray32* = array[32,byte]
## Used for 32 byte database keys
ByteArray33* = array[33,byte]
## Used for 31 byte database keys, i.e. <marker> + <32-byte-key>
NodeKey* = distinct ByteArray32
## Hash key without the hash wrapper
RepairKey* = distinct ByteArray33
## Byte prefixed `NodeKey` for internal DB records
@ -139,11 +133,22 @@ type
nodeKey*: RepairKey ## Leaf hash into hexary repair table
payload*: Blob ## Data payload
HexaryTreeDB* = object
TrieNodeStat* = object
## Trie inspection report
stoppedAt*: int ## Potential loop detected if positive
dangling*: seq[Blob] ## Paths from nodes with incomplete refs
HexaryTreeDbRef* = ref object
## Hexary trie plus helper structures
tab*: Table[RepairKey,RNodeRef] ## key-value trie table, in-memory db
repairKeyGen*: uint64 ## Unique tmp key generator
keyPp*: HexaryPpFn ## For debugging, might go away
HexaryGetFn* = proc(key: Blob): Blob {.gcsafe.}
## Persistent database get() function. For read-only cases, this function
## can be seen as the persistent alternative to `HexaryTreeDbRef`.
const
EmptyNodeBlob* = seq[byte].default
EmptyNibbleRange* = EmptyNodeBlob.initNibbleRange
@ -161,18 +166,9 @@ var
proc initImpl(key: var RepairKey; data: openArray[byte]): bool =
key.reset
if data.len <= 33:
if 0 < data.len:
let trg = addr key.ByteArray33[33 - data.len]
trg.copyMem(unsafeAddr data[0], data.len)
return true
proc initImpl(key: var NodeKey; data: openArray[byte]): bool =
key.reset
if data.len <= 32:
if 0 < data.len:
let trg = addr key.ByteArray32[32 - data.len]
trg.copyMem(unsafeAddr data[0], data.len)
if 0 < data.len and data.len <= 33:
let trg = addr key.ByteArray33[33 - data.len]
trg.copyMem(unsafeAddr data[0], data.len)
return true
# ------------------------------------------------------------------------------
@ -196,7 +192,7 @@ proc ppImpl(s: string; hex = false): string =
(if (s.len and 1) == 0: s[0 ..< 8] else: "0" & s[0 ..< 7]) &
"..(" & $s.len & ").." & s[s.len-16 ..< s.len]
proc ppImpl(key: RepairKey; db: HexaryTreeDB): string =
proc ppImpl(key: RepairKey; db: HexaryTreeDbRef): string =
try:
if not disablePrettyKeys and not db.keyPp.isNil:
return db.keyPp(key)
@ -204,13 +200,13 @@ proc ppImpl(key: RepairKey; db: HexaryTreeDB): string =
discard
key.ByteArray33.toSeq.mapIt(it.toHex(2)).join.toLowerAscii
proc ppImpl(key: NodeKey; db: HexaryTreeDB): string =
proc ppImpl(key: NodeKey; db: HexaryTreeDbRef): string =
key.to(RepairKey).ppImpl(db)
proc ppImpl(w: openArray[RepairKey]; db: HexaryTreeDB): string =
proc ppImpl(w: openArray[RepairKey]; db: HexaryTreeDbRef): string =
w.mapIt(it.ppImpl(db)).join(",")
proc ppImpl(w: openArray[Blob]; db: HexaryTreeDB): string =
proc ppImpl(w: openArray[Blob]; db: HexaryTreeDbRef): string =
var q: seq[RepairKey]
for a in w:
var key: RepairKey
@ -222,7 +218,7 @@ proc ppStr(blob: Blob): string =
if blob.len == 0: ""
else: blob.mapIt(it.toHex(2)).join.toLowerAscii.ppImpl(hex = true)
proc ppImpl(n: RNodeRef; db: HexaryTreeDB): string =
proc ppImpl(n: RNodeRef; db: HexaryTreeDbRef): string =
let so = n.state.ord
case n.kind:
of Leaf:
@ -232,7 +228,7 @@ proc ppImpl(n: RNodeRef; db: HexaryTreeDB): string =
of Branch:
["b","þ","B","R"][so] & "(" & n.bLink.ppImpl(db) & "," & n.bData.ppStr & ")"
proc ppImpl(n: XNodeObj; db: HexaryTreeDB): string =
proc ppImpl(n: XNodeObj; db: HexaryTreeDbRef): string =
case n.kind:
of Leaf:
"l(" & $n.lPfx & "," & n.lData.ppStr & ")"
@ -243,19 +239,19 @@ proc ppImpl(n: XNodeObj; db: HexaryTreeDB): string =
of Branch:
"b(" & n.bLink[0..15].ppImpl(db) & "," & n.bLink[16].ppStr & ")"
proc ppImpl(w: RPathStep; db: HexaryTreeDB): string =
proc ppImpl(w: RPathStep; db: HexaryTreeDbRef): string =
let
nibble = if 0 <= w.nibble: w.nibble.toHex(1).toLowerAscii else: "ø"
key = w.key.ppImpl(db)
"(" & key & "," & nibble & "," & w.node.ppImpl(db) & ")"
proc ppImpl(w: XPathStep; db: HexaryTreeDB): string =
proc ppImpl(w: XPathStep; db: HexaryTreeDbRef): string =
let nibble = if 0 <= w.nibble: w.nibble.toHex(1).toLowerAscii else: "ø"
var key: RepairKey
discard key.initImpl(w.key)
"(" & key.ppImpl(db) & "," & $nibble & "," & w.node.ppImpl(db) & ")"
proc ppImpl(db: HexaryTreeDB; root: NodeKey): seq[string] =
proc ppImpl(db: HexaryTreeDbRef; root: NodeKey): seq[string] =
## Dump the entries from the a generic repair tree. This function assumes
## that mapped keys are printed `$###` if a node is locked or static, and
## some substitute for the first letter `$` otherwise (if they are mutable.)
@ -280,6 +276,14 @@ proc ppImpl(db: HexaryTreeDB; root: NodeKey): seq[string] =
except Exception as e:
result &= " ! Ooops ppImpl(): name=" & $e.name & " msg=" & e.msg
proc ppDangling(a: seq[Blob]; maxItems = 30): string =
proc ppBlob(w: Blob): string =
w.mapIt(it.toHex(2)).join.toLowerAscii
let
q = a.toSeq.mapIt(it.ppBlob)[0 ..< min(maxItems,a.len)]
andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." else: ""
"{" & q.join(",") & andMore & "}"
# ------------------------------------------------------------------------------
# Public debugging helpers
# ------------------------------------------------------------------------------
@ -299,40 +303,43 @@ proc pp*(key: NodeKey): string =
## Raw key, for referenced key dump use `key.pp(db)` below
key.ByteArray32.toSeq.mapIt(it.toHex(2)).join.tolowerAscii
proc pp*(key: NodeKey|RepairKey; db: HexaryTreeDB): string =
proc pp*(key: NodeKey|RepairKey; db: HexaryTreeDbRef): string =
key.ppImpl(db)
proc pp*(w: RNodeRef|XNodeObj|RPathStep; db: HexaryTreeDB): string =
proc pp*(w: RNodeRef|XNodeObj|RPathStep; db: HexaryTreeDbRef): string =
w.ppImpl(db)
proc pp*(w:openArray[RPathStep|XPathStep]; db:HexaryTreeDB; indent=4): string =
proc pp*(w:openArray[RPathStep|XPathStep];db:HexaryTreeDbRef;indent=4): string =
w.toSeq.mapIt(it.ppImpl(db)).join(indent.toPfx)
proc pp*(w: RPath; db: HexaryTreeDB; indent=4): string =
proc pp*(w: RPath; db: HexaryTreeDbRef; indent=4): string =
w.path.pp(db,indent) & indent.toPfx & "(" & $w.tail & ")"
proc pp*(w: XPath; db: HexaryTreeDB; indent=4): string =
proc pp*(w: XPath; db: HexaryTreeDbRef; indent=4): string =
w.path.pp(db,indent) & indent.toPfx & "(" & $w.tail & "," & $w.depth & ")"
proc pp*(db: HexaryTreeDB; root: NodeKey; indent=4): string =
proc pp*(db: HexaryTreeDbRef; root: NodeKey; indent=4): string =
## Dump the entries from the a generic repair tree.
db.ppImpl(root).join(indent.toPfx)
proc pp*(db: HexaryTreeDB; indent=4): string =
proc pp*(db: HexaryTreeDbRef; indent=4): string =
## Variant of `pp()` above
db.ppImpl(NodeKey.default).join(indent.toPfx)
proc pp*(a: TrieNodeStat; db: HexaryTreeDbRef; maxItems = 30): string =
"(" &
$a.stoppedAt & "," &
$a.dangling.len & "," &
a.dangling.ppDangling(maxItems) & ")"
# ------------------------------------------------------------------------------
# Public constructor (or similar)
# ------------------------------------------------------------------------------
proc init*(key: var NodeKey; data: openArray[byte]): bool =
key.initImpl(data)
proc init*(key: var RepairKey; data: openArray[byte]): bool =
key.initImpl(data)
proc newRepairKey*(db: var HexaryTreeDB): RepairKey =
proc newRepairKey*(db: HexaryTreeDbRef): RepairKey =
db.repairKeyGen.inc
var src = db.repairKeyGen.toBytesBE
(addr result.ByteArray33[25]).copyMem(addr src[0], 8)
@ -358,18 +365,12 @@ proc `==`*(a, b: RepairKey): bool =
## Tables mixin
a.ByteArray33 == b.ByteArray33
proc to*(tag: NodeTag; T: type NodeKey): T =
tag.UInt256.toBytesBE.T
proc to*(key: NodeKey; T: type NibblesSeq): T =
key.ByteArray32.initNibbleRange
proc to*(key: NodeKey; T: type RepairKey): T =
(addr result.ByteArray33[1]).copyMem(unsafeAddr key.ByteArray32[0], 32)
proc to*(hash: Hash256; T: type NodeKey): T =
hash.data.NodeKey
proc isZero*[T: NodeTag|NodeKey|RepairKey](a: T): bool =
a == T.default
@ -379,10 +380,14 @@ proc isNodeKey*(a: RepairKey): bool =
proc digestTo*(data: Blob; T: type NodeKey): T =
keccakHash(data).data.T
proc convertTo*[W: NodeKey|RepairKey](data: Blob; T: type W): T =
proc convertTo*(data: Blob; T: type NodeKey): T =
## Probably lossy conversion, use `init()` for safe conversion
discard result.init(data)
proc convertTo*(data: Blob; T: type RepairKey): T =
## Probably lossy conversion, use `init()` for safe conversion
discard result.initImpl(data)
proc convertTo*(node: RNodeRef; T: type Blob): T =
## Write the node as an RLP-encoded blob
var writer = initRlpWriter()


@ -12,6 +12,7 @@ import
std/[sequtils, sets, strutils, tables],
eth/[common/eth_types_rlp, trie/nibbles],
stew/results,
../../range_desc,
"."/[hexary_defs, hexary_desc]
{.push raises: [Defect].}
@ -28,7 +29,7 @@ proc pp(q: openArray[byte]): string =
# ------------------------------------------------------------------------------
proc hexaryImport*(
db: var HexaryTreeDB; ## Contains node table
db: HexaryTreeDbRef; ## Contains node table
recData: Blob; ## Node to add
unrefNodes: var HashSet[RepairKey]; ## Keep track of freestanding nodes
nodeRefs: var HashSet[RepairKey]; ## Ditto
@ -121,6 +122,88 @@ proc hexaryImport*(
ok()
proc hexaryImport*(
db: HexaryTreeDbRef; ## Contains node table
recData: Blob; ## Node to add
): Result[void,HexaryDbError]
{.gcsafe, raises: [Defect, RlpError, KeyError].} =
## Ditto without reference checks
let
nodeKey = recData.digestTo(NodeKey)
repairKey = nodeKey.to(RepairKey) # for repair table
var
rlp = recData.rlpFromBytes
blobs = newSeq[Blob](2) # temporary, cache
links: array[16,RepairKey] # reconstruct branch node
blob16: Blob # reconstruct branch node
top = 0 # count entries
rNode: RNodeRef # repair tree node
# Collect lists of either 2 or 17 blob entries.
for w in rlp.items:
case top
of 0, 1:
if not w.isBlob:
return err(RlpBlobExpected)
blobs[top] = rlp.read(Blob)
of 2 .. 15:
var key: NodeKey
if not key.init(rlp.read(Blob)):
return err(RlpBranchLinkExpected)
# Update ref pool
links[top] = key.to(RepairKey)
of 16:
if not w.isBlob:
return err(RlpBlobExpected)
blob16 = rlp.read(Blob)
else:
return err(Rlp2Or17ListEntries)
top.inc
# Verify extension data
case top
of 2:
if blobs[0].len == 0:
return err(RlpNonEmptyBlobExpected)
let (isLeaf, pathSegment) = hexPrefixDecode blobs[0]
if isLeaf:
rNode = RNodeRef(
kind: Leaf,
lPfx: pathSegment,
lData: blobs[1])
else:
var key: NodeKey
if not key.init(blobs[1]):
return err(RlpExtPathEncoding)
# Update ref pool
rNode = RNodeRef(
kind: Extension,
ePfx: pathSegment,
eLink: key.to(RepairKey))
of 17:
for n in [0,1]:
var key: NodeKey
if not key.init(blobs[n]):
return err(RlpBranchLinkExpected)
# Update ref pool
links[n] = key.to(RepairKey)
rNode = RNodeRef(
kind: Branch,
bLink: links,
bData: blob16)
else:
discard
# Add to database
if not db.tab.hasKey(repairKey):
db.tab[repairKey] = rNode
elif db.tab[repairKey].convertTo(Blob) != recData:
return err(DifferentNodeValueExists)
ok()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -0,0 +1,315 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/[hashes, sequtils, sets, tables],
eth/[common/eth_types_rlp, trie/nibbles],
stew/results,
../../range_desc,
"."/[hexary_desc, hexary_paths]
{.push raises: [Defect].}
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc convertTo(key: RepairKey; T: type NodeKey): T =
## Might be lossy, check before use
discard result.init(key.ByteArray33[1 .. 32])
proc convertTo(key: Blob; T: type NodeKey): T =
## Might be lossy, check before use
discard result.init(key)
proc doStepLink(step: RPathStep): Result[RepairKey,bool] =
## Helper for `hexaryInspectPath()` variant
case step.node.kind:
of Branch:
if step.nibble < 0:
return err(false) # indicates caller should try parent
return ok(step.node.bLink[step.nibble])
of Extension:
return ok(step.node.eLink)
of Leaf:
discard
err(true) # fully fail
proc doStepLink(step: XPathStep): Result[NodeKey,bool] =
## Helper for `hexaryInspectPath()` variant
case step.node.kind:
of Branch:
if step.nibble < 0:
return err(false) # indicates caller should try parent
return ok(step.node.bLink[step.nibble].convertTo(NodeKey))
of Extension:
return ok(step.node.eLink.convertTo(NodeKey))
of Leaf:
discard
err(true) # fully fail
proc hexaryInspectPath(
db: HexaryTreeDbRef; ## Database
rootKey: RepairKey; ## State root
path: NibblesSeq; ## Starting path
): Result[RepairKey,void]
{.gcsafe, raises: [Defect,KeyError]} =
## Translate `path` into `RepairKey`
let steps = path.hexaryPath(rootKey,db)
if 0 < steps.path.len and steps.tail.len == 0:
block:
let rc = steps.path[^1].doStepLink()
if rc.isOk:
return ok(rc.value)
if rc.error or steps.path.len == 1:
return err()
block:
let rc = steps.path[^2].doStepLink()
if rc.isOk:
return ok(rc.value)
err()
proc hexaryInspectPath(
getFn: HexaryGetFn; ## Database retrieval function
root: NodeKey; ## State root
path: NibblesSeq; ## Starting path
): Result[NodeKey,void]
{.gcsafe, raises: [Defect,RlpError]} =
## Translate `path` into `NodeKey`
let steps = path.hexaryPath(root,getFn)
if 0 < steps.path.len and steps.tail.len == 0:
block:
let rc = steps.path[^1].doStepLink()
if rc.isOk:
return ok(rc.value)
if rc.error or steps.path.len == 1:
return err()
block:
let rc = steps.path[^2].doStepLink()
if rc.isOk:
return ok(rc.value)
err()
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc processLink(
db: HexaryTreeDbRef;
stats: var TrieNodeStat;
inspect: TableRef[RepairKey,NibblesSeq];
parent: NodeKey;
trail: NibblesSeq;
child: RepairKey;
) {.gcsafe, raises: [Defect,KeyError]} =
## Helper for `hexaryInspect()`
if not child.isZero:
if not child.isNodeKey:
# Oops -- caught in the middle of a repair process? Just register
# this node
stats.dangling.add trail.hexPrefixEncode(isLeaf = false)
elif db.tab.hasKey(child):
inspect[child] = trail
else:
stats.dangling.add trail.hexPrefixEncode(isLeaf = false)
proc processLink(
getFn: HexaryGetFn;
stats: var TrieNodeStat;
inspect: TableRef[NodeKey,NibblesSeq];
parent: NodeKey;
trail: NibblesSeq;
child: Rlp;
) {.gcsafe, raises: [Defect,RlpError,KeyError]} =
## Ditto
if not child.isEmpty:
let
#parentKey = parent.convertTo(NodeKey)
childBlob = child.toBytes
if childBlob.len != 32:
# Oops -- that is wrong, although the only sensible action is to
# register the node and otherwise ignore it
stats.dangling.add trail.hexPrefixEncode(isLeaf = false)
else:
let childKey = childBlob.convertTo(NodeKey)
if 0 < child.toBytes.getFn().len:
inspect[childKey] = trail
else:
stats.dangling.add trail.hexPrefixEncode(isLeaf = false)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc hexaryInspectPath*(
db: HexaryTreeDbRef; ## Database
root: NodeKey; ## State root
path: Blob; ## Starting path
): Result[NodeKey,void]
{.gcsafe, raises: [Defect,KeyError]} =
## Returns the `NodeKey` for a given path if there is any.
let (isLeaf,nibbles) = hexPrefixDecode path
if not isLeaf:
let rc = db.hexaryInspectPath(root.to(RepairKey), nibbles)
if rc.isOk and rc.value.isNodeKey:
return ok(rc.value.convertTo(NodeKey))
err()
proc hexaryInspectToKeys*(
db: HexaryTreeDbRef; ## Database
root: NodeKey; ## State root
paths: seq[Blob]; ## Paths segments
): HashSet[NodeKey]
{.gcsafe, raises: [Defect,KeyError]} =
## Convert a set of path segments to a node key set
paths.toSeq
.mapIt(db.hexaryInspectPath(root,it))
.filterIt(it.isOk)
.mapIt(it.value)
.toHashSet
proc hexaryInspectTrie*(
db: HexaryTreeDbRef; ## Database
root: NodeKey; ## State root
paths: seq[Blob]; ## Starting paths for search
stopAtLevel = 32; ## Instead of loop detector
): TrieNodeStat
{.gcsafe, raises: [Defect,KeyError]} =
## Starting with the argument list `paths`, find all the non-leaf nodes in
## the hexary trie which have at least one node key reference missing in
## the trie database.
let rootKey = root.to(RepairKey)
if not db.tab.hasKey(rootKey):
return TrieNodeStat()
var
reVisit = newTable[RepairKey,NibblesSeq]()
rcValue: TrieNodeStat
level = 0
# Initialise TODO list
if paths.len == 0:
reVisit[rootKey] = EmptyNibbleRange
else:
for w in paths:
let (isLeaf,nibbles) = hexPrefixDecode w
if not isLeaf:
let rc = db.hexaryInspectPath(rootKey, nibbles)
if rc.isOk:
reVisit[rc.value] = nibbles
while 0 < reVisit.len:
if stopAtLevel < level:
rcValue.stoppedAt = level
break
let again = newTable[RepairKey,NibblesSeq]()
for rKey,parentTrail in reVisit.pairs:
let
node = db.tab[rKey]
parent = rKey.convertTo(NodeKey)
case node.kind:
of Extension:
let
trail = parentTrail & node.ePfx
child = node.eLink
db.processLink(stats=rcValue, inspect=again, parent, trail, child)
of Branch:
for n in 0 ..< 16:
let
trail = parentTrail & @[n.byte].initNibbleRange.slice(1)
child = node.bLink[n]
db.processLink(stats=rcValue, inspect=again, parent, trail, child)
of Leaf:
# Done with this link, forget the key
discard
# End `for`
level.inc
reVisit = again
# End while
return rcValue
proc hexaryInspectTrie*(
getFn: HexaryGetFn;
root: NodeKey; ## State root
paths: seq[Blob]; ## Starting paths for search
stopAtLevel = 32; ## Instead of loop detector
): TrieNodeStat
{.gcsafe, raises: [Defect,RlpError,KeyError]} =
## Variant of `hexaryInspectTrie()` for a persistent database.
##
if root.to(Blob).getFn().len == 0:
return TrieNodeStat()
var
reVisit = newTable[NodeKey,NibblesSeq]()
rcValue: TrieNodeStat
level = 0
# Initialise TODO list
if paths.len == 0:
reVisit[root] = EmptyNibbleRange
else:
for w in paths:
let (isLeaf,nibbles) = hexPrefixDecode w
if not isLeaf:
let rc = getFn.hexaryInspectPath(root, nibbles)
if rc.isOk:
reVisit[rc.value] = nibbles
while 0 < reVisit.len:
if stopAtLevel < level:
rcValue.stoppedAt = level
break
let again = newTable[NodeKey,NibblesSeq]()
for parent,parentTrail in reVisit.pairs:
let nodeRlp = rlpFromBytes parent.to(Blob).getFn()
case nodeRlp.listLen:
of 2:
let (isLeaf,ePfx) = hexPrefixDecode nodeRlp.listElem(0).toBytes
if not isLeaf:
let
trail = parentTrail & ePfx
child = nodeRlp.listElem(1)
getFn.processLink(stats=rcValue, inspect=again, parent, trail, child)
of 17:
for n in 0 ..< 16:
let
trail = parentTrail & @[n.byte].initNibbleRange.slice(1)
child = nodeRlp.listElem(n)
getFn.processLink(stats=rcValue, inspect=again, parent, trail, child)
else:
# Done with this link, forget the key
discard
# End `for`
level.inc
reVisit = again
# End while
return rcValue
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
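# Illustrative usage sketch (not part of the patch; `root` and `db` are
# assumed to come from the caller's context). The inspector reports dangling
# links as hex-prefix encoded partial paths which can be mapped to node keys:
#
#   let stats = db.hexaryInspectTrie(root, newSeq[Blob]())
#   if stats.dangling.len != 0:
#     let keys = db.hexaryInspectToKeys(root, stats.dangling)
#     # `keys` are candidates for a `snap/1` GetTrieNodes healing request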

View File

@ -15,7 +15,7 @@
## re-factored database layer.
import
std/[sequtils, strutils, tables],
std/[sequtils, sets, strutils, tables],
eth/[common/eth_types, trie/nibbles],
stew/results,
../../range_desc,
@ -34,14 +34,17 @@ type
# Private debugging helpers
# ------------------------------------------------------------------------------
proc pp(w: RPathXStep; db: var HexaryTreeDB): string =
proc pp(w: RPathXStep; db: HexaryTreeDbRef): string =
let y = if w.canLock: "lockOk" else: "noLock"
"(" & $w.pos & "," & y & "," & w.step.pp(db) & ")"
proc pp(w: seq[RPathXStep]; db: var HexaryTreeDB; indent = 4): string =
proc pp(w: seq[RPathXStep]; db: HexaryTreeDbRef; indent = 4): string =
let pfx = "\n" & " ".repeat(indent)
w.mapIt(it.pp(db)).join(pfx)
proc pp(rc: Result[TrieNodeStat, HexaryDbError]; db: HexaryTreeDbRef): string =
if rc.isErr: $rc.error else: rc.value.pp(db)
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@ -53,7 +56,7 @@ proc dup(node: RNodeRef): RNodeRef =
proc hexaryPath(
tag: NodeTag;
root: NodeKey;
db: HexaryTreeDB;
db: HexaryTreeDbRef;
): RPath
{.gcsafe, raises: [Defect,KeyError].} =
## Shortcut
@ -104,7 +107,7 @@ proc `xData=`(node: RNodeRef; val: Blob) =
# ------------------------------------------------------------------------------
proc rTreeExtendLeaf(
db: var HexaryTreeDB;
db: HexaryTreeDbRef;
rPath: RPath;
key: RepairKey
): RPath =
@ -124,7 +127,7 @@ proc rTreeExtendLeaf(
tail: EmptyNibbleRange)
proc rTreeExtendLeaf(
db: var HexaryTreeDB;
db: HexaryTreeDbRef;
rPath: RPath;
key: RepairKey;
node: RNodeRef;
@ -140,7 +143,7 @@ proc rTreeExtendLeaf(
proc rTreeSplitNode(
db: var HexaryTreeDB;
db: HexaryTreeDbRef;
rPath: RPath;
key: RepairKey;
node: RNodeRef;
@ -207,7 +210,7 @@ proc rTreeSplitNode(
proc rTreeInterpolate(
rPath: RPath;
db: var HexaryTreeDB;
db: HexaryTreeDbRef;
): RPath
{.gcsafe, raises: [Defect,KeyError]} =
## Extend path, add missing nodes to tree. The last node added will be
@ -279,7 +282,7 @@ proc rTreeInterpolate(
proc rTreeInterpolate(
rPath: RPath;
db: var HexaryTreeDB;
db: HexaryTreeDbRef;
payload: Blob;
): RPath
{.gcsafe, raises: [Defect,KeyError]} =
@ -293,7 +296,7 @@ proc rTreeInterpolate(
proc rTreeUpdateKeys(
rPath: RPath;
db: var HexaryTreeDB;
db: HexaryTreeDbRef;
): Result[void,bool]
{.gcsafe, raises: [Defect,KeyError]} =
## The argument `rPath` is assumed to organise database nodes as
@ -431,7 +434,7 @@ proc rTreeUpdateKeys(
# ------------------------------------------------------------------------------
proc rTreeBranchAppendleaf(
db: var HexaryTreeDB;
db: HexaryTreeDbRef;
bNode: RNodeRef;
leaf: RLeafSpecs;
): bool =
@ -448,7 +451,7 @@ proc rTreeBranchAppendleaf(
return true
proc rTreePrefill(
db: var HexaryTreeDB;
db: HexaryTreeDbRef;
rootKey: NodeKey;
dbItems: var seq[RLeafSpecs];
) {.gcsafe, raises: [Defect,KeyError].} =
@ -469,7 +472,7 @@ proc rTreePrefill(
db.tab[rootKey.to(RepairKey)] = node
proc rTreeSquashRootNode(
db: var HexaryTreeDB;
db: HexaryTreeDbRef;
rootKey: NodeKey;
): RNodeRef
{.gcsafe, raises: [Defect,KeyError].} =
@ -517,16 +520,22 @@ proc rTreeSquashRootNode(
# ------------------------------------------------------------------------------
proc hexaryInterpolate*(
db: var HexaryTreeDB; ## Database
rootKey: NodeKey; ## root node hash
dbItems: var seq[RLeafSpecs]; ## list of path and leaf items
bootstrap = false; ## can create root node on-the-fly
db: HexaryTreeDbRef; ## Database
rootKey: NodeKey; ## Root node hash
dbItems: var seq[RLeafSpecs]; ## List of path and leaf items
bootstrap = false; ## Can create root node on-the-fly
): Result[void,HexaryDbError]
{.gcsafe, raises: [Defect,KeyError]} =
## Verify `dbItems` by interpolating the collected `dbItems` on the hexary
## trie of the repair database. If successful, there will be a complete
## hexary trie available with the `payload` fields of the `dbItems` argument
## as leaf node values.
## From the argument list `dbItems`, leaf nodes will be added to the hexary
## trie while interpolating the path for the leaf nodes by adding missing
## nodes. This action is typically not a full trie rebuild. Some partial node
## entries might have been added already, which is typical for a boundary
## proof that comes with the `snap/1` protocol.
##
## If successful, there will be a complete hexary trie available with the
## `payload` fields of the `dbItems` argument list as leaf node values. The
## argument list `dbItems` will have been updated by registering the node
## keys of the leaf items.
##
## The algorithm employed here tries to minimise hashing hexary nodes for
## the price of re-visiting the same node again.
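##
## A minimal usage sketch (illustrative only; `leafItems` is assumed to have
## been collected from a `snap/1` accounts range reply whose boundary proof
## nodes are already in `db`):
##
##   var leafItems: seq[RLeafSpecs]     # paths and payloads, filled elsewhere
##   let rc = db.hexaryInterpolate(rootKey, leafItems, bootstrap = false)
##   if rc.isErr:
##     return err(rc.error)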

View File

@ -11,33 +11,24 @@
## Find node paths in hexary tries.
import
std/[sequtils, tables],
std/[tables],
eth/[common/eth_types_rlp, trie/nibbles],
../../range_desc,
./hexary_desc
{.push raises: [Defect].}
const
HexaryXPathDebugging = false # or true
type
HexaryGetFn* = proc(key: Blob): Blob {.gcsafe.}
## For testing/debugging: database get() function
# ------------------------------------------------------------------------------
# Private debugging helpers
# ------------------------------------------------------------------------------
proc pp(w: Blob; db: HexaryTreeDB): string =
proc pp(w: Blob; db: HexaryTreeDbRef): string =
w.convertTo(RepairKey).pp(db)
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc to(w: NodeKey; T: type Blob): T =
w.ByteArray32.toSeq
proc getNibblesImpl(path: XPath; start = 0): NibblesSeq =
## Re-build the key path
for n in start ..< path.path.len:
@ -90,7 +81,7 @@ when false:
proc pathExtend(
path: RPath;
key: RepairKey;
db: HexaryTreeDB;
db: HexaryTreeDbRef;
): RPath
{.gcsafe, raises: [Defect,KeyError].} =
## For the given path, extend to the longest possible repair tree `db`
@ -389,7 +380,7 @@ proc leafData*(path: XPath): Blob =
proc hexaryPath*(
nodeKey: NodeKey;
rootKey: RepairKey;
db: HexaryTreeDB;
db: HexaryTreeDbRef;
): RPath
{.gcsafe, raises: [Defect,KeyError]} =
## Compute the longest possible repair tree `db` path matching the `nodeKey`
@ -397,6 +388,15 @@ proc hexaryPath*(
## functional notation.
RPath(tail: nodeKey.to(NibblesSeq)).pathExtend(rootKey,db)
proc hexaryPath*(
partialPath: NibblesSeq;
rootKey: RepairKey;
db: HexaryTreeDbRef;
): RPath
{.gcsafe, raises: [Defect,KeyError]} =
## Variant of `hexaryPath`.
RPath(tail: partialPath).pathExtend(rootKey,db)
proc hexaryPath*(
nodeKey: NodeKey;
root: NodeKey;
@ -413,6 +413,15 @@ proc hexaryPath*(
## in the invoking function due to the `getFn` argument.
XPath(tail: nodeKey.to(NibblesSeq)).pathExtend(root.to(Blob), getFn)
proc hexaryPath*(
partialPath: NibblesSeq;
root: NodeKey;
getFn: HexaryGetFn;
): XPath
{.gcsafe, raises: [Defect,RlpError]} =
## Variant of `hexaryPath`.
XPath(tail: partialPath).pathExtend(root.to(Blob), getFn)
proc next*(
path: XPath;
getFn: HexaryGetFn;

View File

@ -1,5 +1,4 @@
# Nimbus - Fetch account and storage states from peers efficiently
#
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
@ -10,6 +9,7 @@
# except according to those terms.
import
std/sequtils,
chronicles,
chronos,
eth/[common/eth_types, p2p],
@ -17,13 +17,15 @@ import
stint,
../../sync_desc,
".."/[range_desc, worker_desc],
"."/[accounts_db, get_account_range, get_storage_ranges]
./com/[get_account_range, get_error, get_storage_ranges, get_trie_nodes],
./accounts_db
when snapAccountsDumpEnable:
import ../../../tests/replay/[undump_accounts, undump_storages]
{.push raises: [Defect].}
logScope:
topics = "snap-fetch"
@ -145,68 +147,140 @@ proc delUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) =
## Shortcut
discard buddy.ctx.data.pivotEnv.availAccounts.reduce(iv)
# -----
proc waitAfterError(buddy: SnapBuddyRef; error: GetAccountRangeError): bool =
## Error handling after `GetAccountRange` failed.
proc stopAfterError(
buddy: SnapBuddyRef;
error: ComError;
): Future[bool]
{.async.} =
## Error handling after data protocol failed.
case error:
of GareResponseTimeout:
of ComResponseTimeout:
if maxTimeoutErrors <= buddy.data.errors.nTimeouts:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
buddy.ctrl.zombie = true
else:
# Otherwise try again some time later
# Otherwise try again some time later. Nevertheless, stop the
# current action.
buddy.data.errors.nTimeouts.inc
result = true
await sleepAsync(5.seconds)
return true
of GareNetworkProblem,
GareMissingProof,
GareAccountsMinTooSmall,
GareAccountsMaxTooLarge:
of ComNetworkProblem,
ComMissingProof,
ComAccountsMinTooSmall,
ComAccountsMaxTooLarge:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
buddy.data.stats.major.networkErrors.inc()
buddy.ctrl.zombie = true
return true
of GareNothingSerious:
of ComEmptyAccountsArguments,
ComEmptyRequestArguments,
ComInspectDbFailed,
ComImportAccountsFailed,
ComNoDataForProof,
ComNothingSerious:
discard
of GareNoAccountsForStateRoot:
of ComNoAccountsForStateRoot,
ComNoStorageForAccounts,
ComNoByteCodesAvailable,
ComNoTrieNodesAvailable,
ComTooManyByteCodes,
ComTooManyStorageSlots,
ComTooManyTrieNodes:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
buddy.ctrl.zombie = true
return true
# ------------------------------------------------------------------------------
# Private functions: accounts
# ------------------------------------------------------------------------------
proc waitAfterError(buddy: SnapBuddyRef; error: GetStorageRangesError): bool =
## Error handling after `GetStorageRanges` failed.
case error:
of GsreResponseTimeout:
if maxTimeoutErrors <= buddy.data.errors.nTimeouts:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
proc processAccounts(
buddy: SnapBuddyRef;
iv: LeafRange; ## Accounts range to process
): Future[Result[void,ComError]]
{.async.} =
## Process accounts and storage by bulk download on the current environment
# Reset error counts for detecting repeated timeouts
buddy.data.errors.nTimeouts = 0
# Process accounts
let
ctx = buddy.ctx
peer = buddy.peer
env = ctx.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
# Fetch data for this range delegated to `fetchAccounts()`
let dd = block:
let rc = await buddy.getAccountRange(stateRoot, iv)
if rc.isErr:
buddy.putUnprocessed(iv) # fail => interval back to pool
return err(rc.error)
rc.value
let
nAccounts = dd.data.accounts.len
nStorage = dd.withStorage.len
block:
let rc = ctx.data.accountsDb.importAccounts(
peer, stateRoot, iv.minPt, dd.data)
if rc.isErr:
# Bad data, just try another peer
buddy.putUnprocessed(iv)
buddy.ctrl.zombie = true
else:
# Otherwise try again some time later
buddy.data.errors.nTimeouts.inc
result = true
trace "Import failed, restoring unprocessed accounts", peer, stateRoot,
range=dd.consumed, nAccounts, nStorage, error=rc.error
of GsreNetworkProblem,
GsreTooManyStorageSlots:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
buddy.data.stats.major.networkErrors.inc()
buddy.ctrl.zombie = true
buddy.dumpBegin(iv, dd, rc.error) # FIXME: Debugging (will go away)
buddy.dumpEnd() # FIXME: Debugging (will go away)
return err(ComImportAccountsFailed)
of GsreNothingSerious,
GsreEmptyAccountsArguments:
discard
buddy.dumpBegin(iv, dd) # FIXME: Debugging (will go away)
of GsreNoStorageForAccounts:
# Mark this peer dead, i.e. avoid fetching from this peer for a while
buddy.ctrl.zombie = true
# Statistics
env.nAccounts.inc(nAccounts)
env.nStorage.inc(nStorage)
# -----
# Register consumed intervals on the accumulator over all state roots
discard buddy.ctx.data.coveredAccounts.merge(dd.consumed)
proc processStorageSlots(
# Register consumed and bulk-imported (well, not yet) accounts range
block registerConsumed:
block:
# Both intervals `min(iv)` and `min(dd.consumed)` are equal
let rc = iv - dd.consumed
if rc.isOk:
# Now, `dd.consumed` < `iv`, return some unused range
buddy.putUnprocessed(rc.value)
break registerConsumed
block:
# The processed interval might be a bit larger
let rc = dd.consumed - iv
if rc.isOk:
# Remove from unprocessed data. If it is not unprocessed anymore,
# then it was doubly processed, which is ok.
buddy.delUnprocessed(rc.value)
break registerConsumed
# End registerConsumed
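# Worked example (illustrative): if the requested interval was iv=[16,63]
# but only dd.consumed=[16,47] was covered, then iv - dd.consumed = [48,63]
# goes back to the unprocessed pool. Conversely, if dd.consumed=[16,79]
# covers more than requested, dd.consumed - iv = [64,79] is removed from
# the unprocessed pool as it was processed here, too.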
# Store accounts on the storage TODO list.
discard env.leftOver.append SnapSlotQueueItemRef(q: dd.withStorage)
return ok()
# ------------------------------------------------------------------------------
# Private functions: accounts storage
# ------------------------------------------------------------------------------
proc fetchAndImportStorageSlots(
buddy: SnapBuddyRef;
reqSpecs: seq[AccountSlotsHeader];
): Future[Result[SnapSlotQueueItemRef,GetStorageRangesError]]
): Future[Result[seq[SnapSlotQueueItemRef],ComError]]
{.async.} =
## Fetch storage slot data from the network, store it on disk and
## return the yet unprocessed data.
@ -217,169 +291,243 @@ proc processStorageSlots(
stateRoot = env.stateHeader.stateRoot
# Get storage slots
let storage = block:
var stoRange = block:
let rc = await buddy.getStorageRanges(stateRoot, reqSpecs)
if rc.isErr:
return err(rc.error)
rc.value
# -----------------------------
buddy.dumpStorage(storage.data)
# -----------------------------
if 0 < stoRange.data.storages.len:
# ------------------------------
buddy.dumpStorage(stoRange.data)
# ------------------------------
# Verify/process data and save to disk
block:
let rc = ctx.data.accountsDb.importStorages(
peer, storage.data, storeOk = true)
# Verify/process data and save to disk
block:
let rc = ctx.data.accountsDb.importStorages(peer, stoRange.data)
if rc.isErr:
# Push back parts of the error item
for w in rc.error:
if 0 <= w[0]:
# Reset any partial requests by not copying the `firstSlot` field. So
# all the storage slots are re-fetched completely for this account.
storage.leftOver.q.add AccountSlotsHeader(
accHash: storage.data.storages[w[0]].account.accHash,
storageRoot: storage.data.storages[w[0]].account.storageRoot)
if rc.isErr:
# Push back parts of the error item
for w in rc.error:
if 0 <= w[0]:
# Reset any partial requests by not copying the `firstSlot` field.
# So all the storage slots are re-fetched completely for this
# account.
stoRange.addLeftOver AccountSlotsHeader(
accHash: stoRange.data.storages[w[0]].account.accHash,
storageRoot: stoRange.data.storages[w[0]].account.storageRoot)
if rc.error[^1][0] < 0:
discard
# TODO: disk storage failed or something else happened, so what?
if rc.error[^1][0] < 0:
discard
# TODO: disk storage failed or something else happened, so what?
# Return the remaining part to be processed later
return ok(storage.leftOver)
return ok(stoRange.leftOver)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc fetchAccounts*(buddy: SnapBuddyRef): Future[bool] {.async.} =
## Fetch accounts data and store them in the database. The function returns
## `true` if there are no more unprocessed accounts.
proc processStorageSlots(
buddy: SnapBuddyRef;
): Future[Result[void,ComError]]
{.async.} =
## Fetch storage data and save it on disk. Storage requests are managed by
## a request queue for handling partial replies and re-fetch issues. For
## all practical purposes, this request queue should mostly be empty.
let
ctx = buddy.ctx
peer = buddy.peer
env = ctx.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
# Get a range of accounts to fetch from
let iv = block:
let rc = buddy.getUnprocessed()
if rc.isErr:
trace "No more unprocessed accounts", peer, stateRoot
return true
rc.value
# Fetch data for this range delegated to `fetchAccounts()`
let dd = block:
let rc = await buddy.getAccountRange(stateRoot, iv)
if rc.isErr:
buddy.putUnprocessed(iv) # fail => interval back to pool
if buddy.waitAfterError(rc.error):
await sleepAsync(5.seconds)
return false
rc.value
# Reset error counts for detecting repeated timeouts
buddy.data.errors.nTimeouts = 0
# Process accounts
let
nAccounts = dd.data.accounts.len
nStorage = dd.withStorage.len
block processAccountsAndStorage:
block:
let rc = ctx.data.accountsDb.importAccounts(
peer, stateRoot, iv.minPt, dd.data, storeOk = true)
while true:
# Pull out the next request item from the queue
let req = block:
let rc = env.leftOver.shift
if rc.isErr:
# Bad data, just try another peer
buddy.putUnprocessed(iv)
buddy.ctrl.zombie = true
trace "Import failed, restoring unprocessed accounts", peer, stateRoot,
range=dd.consumed, nAccounts, nStorage, error=rc.error
return ok()
rc.value
# -------------------------------
buddy.dumpBegin(iv, dd, rc.error)
buddy.dumpEnd()
# -------------------------------
block:
# Fetch and store account storage slots. On some sort of success,
# the `rc` return value contains a list of left-over items to be
# re-processed.
let rc = await buddy.fetchAndImportStorageSlots(req.q)
break processAccountsAndStorage
if rc.isErr:
# Save accounts/storage list to be processed later, then stop
discard env.leftOver.append req
return err(rc.error)
# ---------------------
buddy.dumpBegin(iv, dd)
# ---------------------
for qLo in rc.value:
# Handle queue left-overs for processing in the next cycle
if qLo.q[0].firstSlot == Hash256.default and 0 < env.leftOver.len:
# Appending to last queue item is preferred over adding new item
let item = env.leftOver.first.value
item.q = item.q & qLo.q
else:
# Put back as-is.
discard env.leftOver.append qLo
# End while
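# Note (illustrative): a left-over item whose first slot spec is unset
# (`firstSlot == Hash256.default`) carries no partial sub-range, so it is
# merged into an existing queued item rather than growing the queue with a
# new entry.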
# Statistics
env.nAccounts.inc(nAccounts)
env.nStorage.inc(nStorage)
return ok()
# Register consumed intervals on the accumulator over all state roots
discard buddy.ctx.data.coveredAccounts.merge(dd.consumed)
# ------------------------------------------------------------------------------
# Private functions: healing
# ------------------------------------------------------------------------------
# Register consumed and bulk-imported (well, not yet) accounts range
block registerConsumed:
block:
# Both intervals `min(iv)` and `min(dd.consumed)` are equal
let rc = iv - dd.consumed
if rc.isOk:
# Now, `dd.consumed` < `iv`, return some unused range
buddy.putUnprocessed(rc.value)
break registerConsumed
block:
# The processed interval might be a bit larger
let rc = dd.consumed - iv
if rc.isOk:
# Remove from unprocessed data. If it is not unprocessed anymore,
# then it was doubly processed, which is ok.
buddy.delUnprocessed(rc.value)
break registerConsumed
# End registerConsumed
proc accountsTrieHealing(
buddy: SnapBuddyRef;
env: SnapPivotRef;
envSource: string;
): Future[Result[void,ComError]]
{.async.} =
## ...
# Starting with a given set of potentially dangling nodes, this set is
# updated.
let
ctx = buddy.ctx
peer = buddy.peer
stateRoot = env.stateHeader.stateRoot
# Fetch storage data and save it on disk. Storage requests are managed by
# a request queue for handling partial replies and re-fetch issues. For
# all practical purposes, this request queue should mostly be empty.
block processStorage:
discard env.leftOver.append SnapSlotQueueItemRef(q: dd.withStorage)
while env.repairState != Done and
(env.dangling.len != 0 or env.repairState == Pristine):
while true:
# Pull out the next request item from the queue
let req = block:
let rc = env.leftOver.shift
if rc.isErr:
break processStorage
rc.value
trace "Accounts healing loop", peer, repairState=env.repairState,
envSource, nDangling=env.dangling.len
block:
# Fetch and store account storage slots. On some sort of success,
# the `rc` return value contains a list of left-over items to be
# re-processed.
let rc = await buddy.processStorageSlots(req.q)
let needNodes = block:
let rc = ctx.data.accountsDb.inspectAccountsTrie(
peer, stateRoot, env.dangling)
if rc.isErr:
let error = rc.error
trace "Accounts healing failed", peer, repairState=env.repairState,
envSource, nDangling=env.dangling.len, error
return err(ComInspectDbFailed)
rc.value.dangling
if rc.isErr:
# Save accounts/storage list to be processed later, then stop
discard env.leftOver.append req
if buddy.waitAfterError(rc.error):
await sleepAsync(5.seconds)
break processAccountsAndStorage
# Clear dangling nodes register so that other processes would not fetch
# the same list simultaneously.
env.dangling.setLen(0)
elif 0 < rc.value.q.len:
# Handle queue left-overs for processing in the next cycle
if rc.value.q[0].firstSlot == Hash256.default and
0 < env.leftOver.len:
# Appending to last queue item is preferred over adding new item
let item = env.leftOver.first.value
item.q = item.q & rc.value.q
else:
# Put back as-is.
discard env.leftOver.append rc.value
# End while
# Nothing to do anymore
if needNodes.len == 0:
if env.repairState != Pristine:
env.repairState = Done
trace "Done accounts healing for now", peer, repairState=env.repairState,
envSource, nDangling=env.dangling.len
return ok()
# -------------
buddy.dumpEnd()
# -------------
let lastState = env.repairState
env.repairState = KeepGoing
# End processAccountsAndStorage
trace "Need nodes for healing", peer, repairState=env.repairState,
envSource, nDangling=env.dangling.len, nNodes=needNodes.len
# Fetch nodes
let dd = block:
let rc = await buddy.getTrieNodes(stateRoot, needNodes.mapIt(@[it]))
if rc.isErr:
env.dangling = needNodes
env.repairState = lastState
return err(rc.error)
rc.value
# Store to disk and register left overs for the next pass
block:
let rc = ctx.data.accountsDb.importRawNodes(peer, dd.nodes)
if rc.isOk:
env.dangling = dd.leftOver.mapIt(it[0])
elif 0 < rc.error.len and rc.error[^1][0] < 0:
# negative index => storage error
env.dangling = needNodes
else:
let nodeKeys = rc.error.mapIt(dd.nodes[it[0]])
env.dangling = dd.leftOver.mapIt(it[0]) & nodeKeys
# End while
return ok()
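# Illustrative summary of one healing cycle (not part of the patch):
#
#   1. inspectAccountsTrie() -- walk the partial trie, collect dangling links
#   2. getTrieNodes()        -- fetch the missing nodes via `snap/1`
#   3. importRawNodes()      -- persist them; left-over or failed node paths
#                               become the `dangling` set for the next cycle
#
# The loop terminates when an inspection pass finds no more dangling links
# (the repair state then flips to `Done`.)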
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc fetchAccounts*(buddy: SnapBuddyRef) {.async.} =
## Fetch accounts and associated data and store them in the database.
##
## TODO: Healing for storages. Currently, healing is only run for accounts.
let
ctx = buddy.ctx
peer = buddy.peer
env = ctx.data.pivotEnv
stateRoot = env.stateHeader.stateRoot
var
# Complete the previous environment by trie database healing (if any)
healingEnvs = if not ctx.data.prevEnv.isNil: @[ctx.data.prevEnv] else: @[]
block processAccountsFrame:
# Get a range of accounts to fetch from
let iv = block:
let rc = buddy.getUnprocessed()
if rc.isErr:
# Although there are no accounts left to process here, other peers might
# still be working on some accounts. As a general rule, not all of an
# account range gets served, so the remaining range will magically
# reappear on the unprocessed ranges database.
trace "No more unprocessed accounts (maybe)", peer, stateRoot
# Complete healing for sporadic nodes missing.
healingEnvs.add env
break processAccountsFrame
rc.value
trace "Start fetching accounts", peer, stateRoot, iv,
repairState=env.repairState
# Process received accounts and stash storage slots to fetch later
block:
let rc = await buddy.processAccounts(iv)
if rc.isErr:
let error = rc.error
if await buddy.stopAfterError(error):
buddy.dumpEnd() # FIXME: Debugging (will go away)
trace "Stop fetching cycle", peer, repairState=env.repairState,
processing="accounts", error
return
break processAccountsFrame
# End `block processAccountsFrame`
trace "Start fetching storages", peer, nAccounts=env.leftOver.len,
repairState=env.repairState
# Process storage slots from environment batch
block:
let rc = await buddy.processStorageSlots()
if rc.isErr:
let error = rc.error
if await buddy.stopAfterError(error):
buddy.dumpEnd() # FIXME: Debugging (will go away)
trace "Stop fetching cycle", peer, repairState=env.repairState,
processing="storage", error
return
# Check whether there is some environment that can be completed by
# Patricia Merkle Tree healing.
for w in healingEnvs:
let envSource = if w == ctx.data.pivotEnv: "pivot" else: "retro"
trace "Start accounts healing", peer, repairState=env.repairState,
envSource, dangling=w.dangling.len
let rc = await buddy.accountsTrieHealing(w, envSource)
if rc.isErr:
let error = rc.error
if await buddy.stopAfterError(error):
buddy.dumpEnd() # FIXME: Debugging (will go away)
trace "Stop fetching cycle", peer, repairState=env.repairState,
processing="healing", dangling=w.dangling.len, error
return
buddy.dumpEnd() # FIXME: Debugging (will go away)
trace "Done fetching cycle", peer, repairState=env.repairState
# ------------------------------------------------------------------------------
# End

View File

@ -0,0 +1,345 @@
# Nimbus
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## Borrowed from `full/worker.nim`
import
std/[hashes, options, sets],
chronicles,
chronos,
eth/[common/eth_types, p2p],
stew/byteutils,
"../.."/[protocol, sync_desc],
../worker_desc
{.push raises:[Defect].}
logScope:
topics = "snap-pivot"
const
extraTraceMessages = false # or true
## Additional trace commands
minPeersToStartSync = 2
## Wait for consensus of at least this number of peers before syncing.
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc hash(peer: Peer): Hash =
## Mixin `HashSet[Peer]` handler
hash(cast[pointer](peer))
proc pivotNumber(buddy: SnapBuddyRef): BlockNumber =
# data.pivot2Header
if buddy.ctx.data.pivotEnv.isNil:
0.u256
else:
buddy.ctx.data.pivotEnv.stateHeader.blockNumber
template safeTransport(
buddy: SnapBuddyRef;
info: static[string];
code: untyped) =
try:
code
except TransportError as e:
error info & ", stop", peer=buddy.peer, error=($e.name), msg=e.msg
buddy.ctrl.stopped = true
proc rand(r: ref HmacDrbgContext; maxVal: uint64): uint64 =
# github.com/nim-lang/Nim/tree/version-1-6/lib/pure/random.nim#L216
const
randMax = high(uint64)
if 0 < maxVal:
if maxVal == randMax:
var x: uint64
r[].generate(x)
return x
while true:
var x: uint64
r[].generate(x)
# avoid `mod` bias, so `x <= n*maxVal <= randMax` for some integer `n`
if x <= randMax - (randMax mod maxVal):
# uint -> int
return x mod (maxVal + 1)
proc rand(r: ref HmacDrbgContext; maxVal: int): int =
if 0 < maxVal: # somehow making sense of `maxVal = -1`
return cast[int](r.rand(maxVal.uint64))
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc getRandomTrustedPeer(buddy: SnapBuddyRef): Result[Peer,void] =
## Return a random entry from the `trusted` peer set different from this
## peer, provided there are enough entries.
##
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `randomTrustedPeer()`
let
ctx = buddy.ctx
nPeers = ctx.data.trusted.len
offInx = if buddy.peer in ctx.data.trusted: 2 else: 1
if 0 < nPeers:
var (walkInx, stopInx) = (0, ctx.data.rng.rand(nPeers - offInx))
for p in ctx.data.trusted:
if p == buddy.peer:
continue
if walkInx == stopInx:
return ok(p)
walkInx.inc
err()
proc getBestHeader(
buddy: SnapBuddyRef
): Future[Result[BlockHeader,void]] {.async.} =
## Get best block number from best block hash.
##
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `getBestBlockNumber()`
let
peer = buddy.peer
startHash = peer.state(eth).bestBlockHash
reqLen = 1u
hdrReq = BlocksRequest(
startBlock: HashOrNum(
isHash: true,
hash: startHash),
maxResults: reqLen,
skip: 0,
reverse: true)
trace trEthSendSendingGetBlockHeaders, peer,
startBlock=startHash.data.toHex, reqLen
var hdrResp: Option[blockHeadersObj]
buddy.safeTransport("Error fetching block header"):
hdrResp = await peer.getBlockHeaders(hdrReq)
if buddy.ctrl.stopped:
return err()
if hdrResp.isNone:
trace trEthRecvReceivedBlockHeaders, peer, reqLen, response="n/a"
return err()
let hdrRespLen = hdrResp.get.headers.len
if hdrRespLen == 1:
let
header = hdrResp.get.headers[0]
blockNumber = header.blockNumber
trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber
return ok(header)
trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen
return err()
proc agreesOnChain(
buddy: SnapBuddyRef;
other: Peer
): Future[Result[void,bool]] {.async.} =
## Check whether one of the peers `buddy.peer` or `other` acknowledges
## existence of the best block of the other peer. The values returned mean:
## * ok() -- `peer` is trusted
## * err(true) -- `peer` is untrusted
## * err(false) -- `other` is dead
##
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `peersAgreeOnChain()`
let
peer = buddy.peer
var
start = peer
fetch = other
swapped = false
# Make sure that `fetch` has not the smaller difficulty.
if fetch.state(eth).bestDifficulty < start.state(eth).bestDifficulty:
swap(fetch, start)
swapped = true
let
startHash = start.state(eth).bestBlockHash
hdrReq = BlocksRequest(
startBlock: HashOrNum(
isHash: true,
hash: startHash),
maxResults: 1,
skip: 0,
reverse: true)
trace trEthSendSendingGetBlockHeaders, peer, start, fetch,
startBlock=startHash.data.toHex, hdrReqLen=1, swapped
var hdrResp: Option[blockHeadersObj]
buddy.safeTransport("Error fetching block header"):
hdrResp = await fetch.getBlockHeaders(hdrReq)
if buddy.ctrl.stopped:
if swapped:
return err(true)
# No need to terminate `peer` if it was the `other`, failing nevertheless
buddy.ctrl.stopped = false
return err(false)
if hdrResp.isSome:
let hdrRespLen = hdrResp.get.headers.len
if 0 < hdrRespLen:
let blockNumber = hdrResp.get.headers[0].blockNumber
trace trEthRecvReceivedBlockHeaders, peer, start, fetch,
hdrRespLen, blockNumber
return ok()
trace trEthRecvReceivedBlockHeaders, peer, start, fetch,
blockNumber="n/a", swapped
return err(true)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc pivot2Start*(buddy: SnapBuddyRef) =
discard
proc pivot2Stop*(buddy: SnapBuddyRef) =
## Clean up this peer
buddy.ctx.data.untrusted.add buddy.peer
proc pivot2Restart*(buddy: SnapBuddyRef) =
buddy.data.pivot2Header = none(BlockHeader)
buddy.ctx.data.untrusted.add buddy.peer
proc pivot2Exec*(buddy: SnapBuddyRef): Future[bool] {.async.} =
## Initialise worker. This function must be run in single mode at the
## beginning of running a worker peer.
##
## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `startSyncWithPeer()`
##
let
ctx = buddy.ctx
peer = buddy.peer
# Delayed clean up batch list
if 0 < ctx.data.untrusted.len:
when extraTraceMessages:
trace "Removing untrusted peers", peer, trusted=ctx.data.trusted.len,
untrusted=ctx.data.untrusted.len, runState=buddy.ctrl.state
ctx.data.trusted = ctx.data.trusted - ctx.data.untrusted.toHashSet
ctx.data.untrusted.setLen(0)
if buddy.data.pivot2Header.isNone:
when extraTraceMessages:
# Only log for the first time (if any)
trace "Pivot initialisation", peer,
trusted=ctx.data.trusted.len, runState=buddy.ctrl.state
let rc = await buddy.getBestHeader()
# Beware of peer terminating the session right after communicating
if rc.isErr or buddy.ctrl.stopped:
return false
let
bestNumber = rc.value.blockNumber
minNumber = buddy.pivotNumber
if bestNumber < minNumber:
buddy.ctrl.zombie = true
trace "Useless peer, best number too low", peer,
trusted=ctx.data.trusted.len, runState=buddy.ctrl.state,
minNumber, bestNumber
buddy.data.pivot2Header = some(rc.value)
if minPeersToStartSync <= ctx.data.trusted.len:
# We have enough trusted peers. Validate new peer against trusted
let rc = buddy.getRandomTrustedPeer()
if rc.isOK:
let rx = await buddy.agreesOnChain(rc.value)
if rx.isOk:
ctx.data.trusted.incl peer
return true
if not rx.error:
# Other peer is dead
ctx.data.trusted.excl rc.value
# If there are no trusted peers yet, assume this very peer is trusted,
# but do not finish initialisation until there are more peers.
elif ctx.data.trusted.len == 0:
ctx.data.trusted.incl peer
when extraTraceMessages:
trace "Assume initial trusted peer", peer,
trusted=ctx.data.trusted.len, runState=buddy.ctrl.state
elif ctx.data.trusted.len == 1 and buddy.peer in ctx.data.trusted:
# Ignore degenerate case, note that `trusted.len < minPeersToStartSync`
discard
else:
# At this point we have some "trusted" candidates, but they are not
# "trusted" enough. We evaluate `peer` against all other candidates. If
# one of the candidates disagrees, we swap it for `peer`. If all candidates
# agree, we add `peer` to the trusted set. The peers in the set will become
# "fully trusted" (and sync will start) when the set is big enough.
var
agreeScore = 0
otherPeer: Peer
deadPeers: HashSet[Peer]
when extraTraceMessages:
trace "Trust scoring peer", peer,
trusted=ctx.data.trusted.len, runState=buddy.ctrl.state
for p in ctx.data.trusted:
if peer == p:
inc agreeScore
else:
let rc = await buddy.agreesOnChain(p)
if rc.isOk:
inc agreeScore
elif buddy.ctrl.stopped:
# Beware of terminated session
return false
elif rc.error:
otherPeer = p
else:
# `Other` peer is dead
deadPeers.incl p
# Normalise
if 0 < deadPeers.len:
ctx.data.trusted = ctx.data.trusted - deadPeers
if ctx.data.trusted.len == 0 or
ctx.data.trusted.len == 1 and buddy.peer in ctx.data.trusted:
return false
# Check for the number of peers that disagree
case ctx.data.trusted.len - agreeScore:
of 0:
ctx.data.trusted.incl peer # best possible outcome
when extraTraceMessages:
trace "Agreeable trust score for peer", peer,
trusted=ctx.data.trusted.len, runState=buddy.ctrl.state
of 1:
ctx.data.trusted.excl otherPeer
ctx.data.trusted.incl peer
when extraTraceMessages:
trace "Other peer no longer trusted", peer,
otherPeer, trusted=ctx.data.trusted.len, runState=buddy.ctrl.state
else:
when extraTraceMessages:
trace "Peer not trusted", peer,
trusted=ctx.data.trusted.len, runState=buddy.ctrl.state
discard
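# Worked example (illustrative): with trusted = {A,B,C} and the new peer
# agreeing with A and B but not with C, agreeScore == 2, so the difference
# is 1 and C is swapped out for the new peer.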
# Evaluate status, finally
if minPeersToStartSync <= ctx.data.trusted.len:
when extraTraceMessages:
trace "Peer trusted now", peer,
trusted=ctx.data.trusted.len, runState=buddy.ctrl.state
return true
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -102,6 +102,11 @@ type
## only the first element of a `seq[AccountSlotsHeader]` queue can have an
## effective sub-range specification (later ones will be ignored.)
SnapRepairState* = enum
Pristine ## Not initialised yet
KeepGoing ## Unfinished repair process
Done ## Stop repairing
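# Illustrative life cycle (not part of the patch): a pivot environment
# starts as `Pristine`, turns `KeepGoing` once healing has requested its
# first batch of missing nodes, and becomes `Done` when an inspection pass
# reports no more dangling links.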
SnapPivotRef* = ref object
## Per-state root cache for particular snap data environment
stateHeader*: BlockHeader ## Pivot state, containg state root
@ -110,6 +115,8 @@ type
nAccounts*: uint64 ## Number of accounts imported
nStorage*: uint64 ## Number of storage spaces imported
leftOver*: SnapSlotsQueue ## Re-fetch storage for these accounts
dangling*: seq[Blob] ## Missing nodes for healing process
repairState*: SnapRepairState ## State of healing process
when switchPivotAfterCoverage < 1.0:
minCoverageReachedOk*: bool ## Stop filling this pivot
@ -122,6 +129,7 @@ type
stats*: SnapBuddyStats ## Statistics counters
errors*: SnapBuddyErrors ## For error handling
pivotHeader*: Option[BlockHeader] ## For pivot state hunter
pivot2Header*: Option[BlockHeader] ## Alternative header
workerPivot*: WorkerPivotBase ## Opaque object reference for sub-module
BuddyPoolHookFn* = proc(buddy: BuddyRef[CtxData,BuddyData]) {.gcsafe.}
@ -138,10 +146,14 @@ type
pivotTable*: SnapPivotTable ## Per state root environment
pivotCount*: uint64 ## Total of all created tab entries
pivotEnv*: SnapPivotRef ## Environment containing state root
prevEnv*: SnapPivotRef ## Previous state root environment
accountRangeMax*: UInt256 ## Maximal length, high(u256)/#peers
accountsDb*: AccountsDbRef ## Proof processing for accounts
runPoolHook*: BuddyPoolHookFn ## Callback for `runPool()`
# --------
untrusted*: seq[Peer] ## Clean up list (pivot2)
trusted*: HashSet[Peer] ## Peers ready for delivery (pivot2)
# --------
when snapAccountsDumpEnable:
proofDumpOk*: bool
proofDumpFile*: File

View File

@ -129,14 +129,20 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
# Grab `monitorLock` (was `false` as checked above) and wait until clear
# to run as the only activated instance.
dsc.monitorLock = true
while 0 < dsc.activeMulti:
await sleepAsync(50.milliseconds)
while dsc.singleRunLock:
await sleepAsync(50.milliseconds)
var count = dsc.buddies.len
for w in dsc.buddies.nextValues:
count.dec
worker.runPool(count == 0)
block poolModeExec:
while 0 < dsc.activeMulti:
await sleepAsync(50.milliseconds)
if worker.ctrl.stopped:
break poolModeExec
while dsc.singleRunLock:
await sleepAsync(50.milliseconds)
if worker.ctrl.stopped:
break poolModeExec
var count = dsc.buddies.len
for w in dsc.buddies.nextValues:
count.dec
worker.runPool(count == 0)
# End `block poolModeExec`
dsc.monitorLock = false
continue
@ -146,6 +152,8 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
# Allow task switch
await sleepAsync(50.milliseconds)
if worker.ctrl.stopped:
break
# Multi mode
if worker.ctrl.multiOk:
@ -161,10 +169,14 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
if not dsc.singleRunLock:
# Lock single instance mode and wait for other workers to finish
dsc.singleRunLock = true
while 0 < dsc.activeMulti:
await sleepAsync(50.milliseconds)
# Run single instance and release afterwards
await worker.runSingle()
block singleModeExec:
while 0 < dsc.activeMulti:
await sleepAsync(50.milliseconds)
if worker.ctrl.stopped:
break singleModeExec
# Run single instance and release afterwards
await worker.runSingle()
# End `block singleModeExec`
dsc.singleRunLock = false
# End while
@ -181,7 +193,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
peers = dsc.pool.len
workers = dsc.buddies.len
if dsc.buddies.hasKey(peer.hash):
trace "Reconnecting zombie peer rejected", peer, peers, workers, maxWorkers
trace "Reconnecting zombie peer ignored", peer, peers, workers, maxWorkers
return
# Initialise worker for this peer

View File

@ -9,34 +9,24 @@
# distributed except according to those terms.
import
std/[math, strutils, hashes],
std/[math, hashes],
eth/common/eth_types_rlp,
stew/byteutils
{.push raises: [Defect].}
type
NodeHash* = distinct Hash256
## Hash of a trie node or other blob carried over `NodeData` account trie
## nodes, storage trie nodes, contract code.
##
## Note that the `ethXX` and `snapXX` protocol drivers always use the
## underlying `Hash256` type which needs to be converted to `NodeHash`.
BlockHash* = distinct Hash256
## Hash of a block, goes with `BlockNumber`.
##
## Note that the `ethXX` protocol driver always uses the
## underlying `Hash256` type which needs to be converted to `BlockHash`.
SomeDistinctHash256 =
NodeHash | BlockHash
# ------------------------------------------------------------------------------
# Public constructors
# ------------------------------------------------------------------------------
proc new*(T: type SomeDistinctHash256): T =
proc new*(T: type BlockHash): T =
Hash256().T
# ------------------------------------------------------------------------------
@ -57,11 +47,11 @@ proc to*(longNum: UInt256; T: type float): T =
let exp = mantissaLen - 64
(longNum shr exp).truncate(uint64).T * (2.0 ^ exp)
proc to*(w: SomeDistinctHash256; T: type Hash256): T =
proc to*(w: BlockHash; T: type Hash256): T =
## Syntactic sugar
w.Hash256
proc to*(w: seq[SomeDistinctHash256]; T: type seq[Hash256]): T =
proc to*(w: seq[BlockHash]; T: type seq[Hash256]): T =
## Ditto
cast[seq[Hash256]](w)
@ -73,22 +63,22 @@ proc to*(bh: BlockHash; T: type HashOrNum): T =
# Public functions
# ------------------------------------------------------------------------------
proc read*(rlp: var Rlp, T: type SomeDistinctHash256): T
proc read*(rlp: var Rlp, T: type BlockHash): T
{.gcsafe, raises: [Defect,RlpError]} =
## RLP mixin reader
rlp.read(Hash256).T
proc append*(writer: var RlpWriter; h: SomeDistinctHash256) =
proc append*(writer: var RlpWriter; h: BlockHash) =
## RLP mixin
append(writer, h.Hash256)
proc `==`*(a: SomeDistinctHash256; b: Hash256): bool =
proc `==`*(a: BlockHash; b: Hash256): bool =
a.Hash256 == b
proc `==`*[T: SomeDistinctHash256](a,b: T): bool =
proc `==`*[T: BlockHash](a,b: T): bool =
a.Hash256 == b.Hash256
proc hash*(root: SomeDistinctHash256): Hash =
proc hash*(root: BlockHash): Hash =
## Mixin for `Table` or `KeyedQueue`
root.Hash256.data.hash
@ -100,7 +90,7 @@ func toHex*(hash: Hash256): string =
## Shortcut for `byteutils.toHex(hash.data)`
hash.data.toHex
func `$`*(h: SomeDistinctHash256): string =
func `$`*(h: BlockHash): string =
$h.Hash256.data.toHex
func `$`*(blob: Blob): string =

View File

@ -57,6 +57,11 @@ proc toPC*(
minDigits = digitsAfterDot + 1
multiplier = (10 ^ (minDigits + 1)).float
roundUp = rounding / 10.0
result = ((num * multiplier) + roundUp).int.intToStr(minDigits) & "%"
let
sign = if num < 0: "-" else: ""
preTruncated = (num.abs * multiplier) + roundUp
if int.high.float <= preTruncated:
return "NaN"
result = sign & preTruncated.int.intToStr(minDigits) & "%"
when 0 < digitsAfterDot:
result.insert(".", result.len - minDigits)
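# Note (illustrative): the guard above avoids the former numeric range
# over/underflow in the int conversion and `intToStr()` when the scaled
# percentage exceeds `int.high` (e.g. a ratio with a near-zero denominator);
# such values now render as "NaN" instead of raising.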

View File

@ -13,7 +13,7 @@ import
eth/common,
nimcrypto/utils,
stew/byteutils,
../../nimbus/sync/snap/[range_desc, worker/db/hexary_desc],
../../nimbus/sync/snap/range_desc,
./gunzip
type

View File

@ -13,7 +13,7 @@ import
eth/common,
nimcrypto/utils,
stew/byteutils,
../../nimbus/sync/snap/[range_desc, worker/db/hexary_desc],
../../nimbus/sync/snap/range_desc,
../../nimbus/sync/protocol,
./gunzip

View File

@ -25,8 +25,8 @@ import
../nimbus/p2p/chain,
../nimbus/sync/types,
../nimbus/sync/snap/range_desc,
../nimbus/sync/snap/worker/accounts_db,
../nimbus/sync/snap/worker/db/[hexary_desc, rocky_bulk_load],
../nimbus/sync/snap/worker/[accounts_db, db/hexary_desc,
db/hexary_inspect, db/rocky_bulk_load],
../nimbus/utils/prettify,
./replay/[pp, undump_blocks, undump_accounts, undump_storages],
./test_sync_snap/[bulk_test_xx, snap_test_xx, test_types]
@ -115,6 +115,12 @@ proc pp(rc: Result[Account,HexaryDbError]): string =
proc pp(rc: Result[Hash256,HexaryDbError]): string =
if rc.isErr: $rc.error else: $rc.value.to(NodeTag)
proc pp(
rc: Result[TrieNodeStat,HexaryDbError];
db: AccountsDbSessionRef
): string =
if rc.isErr: $rc.error else: rc.value.pp(db.getAcc)
proc ppKvPc(w: openArray[(string,int)]): string =
w.mapIt(&"{it[0]}={it[1]}%").join(", ")
@ -282,11 +288,12 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
accKeys: seq[Hash256]
test &"Snap-proofing {accountsList.len} items for state root ..{root.pp}":
let dbBase = if persistent: AccountsDbRef.init(db.cdb[0])
else: AccountsDbRef.init(newMemoryDB())
let
dbBase = if persistent: AccountsDbRef.init(db.cdb[0])
else: AccountsDbRef.init(newMemoryDB())
dbDesc = AccountsDbSessionRef.init(dbBase, root, peer)
for n,w in accountsList:
check dbBase.importAccounts(
peer, root, w.base, w.data, storeOk = persistent) == OkHexDb
check dbDesc.importAccounts(w.base, w.data, persistent) == OkHexDb
test &"Merging {accountsList.len} proofs for state root ..{root.pp}":
let dbBase = if persistent: AccountsDbRef.init(db.cdb[1])
@ -417,11 +424,13 @@ proc storagesRunner(
test &"Merging {accountsList.len} accounts for state root ..{root.pp}":
for w in accountsList:
check dbBase.importAccounts(
peer, root, w.base, w.data, storeOk = persistent) == OkHexDb
let desc = AccountsDbSessionRef.init(dbBase, root, peer)
check desc.importAccounts(w.base, w.data, persistent) == OkHexDb
test &"Merging {storagesList.len} storages lists":
let ignore = knownFailures.toTable
let
dbDesc = AccountsDbSessionRef.init(dbBase, root, peer)
ignore = knownFailures.toTable
for n,w in storagesList:
let
testId = fileInfo & "#" & $n
@ -429,12 +438,197 @@ proc storagesRunner(
Result[void,seq[(int,HexaryDbError)]].err(ignore[testId])
else:
OkStoDb
check dbDesc.importStorages(w.data, persistent) == expRc
#if expRc.isErr: setTraceLevel()
#else: setErrorLevel()
#echo ">>> testId=", testId, " expect-error=", expRc.isErr
check dbBase.importStorages(peer, w.data, storeOk = persistent) == expRc
proc inspectionRunner(
noisy = true;
persistent = true;
cascaded = true;
sample: openArray[AccountsSample] = snapTestList) =
let
peer = Peer.new
inspectList = sample.mapIt(it.to(seq[UndumpAccounts]))
tmpDir = getTmpDir()
db = if persistent: tmpDir.testDbs(sample[0].name) else: testDbs()
dbDir = db.dbDir.split($DirSep).lastTwo.join($DirSep)
info = if db.persistent: &"persistent db on \"{dbDir}\""
else: "in-memory db"
fileInfo = "[" & sample[0].file.splitPath.tail.replace(".txt.gz","") & "..]"
defer:
if db.persistent:
for n in 0 ..< nTestDbInstances:
if db.cdb[n].rocksStoreRef.isNil:
break
db.cdb[n].rocksStoreRef.store.db.rocksdb_close
tmpDir.flushDbDir(sample[0].name)
suite &"SyncSnap: inspect {fileInfo} lists for {info} for healing":
let
memBase = AccountsDbRef.init(newMemoryDB())
memDesc = AccountsDbSessionRef.init(memBase, Hash256(), peer)
var
singleStats: seq[(int,TrieNodeStat)]
accuStats: seq[(int,TrieNodeStat)]
perBase,altBase: AccountsDbRef
perDesc,altDesc: AccountsDbSessionRef
if persistent:
perBase = AccountsDbRef.init(db.cdb[0])
perDesc = AccountsDbSessionRef.init(perBase, Hash256(), peer)
altBase = AccountsDbRef.init(db.cdb[1])
altDesc = AccountsDbSessionRef.init(altBase, Hash256(), peer)
test &"Fingerprinting {inspectList.len} single accounts lists " &
"for in-memory-db":
for n,accList in inspectList:
# Separate storage
let
root = accList[0].root
rootKey = root.to(NodeKey)
desc = AccountsDbSessionRef.init(memBase, root, peer)
for w in accList:
check desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb
let rc = desc.inspectAccountsTrie(persistent=false)
check rc.isOk
let
dangling = rc.value.dangling
keys = desc.getAcc.hexaryInspectToKeys(
rootKey, dangling.toHashSet.toSeq)
check dangling.len == keys.len
singleStats.add (desc.getAcc.tab.len,rc.value)
test &"Fingerprinting {inspectList.len} single accounts lists " &
"for persistent db":
if not persistent:
skip()
else:
for n,accList in inspectList:
if nTestDbInstances <= 2+n or db.cdb[2+n].rocksStoreRef.isNil:
continue
# Separate storage on persistent DB (leaving first db slot empty)
let
root = accList[0].root
rootKey = root.to(NodeKey)
dbBase = AccountsDbRef.init(db.cdb[2+n])
desc = AccountsDbSessionRef.init(dbBase, root, peer)
for w in accList:
check desc.importAccounts(w.base, w.data, persistent) == OkHexDb
let rc = desc.inspectAccountsTrie(persistent=false)
check rc.isOk
let
dangling = rc.value.dangling
keys = desc.getAcc.hexaryInspectToKeys(
rootKey, dangling.toHashSet.toSeq)
check dangling.len == keys.len
# Must be the same as the in-memory fingerprint
check singleStats[n][1] == rc.value
test &"Fingerprinting {inspectList.len} accumulated accounts lists " &
"for in-memory-db":
for n,accList in inspectList:
# Accumulated storage
let
root = accList[0].root
rootKey = root.to(NodeKey)
desc = memDesc.dup(root,Peer())
for w in accList:
check desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb
let rc = desc.inspectAccountsTrie(persistent=false)
check rc.isOk
let
dangling = rc.value.dangling
keys = desc.getAcc.hexaryInspectToKeys(
rootKey, dangling.toHashSet.toSeq)
check dangling.len == keys.len
accuStats.add (desc.getAcc.tab.len,rc.value)
test &"Fingerprinting {inspectList.len} accumulated accounts lists " &
"for persistent db":
if not persistent:
skip()
else:
for n,accList in inspectList:
# Accumulated storage on persistent DB (using first db slot)
let
root = accList[0].root
rootKey = root.to(NodeKey)
rootSet = [rootKey].toHashSet
desc = perDesc.dup(root,Peer())
for w in accList:
check desc.importAccounts(w.base, w.data, persistent) == OkHexDb
let rc = desc.inspectAccountsTrie(persistent=false)
check rc.isOk
let
dangling = rc.value.dangling
keys = desc.getAcc.hexaryInspectToKeys(
rootKey, dangling.toHashSet.toSeq)
check dangling.len == keys.len
check accuStats[n][1] == rc.value
test &"Cascaded fingerprinting {inspectList.len} accumulated accounts " &
"lists for in-memory-db":
if not cascaded:
skip()
else:
let
cscBase = AccountsDbRef.init(newMemoryDB())
cscDesc = AccountsDbSessionRef.init(cscBase, Hash256(), peer)
var
cscStep: Table[NodeKey,(int,seq[Blob])]
for n,accList in inspectList:
# Accumulated storage
let
root = accList[0].root
rootKey = root.to(NodeKey)
desc = cscDesc.dup(root,Peer())
for w in accList:
check desc.importAccounts(w.base,w.data,persistent=false) == OkHexDb
if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)):
cscStep[rootKey][0].inc
let
r0 = desc.inspectAccountsTrie(persistent=false)
rc = desc.inspectAccountsTrie(cscStep[rootKey][1],false)
check rc.isOk
let
accumulated = r0.value.dangling.toHashSet
cascaded = rc.value.dangling.toHashSet
check accumulated == cascaded
# Make sure that there are no trivial cases
let trivialCases = toSeq(cscStep.values).filterIt(it[0] <= 1).len
check trivialCases == 0
test &"Cascaded fingerprinting {inspectList.len} accumulated accounts " &
"for persistent db":
if not cascaded or not persistent:
skip()
else:
let
cscBase = altBase
cscDesc = altDesc
var
cscStep: Table[NodeKey,(int,seq[Blob])]
for n,accList in inspectList:
# Accumulated storage
let
root = accList[0].root
rootKey = root.to(NodeKey)
desc = cscDesc.dup(root,Peer())
for w in accList:
check desc.importAccounts(w.base,w.data,persistent) == OkHexDb
if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)):
cscStep[rootKey][0].inc
let
r0 = desc.inspectAccountsTrie(persistent=true)
rc = desc.inspectAccountsTrie(cscStep[rootKey][1],true)
check rc.isOk
let
accumulated = r0.value.dangling.toHashSet
cascaded = rc.value.dangling.toHashSet
check accumulated == cascaded
# Make sure that there are no trivial cases
let trivialCases = toSeq(cscStep.values).filterIt(it[0] <= 1).len
check trivialCases == 0
# ------------------------------------------------------------------------------
# Test Runners: database timing tests
@ -547,6 +741,10 @@ proc storeRunner(noisy = true; persistent = true; cleanUp = true) =
defer:
if xDbs.persistent and cleanUp:
for n in 0 ..< nTestDbInstances:
if xDbs.cdb[n].rocksStoreRef.isNil:
break
xDbs.cdb[n].rocksStoreRef.store.db.rocksdb_close
xTmpDir.flushDbDir("store-runner")
xDbs.reset
@ -877,12 +1075,10 @@ proc storeRunner(noisy = true; persistent = true; cleanUp = true) =
# ------------------------------------------------------------------------------
proc syncSnapMain*(noisy = defined(debug)) =
# Caveat: running `accountsRunner(persistent=true)` twice will crash as the
# persistent database might not be fully cleared due to some stale
# locks.
noisy.accountsRunner(persistent=true)
noisy.accountsRunner(persistent=false)
#noisy.accountsRunner(persistent=false) # problems unless running stand-alone
noisy.importRunner() # small sample, just verify functionality
noisy.inspectionRunner()
noisy.storeRunner()
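# A minimal stand-alone invocation sketch (the module path is an assumption,
# not taken from this file):
#
#   import ./test_sync_snap
#   syncSnapMain(noisy = true)
#
# This runs the accounts, import, inspection and store suites in sequence.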
when isMainModule:
@ -941,15 +1137,17 @@ when isMainModule:
#
# This one uses dumps from the external `nimbus-eth1-blob` repo
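# (The `when true: # and false:` headers below act as manual switches:
# restoring the `and false` part disables that sub-suite without deleting
# the code.)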
when true and false:
when true: # and false:
import ./test_sync_snap/snap_other_xx
noisy.showElapsed("accountsRunner()"):
for n,sam in snapOtherList:
if n in {999} or true:
false.accountsRunner(persistent=true, sam)
false.accountsRunner(persistent=true, sam)
noisy.showElapsed("inspectRunner()"):
for n,sam in snapOtherHealingList:
false.inspectionRunner(persistent=true, cascaded=false, sam)
# This one uses dumps from the external `nimbus-eth1-blob` repo
when true and false:
when true: # and false:
import ./test_sync_snap/snap_storage_xx
let knownFailures = @[
("storages3__18__25_dump#11", @[( 233, RightBoundaryProofFailed)]),
@ -960,15 +1158,14 @@ when isMainModule:
]
noisy.showElapsed("storageRunner()"):
for n,sam in snapStorageList:
if n in {999} or true:
false.storagesRunner(persistent=true, sam, knownFailures)
#if true: quit()
false.storagesRunner(persistent=true, sam, knownFailures)
# This one uses readily available dumps
when true: # and false:
for n,sam in snapTestList:
false.inspectionRunner()
for sam in snapTestList:
false.accountsRunner(persistent=true, sam)
for n,sam in snapTestStorageList:
for sam in snapTestStorageList:
false.accountsRunner(persistent=true, sam)
false.storagesRunner(persistent=true, sam)

@ -72,4 +72,62 @@ const
snapOther0a, snapOther0b, snapOther1a, snapOther1b, snapOther2,
snapOther3, snapOther4, snapOther5, snapOther6]
#<state-root-id> <sample-id-range> <state-root>
# <range-base>
# <last-account>
#
# 0b 7..8 346637e390dce1941c8f8c7bf21adb33cefc198c26bc1964ebf8507471e89000
# 0000000000000000000000000000000000000000000000000000000000000000
# 09e8d852bc952f53343967d775b55a7a626ce6f02c828f4b0d4509b790aee55b
#
# 1b 10..17 979c81bf60286f195c9b69d0bf3c6e4b3939389702ed767d55230fe5db57b8f7
# 0000000000000000000000000000000000000000000000000000000000000000
# 44fc2f4f885e7110bcba5534e9dce2bc59261e1b6ceac2206f5d356575d58d6a
#
# 2 18..25 93353de9894e0eac48bfe0b0023488379aff8ffd4b6e96e0c2c51f395363c7fb
# 024043dc9f47e85f13267584b6098d37e1f8884672423e5f2b86fe4cc606c9d7
# 473c70d158603819829a2d637edd5fa8e8f05720d9895e5e87450b6b19d81239
#
# 4 34..41 d6feef8f3472c5288a5a99409bc0cddbb697637644266a9c8b2e134806ca0fc8
# 2452fe42091c1f12adfe4ea768e47fe8d7b2494a552122470c89cb4c759fe614
# 6958f4d824c2b679ad673cc3f373bb6c431e8941d027ed4a1c699925ccc31ea5
#
# 3 26..33 14d70751ba7fd40303a054c284bca4ef2f63a8e4e1973da90371dffc666bde32
# 387bb75a840d46baa37a6d723d3b1de78f6a0a41d6094c47ee1dad16533b829e
# 7d77e87f695f4244ff8cd4cbfc750003080578f9f51eac3ab3e50df1a7c088c4
#
# 6 50..54 11eba9ec2f204c8165a245f9d05bb7ebb5bfdbdbcccc1a849d8ab2b23550cc12
# 74e30f84b7d6532cf3aeec8931fe6f7ef13d5bad90ebaae451d1f78c4ee41412
# 9c5f3f14c3b3a6eb4d2201b3bf15cf15554d44ba49d8230a7c8a1709660ca2ef
#
# 5 42..49 f75477bd57be4883875042577bf6caab1bd7f8517f0ce3532d813e043ec9f5d0
# a04344c35a42386857589e92428b49b96cd0319a315b81bff5c7ae93151b5057
# e549721af6484420635f0336d90d2d0226ba9bbd599310ae76916b725980bd85
#
# 1a 9 979c81bf60286f195c9b69d0bf3c6e4b3939389702ed767d55230fe5db57b8f7
# fa261d159a47f908d499271fcf976b71244b260ca189f709b8b592d18c098b60
# fa361ef07b5b6cc719347b8d9db35e08986a575b0eca8701caf778f01a08640a
#
# 0a 0..6 346637e390dce1941c8f8c7bf21adb33cefc198c26bc1964ebf8507471e89000
# bf75c492276113636daa8cdd8b27ca5283e26965fbdc2568633480b6b104cd77
# fa99c0467106abe1ed33bd2b6acc1582b09e43d28308d04663d1ef9532e57c6e
#
# ------------------------
#0 0..6 346637e390dce1941c8f8c7bf21adb33cefc198c26bc1964ebf8507471e89000
#0 7..8 346637e390dce1941c8f8c7bf21adb33cefc198c26bc1964ebf8507471e89000
#1 9 979c81bf60286f195c9b69d0bf3c6e4b3939389702ed767d55230fe5db57b8f7
#1 10..17 979c81bf60286f195c9b69d0bf3c6e4b3939389702ed767d55230fe5db57b8f7
#2 18..25 93353de9894e0eac48bfe0b0023488379aff8ffd4b6e96e0c2c51f395363c7fb
#3 26..33 14d70751ba7fd40303a054c284bca4ef2f63a8e4e1973da90371dffc666bde32
#4 34..41 d6feef8f3472c5288a5a99409bc0cddbb697637644266a9c8b2e134806ca0fc8
#5 42..49 f75477bd57be4883875042577bf6caab1bd7f8517f0ce3532d813e043ec9f5d0
#6 50..54 11eba9ec2f204c8165a245f9d05bb7ebb5bfdbdbcccc1a849d8ab2b23550cc12
# ------------------------
snapOtherHealingList* = [
@[snapOther0b, snapOther2, snapOther4],
@[snapOther0a, snapOther1a, snapOther5]]
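# The two groups above are fed one sample at a time into `inspectionRunner()`
# by the `snapOtherHealingList` loop in the main test module (with
# `persistent=true, cascaded=false`); each group spans several different
# state roots, as listed in the table above.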
# End