From 4ff0948fede29743e818eea2bdbf72ad0fd7771d Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Fri, 16 Sep 2022 08:24:12 +0100 Subject: [PATCH] Snap sync accounts healing (#1225) * Added inspect module why: Find dangling references for trie healing support. details: + This patch set provides only the inspect module and some unit tests. + There are also extensive unit tests which need bulk data from the `nimbus-eth1-blob` module. * Alternative pivot finder why: Attempt to be faster on startup. Also trying to decouple the pivot finder somehow by providing different mechanisms (this one runs in `single` mode.) * Use inspect module for healing details: + After some progress with account and storage data, the inspect facility is used to find dangling links in the database to be filled nose-wise. + This is a crude attempt to cobble together functional elements. The setup needs to be honed. * fix scheduler to avoid starting dead peers why: Some peers drop out while in `sleepAsync()`. So extra `if` clauses make sure that this event is detected early. * Bug fixes causing crashes details: + prettify.toPC(): int/intToStr() numeric range over/underflow + hexary_inspect.hexaryInspectPath(): take care of half-initialised step with branch but missing index into branch array * improve handling of dropped peers in alternative pivot finder why: Strange things may happen while querying data from the network. Additional checks make sure that the state of other peers is updated immediately. * Update trace messages * reorganise snap fetch & store schedule --- nimbus/sync/full/worker.nim | 20 +- nimbus/sync/protocol.nim | 3 + nimbus/sync/snap/range_desc.nim | 57 +- nimbus/sync/snap/worker.nim | 157 ++++-- nimbus/sync/snap/worker/accounts_db.nim | 204 ++++-- .../worker/{ => com}/get_account_range.nim | 31 +- .../sync/snap/worker/com/get_byte_codes.nim | 129 +++++ nimbus/sync/snap/worker/com/get_error.nim | 34 ++ .../worker/{ => com}/get_storage_ranges.nim | 111 ++-- .../sync/snap/worker/com/get_trie_nodes.nim | 152 ++++++ nimbus/sync/snap/worker/db/bulk_storage.nim | 16 +- nimbus/sync/snap/worker/db/hexary_defs.nim | 16 +- nimbus/sync/snap/worker/db/hexary_desc.nim | 99 ++-- nimbus/sync/snap/worker/db/hexary_import.nim | 85 ++- nimbus/sync/snap/worker/db/hexary_inspect.nim | 315 +++++++++++ .../snap/worker/db/hexary_interpolate.nim | 51 +- nimbus/sync/snap/worker/db/hexary_paths.nim | 37 +- nimbus/sync/snap/worker/fetch_accounts.nim | 488 ++++++++++++------ nimbus/sync/snap/worker/pivot2.nim | 345 +++++++++++++ nimbus/sync/snap/worker_desc.nim | 12 + nimbus/sync/sync_sched.nim | 38 +- nimbus/sync/types.nim | 30 +- nimbus/utils/prettify.nim | 7 +- tests/replay/undump_accounts.nim | 2 +- tests/replay/undump_storages.nim | 2 +- tests/test_sync_snap.nim | 249 ++++- tests/test_sync_snap/snap_other_xx.nim | 58 +++ 27 files changed, 2242 insertions(+), 506 deletions(-) rename nimbus/sync/snap/worker/{ => com}/get_account_range.nim (90%) create mode 100644 nimbus/sync/snap/worker/com/get_byte_codes.nim create mode 100644 nimbus/sync/snap/worker/com/get_error.nim rename nimbus/sync/snap/worker/{ => com}/get_storage_ranges.nim (62%) create mode 100644 nimbus/sync/snap/worker/com/get_trie_nodes.nim create mode 100644 nimbus/sync/snap/worker/db/hexary_inspect.nim create mode 100644 nimbus/sync/snap/worker/pivot2.nim diff --git a/nimbus/sync/full/worker.nim b/nimbus/sync/full/worker.nim index b2f138b0e..b02a3dd0f 100644 --- a/nimbus/sync/full/worker.nim +++ b/nimbus/sync/full/worker.nim @@ -449,10 +449,8 @@ proc 
initaliseWorker(buddy: FullBuddyRef): Future[bool] {.async.} = let rc = buddy.getRandomTrustedPeer() if rc.isOK: if await buddy.agreesOnChain(rc.value): - # Beware of peer terminating the session - if not buddy.ctrl.stopped: - ctx.data.trusted.incl peer - return true + ctx.data.trusted.incl peer + return true # If there are no trusted peers yet, assume this very peer is trusted, # but do not finish initialisation until there are more peers. @@ -476,15 +474,13 @@ proc initaliseWorker(buddy: FullBuddyRef): Future[bool] {.async.} = for p in ctx.data.trusted: if peer == p: inc agreeScore - else: - let agreedOk = await buddy.agreesOnChain(p) + elif await buddy.agreesOnChain(p): + inc agreeScore + elif buddy.ctrl.stopped: # Beware of peer terminating the session - if buddy.ctrl.stopped: - return false - if agreedOk: - inc agreeScore - else: - otherPeer = p + return false + else: + otherPeer = p # Check for the number of peers that disagree case ctx.data.trusted.len - agreeScore diff --git a/nimbus/sync/protocol.nim b/nimbus/sync/protocol.nim index 9af9bcb28..b8306f3bf 100644 --- a/nimbus/sync/protocol.nim +++ b/nimbus/sync/protocol.nim @@ -33,6 +33,9 @@ type SnapStorageRanges* = storageRangesObj ## Ditto + SnapByteCodes* = byteCodesObj + ## Ditto + SnapTrieNodes* = trieNodesObj ## Ditto diff --git a/nimbus/sync/snap/range_desc.nim b/nimbus/sync/snap/range_desc.nim index fed267299..295ebe7e0 100644 --- a/nimbus/sync/snap/range_desc.nim +++ b/nimbus/sync/snap/range_desc.nim @@ -10,7 +10,7 @@ # distributed except according to those terms. import - std/[math, hashes], + std/[math, sequtils, hashes], eth/common/eth_types_rlp, stew/[byteutils, interval_set], stint, @@ -21,10 +21,17 @@ import {.push raises: [Defect].} type + ByteArray32* = array[32,byte] + ## Used for 32 byte database keys + NodeTag* = ##\ ## Trie leaf item, account hash etc. distinct UInt256 + NodeKey* = distinct ByteArray32 + ## Hash key without the hash wrapper (as opposed to `NodeTag` which is a + ## number) + LeafRange* = ##\ ## Interval `[minPt,maxPt]` of` NodeTag` elements, can be managed in an ## `IntervalSet` data type. 
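The new `NodeKey`/`NodeTag` pair above are two views of the same 32 byte value: raw big-endian bytes versus the `UInt256` number used for interval arithmetic. A minimal, self-contained round-trip sketch, assuming only the `stint` dependency this module already imports; the names mirror `range_desc.nim` and the test value is illustrative:

import stint

type
  ByteArray32 = array[32,byte]   # raw 32 byte database key
  NodeTag = distinct UInt256     # numeric view, used for interval arithmetic
  NodeKey = distinct ByteArray32 # byte view, as hashed/stored

proc to(key: NodeKey; T: type NodeTag): T =
  UInt256.fromBytesBE(key.ByteArray32).T

proc to(tag: NodeTag; T: type NodeKey): T =
  tag.UInt256.toBytesBE.T

when isMainModule:
  var raw: ByteArray32
  raw[31] = 1                    # lowest byte set, so the tag value is 1
  let key = raw.NodeKey
  doAssert key.to(NodeTag).to(NodeKey).ByteArray32 == raw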
@@ -69,21 +76,29 @@ type # Public helpers # ------------------------------------------------------------------------------ -proc to*(nid: NodeTag; T: type Hash256): T = +proc to*(tag: NodeTag; T: type Hash256): T = ## Convert to serialised equivalent - result.data = nid.UInt256.toBytesBE + result.data = tag.UInt256.toBytesBE -proc to*(nid: NodeTag; T: type NodeHash): T = - ## Syntactic sugar - nid.to(Hash256).T - -proc to*(h: Hash256; T: type NodeTag): T = +proc to*(key: NodeKey; T: type NodeTag): T = ## Convert from serialised equivalent - UInt256.fromBytesBE(h.data).T + UInt256.fromBytesBE(key.ByteArray32).T -proc to*(nh: NodeHash; T: type NodeTag): T = +proc to*(key: Hash256; T: type NodeTag): T = ## Syntactic sugar - nh.Hash256.to(T) + key.data.NodeKey.to(T) + +proc to*(tag: NodeTag; T: type NodeKey): T = + ## Syntactic sugar + tag.UInt256.toBytesBE.T + +proc to*(hash: Hash256; T: type NodeKey): T = + ## Syntactic sugar + hash.data.NodeKey + +proc to*(key: NodeKey; T: type Blob): T = + ## Syntactic sugar + key.ByteArray32.toSeq proc to*(n: SomeUnsignedInt|UInt256; T: type NodeTag): T = ## Syntactic sugar @@ -93,21 +108,21 @@ proc to*(n: SomeUnsignedInt|UInt256; T: type NodeTag): T = # Public constructors # ------------------------------------------------------------------------------ -proc init*(nh: var NodeHash; data: openArray[byte]): bool = - ## Import argument `data` into `nh` which must have length either `32` or `0`. - ## The latter case is equivalent to an all zero byte array of size `32`. +proc init*(key: var NodeKey; data: openArray[byte]): bool = + ## ## Import argument `data` into `key` which must have length either `32`, ot + ## `0`. The latter case is equivalent to an all zero byte array of size `32`. if data.len == 32: - (addr nh.Hash256.data[0]).copyMem(unsafeAddr data[0], 32) + (addr key.ByteArray32[0]).copyMem(unsafeAddr data[0], data.len) return true elif data.len == 0: - nh.reset + key.reset return true -proc init*(nt: var NodeTag; data: openArray[byte]): bool = - ## Similar to `init(nh: var NodeHash; .)`. - var h: NodeHash - if h.init(data): - nt = h.to(NodeTag) +proc init*(tag: var NodeTag; data: openArray[byte]): bool = + ## Similar to `init(key: var NodeHash; .)`. + var key: NodeKey + if key.init(data): + tag = key.to(NodeTag) return true # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 6e727d954..ebd8ab2ee 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -16,11 +16,20 @@ import eth/[common/eth_types, p2p], stew/[interval_set, keyed_queue], ../../db/select_backend, - ../../utils/prettify, ".."/[protocol, sync_desc], - ./worker/[accounts_db, fetch_accounts, pivot, ticker], + ./worker/[accounts_db, fetch_accounts, ticker], "."/[range_desc, worker_desc] +const + usePivot2ok = false or true + +when usePivot2ok: + import ./worker/pivot2 +else: + import ./worker/pivot + +{.push raises: [Defect].} + logScope: topics = "snap-sync" @@ -81,30 +90,47 @@ proc setPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) = # Statistics ctx.data.pivotCount.inc - # Activate per-state root environment + # Activate per-state root environment (and hold previous one) + ctx.data.prevEnv = ctx.data.pivotEnv ctx.data.pivotEnv = ctx.data.pivotTable.lruAppend(key, env, ctx.buddiesMax) proc updatePivotEnv(buddy: SnapBuddyRef): bool = ## Update global state root environment from local `pivotHeader`. Choose the ## latest block number. 
Returns `true` if the environment was changed - if buddy.data.pivotHeader.isSome: + when usePivot2ok: + let maybeHeader = buddy.data.pivot2Header + else: + let maybeHeader = buddy.data.pivotHeader + + if maybeHeader.isSome: let + peer = buddy.peer ctx = buddy.ctx env = ctx.data.pivotEnv - newStateNumber = buddy.data.pivotHeader.unsafeGet.blockNumber + pivotHeader = maybeHeader.unsafeGet + newStateNumber = pivotHeader.blockNumber stateNumber = if env.isNil: 0.toBlockNumber else: env.stateHeader.blockNumber + stateWindow = stateNumber + maxPivotBlockWindow - when switchPivotAfterCoverage < 1.0: - if not env.isNil: - if stateNumber < newStateNumber and env.minCoverageReachedOk: - buddy.setPivotEnv(buddy.data.pivotHeader.get) - return true + block keepCurrent: + if env.isNil: + break keepCurrent # => new pivot + if stateNumber < newStateNumber: + when switchPivotAfterCoverage < 1.0: + if env.minCoverageReachedOk: + break keepCurrent # => new pivot + if stateWindow < newStateNumber: + break keepCurrent # => new pivot + if newStateNumber <= maxPivotBlockWindow: + break keepCurrent # => new pivot + # keep current + return false - if stateNumber + maxPivotBlockWindow < newStateNumber: - buddy.setPivotEnv(buddy.data.pivotHeader.get) - return true + # set new block + buddy.setPivotEnv(pivotHeader) + return true proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = @@ -151,6 +177,39 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = nStorage: meanStdDev(sSum, sSqSum, count), accountsFill: (accFill[0], accFill[1], accCoverage)) + +proc havePivot(buddy: SnapBuddyRef): bool = + ## ... + if buddy.data.pivotHeader.isSome and + buddy.data.pivotHeader.get.blockNumber != 0: + + # So there is a `ctx.data.pivotEnv` + when 1.0 <= switchPivotAfterCoverage: + return true + else: + let + ctx = buddy.ctx + env = ctx.data.pivotEnv + + # Force fetching new pivot if coverage reached by returning `false` + if not env.minCoverageReachedOk: + + # Not sure yet, so check whether coverage has been reached at all + let cov = env.availAccounts.freeFactor + if switchPivotAfterCoverage <= cov: + trace " Snap accounts coverage reached", peer, + threshold=switchPivotAfterCoverage, coverage=cov.toPC + + # Need to reset pivot handlers + buddy.ctx.poolMode = true + buddy.ctx.data.runPoolHook = proc(b: SnapBuddyRef) = + b.ctx.data.pivotEnv.minCoverageReachedOk = true + when usePivot2ok: + b.pivot2Restart + else: + b.pivotRestart + return true + # ------------------------------------------------------------------------------ # Public start/stop and admin functions # ------------------------------------------------------------------------------ @@ -187,7 +246,10 @@ proc start*(buddy: SnapBuddyRef): bool = if peer.supports(protocol.snap) and peer.supports(protocol.eth) and peer.state(protocol.eth).initialized: - buddy.pivotStart() + when usePivot2ok: + buddy.pivot2Start() + else: + buddy.pivotStart() if not ctx.data.ticker.isNil: ctx.data.ticker.startBuddy() return true @@ -198,7 +260,10 @@ proc stop*(buddy: SnapBuddyRef) = ctx = buddy.ctx peer = buddy.peer buddy.ctrl.stopped = true - buddy.pivotStop() + when usePivot2ok: + buddy.pivot2Stop() + else: + buddy.pivotStop() if not ctx.data.ticker.isNil: ctx.data.ticker.stopBuddy() @@ -218,7 +283,31 @@ proc runSingle*(buddy: SnapBuddyRef) {.async.} = ## ## Note that this function runs in `async` mode. ## - buddy.ctrl.multiOk = true + when usePivot2ok: + # + # Run alternative pivot finder. This one harmonises difficulties of at + # least two peers. 
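Referring back to the `block keepCurrent` logic in `updatePivotEnv()` above, the decision whether to adopt a new pivot header can be distilled into a pure predicate. This is a sketch with illustrative names and plain `uint64` block numbers; the real code additionally honours the compile-time `switchPivotAfterCoverage` setting:

proc wantNewPivot(current, candidate, window: uint64;
                  haveEnv, coverageOk: bool): bool =
  if not haveEnv:
    return true                      # no pivot environment yet
  if current < candidate:
    if coverageOk:
      return true                    # current pivot sufficiently covered
    if current + window < candidate:
      return true                    # candidate far ahead of current pivot
    if candidate <= window:
      return true                    # still within the first window
  false                              # otherwise keep the current pivot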
The can only be one instance active/unfinished of the + # `pivot2Exec()` functions. + # + let peer = buddy.peer + if not buddy.havePivot: + if await buddy.pivot2Exec(): + discard buddy.updatePivotEnv() + else: + if not buddy.ctrl.stopped: + await sleepAsync(2.seconds) + return + + buddy.ctrl.multiOk = true + + trace "Snap pivot initialised", peer, + multiOk=buddy.ctrl.multiOk, runState=buddy.ctrl.state + else: + # + # The default pivot finder runs in multi mode. So there is nothing to do + # here. + # + buddy.ctrl.multiOk = true proc runPool*(buddy: SnapBuddyRef, last: bool) = @@ -253,34 +342,12 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = let ctx = buddy.ctx peer = buddy.peer - var - havePivotOk = (buddy.data.pivotHeader.isSome and - buddy.data.pivotHeader.get.blockNumber != 0) - # Switch pivot state root if this much coverage has been achieved, already - when switchPivotAfterCoverage < 1.0: - if havePivotOk: - # So there is a `ctx.data.pivotEnv` - if ctx.data.pivotEnv.minCoverageReachedOk: - # Force fetching new pivot if coverage reached - havePivotOk = false - else: - # Not sure yet, so check whether coverage has been reached at all - let cov = ctx.data.pivotEnv.availAccounts.freeFactor - if switchPivotAfterCoverage <= cov: - trace " Snap accounts coverage reached", - threshold=switchPivotAfterCoverage, coverage=cov.toPC - # Need to reset pivot handlers - buddy.ctx.poolMode = true - buddy.ctx.data.runPoolHook = proc(b: SnapBuddyRef) = - b.ctx.data.pivotEnv.minCoverageReachedOk = true - b.pivotRestart - return - - if not havePivotOk: - await buddy.pivotExec() - if not buddy.updatePivotEnv(): - return + when not usePivot2ok: + if not buddy.havePivot: + await buddy.pivotExec() + if not buddy.updatePivotEnv(): + return # Ignore rest if the pivot is still acceptably covered when switchPivotAfterCoverage < 1.0: @@ -288,7 +355,9 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = await sleepAsync(50.milliseconds) return - if await buddy.fetchAccounts(): + await buddy.fetchAccounts() + + if ctx.data.pivotEnv.repairState == Done: buddy.ctrl.multiOk = false buddy.data.pivotHeader = none(BlockHeader) diff --git a/nimbus/sync/snap/worker/accounts_db.nim b/nimbus/sync/snap/worker/accounts_db.nim index da3863f11..375459ef6 100644 --- a/nimbus/sync/snap/worker/accounts_db.nim +++ b/nimbus/sync/snap/worker/accounts_db.nim @@ -21,7 +21,7 @@ import "../.."/[protocol, types], ../range_desc, ./db/[bulk_storage, hexary_defs, hexary_desc, hexary_import, - hexary_interpolate, hexary_paths, rocky_bulk_load] + hexary_interpolate, hexary_inspect, hexary_paths, rocky_bulk_load] {.push raises: [Defect].} @@ -31,6 +31,9 @@ logScope: export HexaryDbError +const + extraTraceMessages = false # or true + type AccountsDbRef* = ref object db: TrieDatabaseRef ## General database @@ -41,21 +44,21 @@ type base: AccountsDbRef ## Back reference to common parameters peer: Peer ## For log messages accRoot: NodeKey ## Current accounts root node - accDb: HexaryTreeDB ## Accounts database - stoDb: HexaryTreeDB ## Storage database + accDb: HexaryTreeDbRef ## Accounts database + stoDb: HexaryTreeDbRef ## Storage database # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ -proc newHexaryTreeDb(ps: AccountsDbSessionRef): HexaryTreeDB = - result.keyPp = ps.stoDb.keyPp # for debugging, will go away +proc newHexaryTreeDbRef(ps: AccountsDbSessionRef): HexaryTreeDbRef = + HexaryTreeDbRef(keyPp: 
ps.stoDb.keyPp) # for debugging, will go away proc to(h: Hash256; T: type NodeKey): T = h.data.T proc convertTo(data: openArray[byte]; T: type Hash256): T = - discard result.NodeHash.init(data) # error => zero + discard result.data.NodeKey.init(data) # size error => zero template noKeyError(info: static[string]; code: untyped) = try: @@ -68,6 +71,8 @@ template noRlpExceptionOops(info: static[string]; code: untyped) = code except RlpError: return err(RlpEncoding) + except KeyError as e: + raiseAssert "Not possible (" & info & "): " & e.msg except Defect as e: raise e except Exception as e: @@ -118,7 +123,7 @@ proc pp(a: NodeTag; ps: AccountsDbSessionRef): string = proc mergeProofs( peer: Peer, ## For log messages - db: var HexaryTreeDB; ## Database table + db: HexaryTreeDbRef; ## Database table root: NodeKey; ## Root for checking nodes proof: seq[Blob]; ## Node records freeStandingOk = false; ## Remove freestanding nodes @@ -155,28 +160,30 @@ proc mergeProofs( proc persistentAccounts( - ps: AccountsDbSessionRef + db: HexaryTreeDbRef; ## Current table + pv: AccountsDbRef; ## Persistent database ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect,OSError,KeyError].} = ## Store accounts trie table on databse - if ps.base.rocky.isNil: - let rc = ps.accDb.bulkStorageAccounts(ps.base.db) + if pv.rocky.isNil: + let rc = db.bulkStorageAccounts(pv.db) if rc.isErr: return rc else: - let rc = ps.accDb.bulkStorageAccountsRocky(ps.base.rocky) + let rc = db.bulkStorageAccountsRocky(pv.rocky) if rc.isErr: return rc ok() proc persistentStorages( - ps: AccountsDbSessionRef + db: HexaryTreeDbRef; ## Current table + pv: AccountsDbRef; ## Persistent database ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect,OSError,KeyError].} = ## Store accounts trie table on databse - if ps.base.rocky.isNil: - let rc = ps.stoDb.bulkStorageStorages(ps.base.db) + if pv.rocky.isNil: + let rc = db.bulkStorageStorages(pv.db) if rc.isErr: return rc else: - let rc = ps.stoDb.bulkStorageStoragesRocky(ps.base.rocky) + let rc = db.bulkStorageStoragesRocky(pv.rocky) if rc.isErr: return rc ok() @@ -271,7 +278,7 @@ proc importStorageSlots*( stoRoot = data.account.storageRoot.to(NodeKey) var slots: seq[RLeafSpecs] - db = ps.newHexaryTreeDB() + db = ps.newHexaryTreeDbRef() if 0 < proof.len: let rc = ps.peer.mergeProofs(db, stoRoot, proof) @@ -326,7 +333,9 @@ proc init*( let desc = AccountsDbSessionRef( base: pv, peer: peer, - accRoot: root.to(NodeKey)) + accRoot: root.to(NodeKey), + accDb: HexaryTreeDbRef(), + stoDb: HexaryTreeDbRef()) # Debugging, might go away one time ... desc.accDb.keyPp = proc(key: RepairKey): string = key.pp(desc) @@ -334,6 +343,27 @@ proc init*( return desc +proc dup*( + ps: AccountsDbSessionRef; + root: Hash256; + peer: Peer; + ): AccountsDbSessionRef = + ## Resume a session with different `root` key and `peer`. This new session + ## will access the same memory database as the `ps` argument session. + AccountsDbSessionRef( + base: ps.base, + peer: peer, + accRoot: root.to(NodeKey), + accDb: ps.accDb, + stoDb: ps.stoDb) + +proc dup*( + ps: AccountsDbSessionRef; + root: Hash256; + ): AccountsDbSessionRef = + ## Variant of `dup()` without the `peer` argument. 
+ ps.dup(root, ps.peer) + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -350,11 +380,11 @@ proc importAccounts*( ps: AccountsDbSessionRef; ## Re-usable session descriptor base: NodeTag; ## before or at first account entry in `data` data: PackedAccountRange; ## re-packed `snap/1 ` reply data - storeOk = false; ## store data on disk + persistent = false; ## store data on disk ): Result[void,HexaryDbError] = ## Validate and import accounts (using proofs as received with the snap ## message `AccountRange`). This function accumulates data in a memory table - ## which can be written to disk with the argument `storeOk` set `true`. The + ## which can be written to disk with the argument `persistent` set `true`. The ## memory table is held in the descriptor argument`ps`. ## ## Note that the `peer` argument is for log messages, only. @@ -374,8 +404,8 @@ proc importAccounts*( ps.accRoot, accounts, bootstrap = (data.proof.len == 0)) if rc.isErr: return err(rc.error) - if storeOk: - let rc = ps.persistentAccounts() + if persistent: + let rc = ps.accDb.persistentAccounts(ps.base) if rc.isErr: return err(rc.error) except RlpError: @@ -386,9 +416,10 @@ proc importAccounts*( trace "Import Accounts exception", peer=ps.peer, name=($e.name), msg=e.msg return err(OSErrorException) - trace "Accounts and proofs ok", peer=ps.peer, - root=ps.accRoot.ByteArray32.toHex, - proof=data.proof.len, base, accounts=data.accounts.len + when extraTraceMessages: + trace "Accounts and proofs ok", peer=ps.peer, + root=ps.accRoot.ByteArray32.toHex, + proof=data.proof.len, base, accounts=data.accounts.len ok() proc importAccounts*( @@ -397,21 +428,21 @@ proc importAccounts*( root: Hash256; ## state root base: NodeTag; ## before or at first account entry in `data` data: PackedAccountRange; ## re-packed `snap/1 ` reply data - storeOk = false; ## store data on disk ): Result[void,HexaryDbError] = ## Variant of `importAccounts()` - AccountsDbSessionRef.init(pv, root, peer).importAccounts(base, data, storeOk) + AccountsDbSessionRef.init( + pv, root, peer).importAccounts(base, data, persistent=true) proc importStorages*( ps: AccountsDbSessionRef; ## Re-usable session descriptor data: AccountStorageRange; ## Account storage reply from `snap/1` protocol - storeOk = false; ## store data on disk + persistent = false; ## store data on disk ): Result[void,seq[(int,HexaryDbError)]] = ## Validate and import storage slots (using proofs as received with the snap ## message `StorageRanges`). This function accumulates data in a memory table - ## which can be written to disk with the argument `storeOk` set `true`. The + ## which can be written to disk with the argument `persistent` set `true`. The ## memory table is held in the descriptor argument`ps`. ## ## Note that the `peer` argument is for log messages, only. 
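A hedged usage sketch of the session API above: one re-usable `AccountsDbSessionRef` per pivot state root, with `persistent=true` flushing the in-memory table to disk. The proc name is hypothetical, `pv`, `peer`, `stateRoot`, `base` and `data` are assumed to be supplied by the caller, and imports are as at the top of this module:

proc exampleAccountsImport(pv: AccountsDbRef; peer: Peer; stateRoot: Hash256;
                           base: NodeTag; data: PackedAccountRange) =
  let
    ps = AccountsDbSessionRef.init(pv, stateRoot, peer)
    rc = ps.importAccounts(base, data, persistent=true)
  if rc.isErr:
    echo "accounts import failed: ", rc.error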
@@ -447,9 +478,9 @@ proc importStorages*( errors.add (slotID,rc.error) # Store to disk - if storeOk: + if persistent: slotID = -1 - let rc = ps.persistentStorages() + let rc = ps.stoDb.persistentStorages(ps.base) if rc.isErr: errors.add (slotID,rc.error) @@ -465,19 +496,118 @@ proc importStorages*( # So non-empty error list is guaranteed return err(errors) - trace "Storage slots imported", peer=ps.peer, - slots=data.storages.len, proofs=data.proof.len - + when extraTraceMessages: + trace "Storage slots imported", peer=ps.peer, + slots=data.storages.len, proofs=data.proof.len ok() proc importStorages*( pv: AccountsDbRef; ## Base descriptor on `BaseChainDB` peer: Peer, ## For log messages, only data: AccountStorageRange; ## Account storage reply from `snap/1` protocol - storeOk = false; ## store data on disk ): Result[void,seq[(int,HexaryDbError)]] = ## Variant of `importStorages()` - AccountsDbSessionRef.init(pv, Hash256(), peer).importStorages(data, storeOk) + AccountsDbSessionRef.init( + pv, Hash256(), peer).importStorages(data, persistent=true) + + + +proc importRawNodes*( + ps: AccountsDbSessionRef; ## Re-usable session descriptor + nodes: openArray[Blob]; ## Node records + persistent = false; ## store data on disk + ): Result[void,seq[(int,HexaryDbError)]] = + ## ... + var + errors: seq[(int,HexaryDbError)] + nodeID = -1 + let + db = ps.newHexaryTreeDbRef() + try: + # Import nodes + for n,rec in nodes: + nodeID = n + let rc = db.hexaryImport(rec) + if rc.isErr: + let error = rc.error + trace "importRawNodes()", peer=ps.peer, item=n, nodes=nodes.len, error + errors.add (nodeID,error) + + # Store to disk + if persistent: + nodeID = -1 + let rc = db.persistentAccounts(ps.base) + if rc.isErr: + errors.add (nodeID,rc.error) + + except RlpError: + errors.add (nodeID,RlpEncoding) + except KeyError as e: + raiseAssert "Not possible @ importAccounts: " & e.msg + except OSError as e: + trace "Import Accounts exception", peer=ps.peer, name=($e.name), msg=e.msg + errors.add (nodeID,RlpEncoding) + + if 0 < errors.len: + return err(errors) + + trace "Raw nodes imported", peer=ps.peer, nodes=nodes.len + ok() + +proc importRawNodes*( + pv: AccountsDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer, ## For log messages, only + nodes: openArray[Blob]; ## Node records + ): Result[void,seq[(int,HexaryDbError)]] = + ## Variant of `importRawNodes()` for persistent storage. + AccountsDbSessionRef.init( + pv, Hash256(), peer).importRawNodes(nodes, persistent=true) + + +proc inspectAccountsTrie*( + ps: AccountsDbSessionRef; ## Re-usable session descriptor + pathList = seq[Blob].default; ## Starting nodes for search + persistent = false; ## Read data from disk + ignoreError = false; ## Return partial results if available + ): Result[TrieNodeStat, HexaryDbError] = + ## Starting with the argument list `pathSet`, find all the non-leaf nodes in + ## the hexary trie which have at least one node key reference missing in + ## the trie database. 
+ ## + var stats: TrieNodeStat + noRlpExceptionOops("inspectAccountsTrie()"): + if persistent: + let getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key) + stats = getFn.hexaryInspectTrie(ps.accRoot, pathList) + else: + stats = ps.accDb.hexaryInspectTrie(ps.accRoot, pathList) + + let + peer = ps.peer + pathList = pathList.len + nDangling = stats.dangling.len + + if stats.stoppedAt != 0: + let error = LoopAlert + trace "Inspect account trie loop", peer, pathList, nDangling, + stoppedAt=stats.stoppedAt, error + if ignoreError: + return ok(stats) + return err(error) + + trace "Inspect account trie ok", peer, pathList, nDangling + return ok(stats) + +proc inspectAccountsTrie*( + pv: AccountsDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer, ## For log messages, only + root: Hash256; ## state root + pathList = seq[Blob].default; ## Starting paths for search + ignoreError = false; ## Return partial results if available + ): Result[TrieNodeStat, HexaryDbError] = + ## Variant of `inspectAccountsTrie()` for persistent storage. + AccountsDbSessionRef.init( + pv, root, peer).inspectAccountsTrie(pathList, persistent=true, ignoreError) # ------------------------------------------------------------------------------ # Debugging (and playing with the hexary database) @@ -585,6 +715,10 @@ proc dumpAccDB*(ps: AccountsDbSessionRef; indent = 4): string = ## Dump the entries from the a generic accounts trie. ps.accDb.pp(ps.accRoot,indent) +proc getAcc*(ps: AccountsDbSessionRef): HexaryTreeDbRef = + ## Low level access to accounts DB + ps.accDb + proc hexaryPpFn*(ps: AccountsDbSessionRef): HexaryPpFn = ## Key mapping function used in `HexaryTreeDB` ps.accDb.keyPp diff --git a/nimbus/sync/snap/worker/get_account_range.nim b/nimbus/sync/snap/worker/com/get_account_range.nim similarity index 90% rename from nimbus/sync/snap/worker/get_account_range.nim rename to nimbus/sync/snap/worker/com/get_account_range.nim index f75fe7ccb..bbcdb6e10 100644 --- a/nimbus/sync/snap/worker/get_account_range.nim +++ b/nimbus/sync/snap/worker/com/get_account_range.nim @@ -1,5 +1,4 @@ -# Nimbus - Fetch account and storage states from peers by snapshot traversal -# +# Nimbus # Copyright (c) 2021 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or @@ -18,8 +17,9 @@ import chronos, eth/[common/eth_types, p2p, trie/trie_defs], stew/interval_set, - "../.."/[protocol, protocol/trace_config], - ".."/[range_desc, worker_desc] + "../../.."/[protocol, protocol/trace_config], + "../.."/[range_desc, worker_desc], + ./get_error {.push raises: [Defect].} @@ -27,15 +27,6 @@ logScope: topics = "snap-fetch" type - GetAccountRangeError* = enum - GareNothingSerious - GareMissingProof - GareAccountsMinTooSmall - GareAccountsMaxTooLarge - GareNoAccountsForStateRoot - GareNetworkProblem - GareResponseTimeout - GetAccountRange* = object consumed*: LeafRange ## Real accounts interval covered data*: PackedAccountRange ## Re-packed reply data @@ -69,7 +60,7 @@ proc getAccountRange*( buddy: SnapBuddyRef; stateRoot: Hash256; iv: LeafRange - ): Future[Result[GetAccountRange,GetAccountRangeError]] {.async.} = + ): Future[Result[GetAccountRange,ComError]] {.async.} = ## Fetch data using the `snap#` protocol, returns the range covered. 
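Taken together, the healing helpers just introduced (`inspectAccountsTrie()`, `importRawNodes()` and the `com/get_trie_nodes` fetcher further below) are meant to be wired up roughly as follows. This is a simplified sketch, not the actual `fetch_accounts.nim` scheduling code; the proc name is hypothetical and imports (chronos, sequtils and the snap worker modules involved) are assumed:

proc healAccountsOnce(buddy: SnapBuddyRef; pv: AccountsDbRef;
                      stateRoot: Hash256) {.async.} =
  # 1. Find dangling references in the persisted accounts trie
  let rc = pv.inspectAccountsTrie(buddy.peer, stateRoot)
  if rc.isErr:
    return
  let dangling = rc.value.dangling
  if dangling.len == 0:
    return                                  # nothing to heal
  # 2. Ask the peer for the missing nodes (account paths have one element)
  let reply = await buddy.getTrieNodes(stateRoot, dangling.mapIt(@[it]))
  if reply.isErr:
    return
  # 3. Store whatever was returned; left-overs are retried in a later round
  discard pv.importRawNodes(buddy.peer, reply.value.nodes)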
let peer = buddy.peer @@ -80,10 +71,10 @@ proc getAccountRange*( var dd = block: let rc = await buddy.getAccountRangeReq(stateRoot, iv) if rc.isErr: - return err(GareNetworkProblem) + return err(ComNetworkProblem) if rc.value.isNone: trace trSnapRecvTimeoutWaiting & "for reply to GetAccountRange", peer - return err(GareResponseTimeout) + return err(ComResponseTimeout) let snAccRange = rc.value.get GetAccountRange( consumed: iv, @@ -119,7 +110,7 @@ proc getAccountRange*( # Maybe try another peer trace trSnapRecvReceived & "empty AccountRange", peer, nAccounts, nProof, accRange="n/a", reqRange=iv, stateRoot - return err(GareNoAccountsForStateRoot) + return err(ComNoAccountsForStateRoot) # So there is no data, otherwise an account beyond the interval end # `iv.maxPt` would have been returned. @@ -144,14 +135,14 @@ proc getAccountRange*( trace trSnapRecvProtocolViolation & "proof-less AccountRange", peer, nAccounts, nProof, accRange=LeafRange.new(iv.minPt, accMaxPt), reqRange=iv, stateRoot - return err(GareMissingProof) + return err(ComMissingProof) if accMinPt < iv.minPt: # Not allowed trace trSnapRecvProtocolViolation & "min too small in AccountRange", peer, nAccounts, nProof, accRange=LeafRange.new(accMinPt, accMaxPt), reqRange=iv, stateRoot - return err(GareAccountsMinTooSmall) + return err(ComAccountsMinTooSmall) if iv.maxPt < accMaxPt: # github.com/ethereum/devp2p/blob/master/caps/snap.md#getaccountrange-0x00: @@ -168,7 +159,7 @@ proc getAccountRange*( trace trSnapRecvProtocolViolation & "AccountRange top exceeded", peer, nAccounts, nProof, accRange=LeafRange.new(iv.minPt, accMaxPt), reqRange=iv, stateRoot - return err(GareAccountsMaxTooLarge) + return err(ComAccountsMaxTooLarge) dd.consumed = LeafRange.new(iv.minPt, max(iv.maxPt,accMaxPt)) trace trSnapRecvReceived & "AccountRange", peer, diff --git a/nimbus/sync/snap/worker/com/get_byte_codes.nim b/nimbus/sync/snap/worker/com/get_byte_codes.nim new file mode 100644 index 000000000..6a272f13a --- /dev/null +++ b/nimbus/sync/snap/worker/com/get_byte_codes.nim @@ -0,0 +1,129 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
+ +## Note: this module is currently unused + +import + std/[options, sequtils], + chronos, + eth/[common/eth_types, p2p], + "../../.."/[protocol, protocol/trace_config], + "../.."/[range_desc, worker_desc], + ./get_error + +{.push raises: [Defect].} + +logScope: + topics = "snap-fetch" + +type + # SnapByteCodes* = object + # codes*: seq[Blob] + + GetByteCodes* = object + leftOver*: seq[NodeKey] + kvPairs*: seq[(Nodekey,Blob)] + +const + emptyBlob = seq[byte].default + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc getByteCodesReq( + buddy: SnapBuddyRef; + keys: seq[Hash256]; + ): Future[Result[Option[SnapByteCodes],void]] + {.async.} = + let + peer = buddy.peer + try: + let reply = await peer.getByteCodes(keys, snapRequestBytesLimit) + return ok(reply) + + except CatchableError as e: + trace trSnapRecvError & "waiting for GetByteCodes reply", peer, + error=e.msg + return err() + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc getByteCodes*( + buddy: SnapBuddyRef; + keys: seq[NodeKey], + ): Future[Result[GetByteCodes,ComError]] + {.async.} = + ## Fetch data using the `snap#` protocol, returns the byte codes requested + ## (if any.) + let + peer = buddy.peer + nKeys = keys.len + + if nKeys == 0: + return err(ComEmptyRequestArguments) + + if trSnapTracePacketsOk: + trace trSnapSendSending & "GetByteCodes", peer, + nkeys, bytesLimit=snapRequestBytesLimit + + let byteCodes = block: + let rc = await buddy.getByteCodesReq( + keys.mapIt(Hash256(data: it.ByteArray32))) + if rc.isErr: + return err(ComNetworkProblem) + if rc.value.isNone: + trace trSnapRecvTimeoutWaiting & "for reply to GetByteCodes", peer, nKeys + return err(ComResponseTimeout) + let blobs = rc.value.get.codes + if nKeys < blobs.len: + # Ooops, makes no sense + return err(ComTooManyByteCodes) + blobs + + let + nCodes = byteCodes.len + + if nCodes == 0: + # github.com/ethereum/devp2p/blob/master/caps/snap.md#getbytecodes-0x04 + # + # Notes: + # * Nodes must always respond to the query. + # * The returned codes must be in the request order. + # * The responding node is allowed to return less data than requested + # (serving QoS limits), but the node must return at least one bytecode, + # unless none requested are available, in which case it must answer with + # an empty response. + # * If a bytecode is unavailable, the node must skip that slot and proceed + # to the next one. The node must not return nil or other placeholders. 
+ trace trSnapRecvReceived & "empty ByteCodes", peer, nKeys, nCodes + return err(ComNoByteCodesAvailable) + + # Assemble return value + var dd: GetByteCodes + + for n in 0 ..< nCodes: + if byteCodes[n].len == 0: + dd.leftOver.add keys[n] + else: + dd.kvPairs.add (keys[n], byteCodes[n]) + + dd.leftOver.add keys[byteCodes.len+1 ..< nKeys] + + trace trSnapRecvReceived & "ByteCodes", peer, + nKeys, nCodes, nLeftOver=dd.leftOver.len + + return ok(dd) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/com/get_error.nim b/nimbus/sync/snap/worker/com/get_error.nim new file mode 100644 index 000000000..e649d3b9c --- /dev/null +++ b/nimbus/sync/snap/worker/com/get_error.nim @@ -0,0 +1,34 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +type + ComError* = enum + ComNothingSerious + ComAccountsMaxTooLarge + ComAccountsMinTooSmall + ComEmptyAccountsArguments + ComEmptyRequestArguments + ComMissingProof + ComNetworkProblem + ComNoAccountsForStateRoot + ComNoByteCodesAvailable + ComNoDataForProof + ComNoStorageForAccounts + ComNoTrieNodesAvailable + ComResponseTimeout + ComTooManyByteCodes + ComTooManyStorageSlots + ComTooManyTrieNodes + + # Other errors not directly related to communication + ComInspectDbFailed + ComImportAccountsFailed + +# End diff --git a/nimbus/sync/snap/worker/get_storage_ranges.nim b/nimbus/sync/snap/worker/com/get_storage_ranges.nim similarity index 62% rename from nimbus/sync/snap/worker/get_storage_ranges.nim rename to nimbus/sync/snap/worker/com/get_storage_ranges.nim index 9fcb35ae2..906dbc6d7 100644 --- a/nimbus/sync/snap/worker/get_storage_ranges.nim +++ b/nimbus/sync/snap/worker/com/get_storage_ranges.nim @@ -14,8 +14,9 @@ import chronos, eth/[common/eth_types, p2p], stew/interval_set, - "../.."/[protocol, protocol/trace_config], - ".."/[range_desc, worker_desc] + "../../.."/[protocol, protocol/trace_config], + "../.."/[range_desc, worker_desc], + ./get_error {.push raises: [Defect].} @@ -23,14 +24,6 @@ logScope: topics = "snap-fetch" type - GetStorageRangesError* = enum - GsreNothingSerious - GsreEmptyAccountsArguments - GsreNoStorageForAccounts - GsreTooManyStorageSlots - GsreNetworkProblem - GsreResponseTimeout - # SnapStorage* = object # slotHash*: Hash256 # slotData*: Blob @@ -40,7 +33,7 @@ type # proof*: SnapStorageProof GetStorageRanges* = object - leftOver*: SnapSlotQueueItemRef + leftOver*: seq[SnapSlotQueueItemRef] data*: AccountStorageRange const @@ -85,11 +78,24 @@ proc getStorageRangesReq( # Public functions # ------------------------------------------------------------------------------ +proc addLeftOver*(dd: var GetStorageRanges; accounts: seq[AccountSlotsHeader]) = + ## Helper for maintaining the `leftOver` queue + if 0 < accounts.len: + if accounts[0].firstSlot != Hash256() or dd.leftOver.len == 0: + dd.leftOver.add SnapSlotQueueItemRef(q: accounts) + else: + dd.leftOver[^1].q = dd.leftOver[^1].q & accounts + +proc addLeftOver*(dd: var GetStorageRanges; account: AccountSlotsHeader) = + ## Variant of `addLeftOver()` + dd.addLeftOver 
@[account] + + proc getStorageRanges*( buddy: SnapBuddyRef; stateRoot: Hash256; accounts: seq[AccountSlotsHeader], - ): Future[Result[GetStorageRanges,GetStorageRangesError]] + ): Future[Result[GetStorageRanges,ComError]] {.async.} = ## Fetch data using the `snap#` protocol, returns the range covered. ## @@ -104,8 +110,8 @@ proc getStorageRanges*( maybeIv = none(LeafRange) if nAccounts == 0: - return err(GsreEmptyAccountsArguments) - if accounts[0].firstSlot != Hash256.default: + return err(ComEmptyAccountsArguments) + if accounts[0].firstSlot != Hash256(): # Set up for range maybeIv = some(LeafRange.new( accounts[0].firstSlot.to(NodeTag), high(NodeTag))) @@ -114,29 +120,25 @@ proc getStorageRanges*( trace trSnapSendSending & "GetStorageRanges", peer, nAccounts, stateRoot, bytesLimit=snapRequestBytesLimit - var dd = block: + let snStoRanges = block: let rc = await buddy.getStorageRangesReq( stateRoot, accounts.mapIt(it.accHash), maybeIv) if rc.isErr: - return err(GsreNetworkProblem) + return err(ComNetworkProblem) if rc.value.isNone: - trace trSnapRecvTimeoutWaiting & "for reply to GetStorageRanges", peer - return err(GsreResponseTimeout) - let snStoRanges = rc.value.get - if nAccounts < snStoRanges.slots.len: + trace trSnapRecvTimeoutWaiting & "for reply to GetStorageRanges", peer, + nAccounts + return err(ComResponseTimeout) + if nAccounts < rc.value.get.slots.len: # Ooops, makes no sense - return err(GsreTooManyStorageSlots) - GetStorageRanges( - data: AccountStorageRange( - proof: snStoRanges.proof, - storages: snStoRanges.slots.mapIt( - AccountSlots( - data: it)))) - let - nStorages = dd.data.storages.len - nProof = dd.data.proof.len + return err(ComTooManyStorageSlots) + rc.value.get - if nStorages == 0: + let + nSlots = snStoRanges.slots.len + nProof = snStoRanges.proof.len + + if nSlots == 0: # github.com/ethereum/devp2p/blob/master/caps/snap.md#getstorageranges-0x02: # # Notes: @@ -146,31 +148,48 @@ proc getStorageRanges*( # the responsibility of the caller to query an state not older than 128 # blocks; and the caller is expected to only ever query existing accounts. trace trSnapRecvReceived & "empty StorageRanges", peer, - nAccounts, nStorages, nProof, stateRoot - return err(GsreNoStorageForAccounts) + nAccounts, nSlots, nProof, stateRoot, firstAccount=accounts[0].accHash + return err(ComNoStorageForAccounts) - # Complete response data - for n in 0 ..< nStorages: - dd.data.storages[n].account = accounts[n] + # Assemble return structure for given peer response + var dd = GetStorageRanges(data: AccountStorageRange(proof: snStoRanges.proof)) - # Calculate what was not fetched + # Filter `slots` responses: + # * Accounts for empty ones go back to the `leftOver` list. + for n in 0 ..< nSlots: + # Empty data for a slot indicates missing data + if snStoRanges.slots[n].len == 0: + dd.addLeftOver accounts[n] + else: + dd.data.storages.add AccountSlots( + account: accounts[n], # known to be no fewer accounts than slots + data: snStoRanges.slots[n]) + + # Complete the part that was not answered by the peer if nProof == 0: - dd.leftOver = SnapSlotQueueItemRef(q: accounts[nStorages ..< nAccounts]) + dd.addLeftOver accounts[nSlots ..< nAccounts] # empty slice is ok else: + if snStoRanges.slots[^1].len == 0: + # `dd.data.storages.len == 0` => `snStoRanges.slots[^1].len == 0` as + # it was confirmed that `0 < nSlots == snStoRanges.slots.len` + return err(ComNoDataForProof) + # If the storage data for the last account comes with a proof, then it is # incomplete. 
So record the missing part on the `dd.leftOver` list. let top = dd.data.storages[^1].data[^1].slotHash.to(NodeTag) + + # Contrived situation with `top==high()`: any right proof will be useless + # so it is just ignored (i.e. `firstSlot` is zero in first slice.) if top < high(NodeTag): - dd.leftOver = SnapSlotQueueItemRef(q: accounts[nStorages-1 ..< nAccounts]) - dd.leftOver.q[0].firstSlot = (top + 1.u256).to(Hash256) - else: - # Contrived situation: the proof would be useless - dd.leftOver = SnapSlotQueueItemRef(q: accounts[nStorages ..< nAccounts]) - - # Notice that `dd.leftOver.len < nAccounts` as 0 < nStorages + dd.addLeftOver AccountSlotsHeader( + firstSlot: (top + 1.u256).to(Hash256), + accHash: accounts[nSlots-1].accHash, + storageRoot: accounts[nSlots-1].storageRoot) + dd.addLeftOver accounts[nSlots ..< nAccounts] # empty slice is ok + let nLeftOver = dd.leftOver.foldl(a + b.q.len, 0) trace trSnapRecvReceived & "StorageRanges", peer, - nAccounts, nStorages, nProof, nLeftOver=dd.leftOver.q.len, stateRoot + nAccounts, nSlots, nProof, nLeftOver, stateRoot return ok(dd) diff --git a/nimbus/sync/snap/worker/com/get_trie_nodes.nim b/nimbus/sync/snap/worker/com/get_trie_nodes.nim new file mode 100644 index 000000000..7a2aacc48 --- /dev/null +++ b/nimbus/sync/snap/worker/com/get_trie_nodes.nim @@ -0,0 +1,152 @@ +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +import + std/[options, sequtils], + chronos, + eth/[common/eth_types, p2p], + "../../.."/[protocol, protocol/trace_config], + ../../worker_desc, + ./get_error + +{.push raises: [Defect].} + +logScope: + topics = "snap-fetch" + +type + # SnapTrieNodes = object + # nodes*: seq[Blob] + + GetTrieNodes* = object + leftOver*: seq[seq[Blob]] + nodes*: seq[Blob] + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc getTrieNodesReq( + buddy: SnapBuddyRef; + stateRoot: Hash256; + paths: seq[seq[Blob]]; + ): Future[Result[Option[SnapTrieNodes],void]] + {.async.} = + let + peer = buddy.peer + try: + let reply = await peer.getTrieNodes(stateRoot, paths, snapRequestBytesLimit) + return ok(reply) + + except CatchableError as e: + trace trSnapRecvError & "waiting for GetByteCodes reply", peer, + error=e.msg + return err() + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc getTrieNodes*( + buddy: SnapBuddyRef; + stateRoot: Hash256; + paths: seq[seq[Blob]], + ): Future[Result[GetTrieNodes,ComError]] + {.async.} = + ## Fetch data using the `snap#` protocol, returns the trie nodes requested + ## (if any.) 
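The `leftOver` book-keeping used by `getStorageRanges()` above, and mirrored by `getTrieNodes()` below, boils down to a partition in request order: items answered with data are kept, items answered with an empty slot (or not answered at all) go back onto the retry queue. A standalone sketch with illustrative names, not the module's actual code:

proc splitAnswered[T](requests: seq[T];
                      gotData: seq[bool]): tuple[done, leftOver: seq[T]] =
  ## `gotData` is in request order and may be shorter than `requests`,
  ## as a peer may legitimately answer only a prefix of the request.
  for n, req in requests:
    if n < gotData.len and gotData[n]:
      result.done.add req            # data received for this item
    else:
      result.leftOver.add req        # empty or unanswered, retry later

On top of this, the storage variant above also bumps `firstSlot` past the last proved slot of a partially answered account, as done with `(top + 1.u256).to(Hash256)`.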
+ let + peer = buddy.peer + nPaths = paths.len + + if nPaths == 0: + return err(ComEmptyRequestArguments) + + let nTotal = paths.mapIt(it.len).foldl(a+b, 0) + + if trSnapTracePacketsOk: + trace trSnapSendSending & "GetTrieNodes", peer, + nPaths, nTotal, bytesLimit=snapRequestBytesLimit + + let trieNodes = block: + let rc = await buddy.getTrieNodesReq(stateRoot, paths) + if rc.isErr: + return err(ComNetworkProblem) + if rc.value.isNone: + trace trSnapRecvTimeoutWaiting & "for reply to GetTrieNodes", peer, nPaths + return err(ComResponseTimeout) + let blobs = rc.value.get.nodes + if nTotal < blobs.len: + # Ooops, makes no sense + return err(ComTooManyTrieNodes) + blobs + + let + nNodes = trieNodes.len + + if nNodes == 0: + # github.com/ethereum/devp2p/blob/master/caps/snap.md#gettrienodes-0x06 + # + # Notes: + # * Nodes must always respond to the query. + # * The returned nodes must be in the request order. + # * If the node does not have the state for the requested state root or for + # any requested account paths, it must return an empty reply. It is the + # responsibility of the caller to query an state not older than 128 + # blocks; and the caller is expected to only ever query existing trie + # nodes. + # * The responding node is allowed to return less data than requested + # (serving QoS limits), but the node must return at least one trie node. + trace trSnapRecvReceived & "empty TrieNodes", peer, nPaths, nNodes + return err(ComNoByteCodesAvailable) + + # Assemble return value + var dd = GetTrieNodes(nodes: trieNodes) + + # For each request group/sub-sequence, analyse the results + var nInx = 0 + block loop: + for n in 0 ..< nPaths: + let pathLen = paths[n].len + + # Account node request + if pathLen < 2: + if trieNodes[nInx].len == 0: + dd.leftOver.add paths[n] + nInx.inc + if nInx < nNodes: + continue + # all the rest needs to be re-processed + dd.leftOver = dd.leftOver & paths[n+1 ..< nPaths] + break loop + + # Storage request for account + if 1 < pathLen: + var pushBack: seq[Blob] + for i in 1 ..< pathLen: + if trieNodes[nInx].len == 0: + pushBack.add paths[n][i] + nInx.inc + if nInx < nNodes: + continue + # all the rest needs to be re-processed + # + # add: account & pushBack & rest ... + dd.leftOver.add paths[n][0] & pushBack & paths[n][i+1 ..< pathLen] + dd.leftOver = dd.leftOver & paths[n+1 ..< nPaths] + break loop + if 0 < pushBack.len: + dd.leftOver.add paths[n][0] & pushBack + + trace trSnapRecvReceived & "TrieNodes", peer, + nPaths, nNodes, nLeftOver=dd.leftOver.len + + return ok(dd) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/bulk_storage.nim b/nimbus/sync/snap/worker/db/bulk_storage.nim index ca113bb25..0464450c9 100644 --- a/nimbus/sync/snap/worker/db/bulk_storage.nim +++ b/nimbus/sync/snap/worker/db/bulk_storage.nim @@ -33,12 +33,12 @@ proc to(tag: NodeTag; T: type RepairKey): T = tag.to(NodeKey).to(RepairKey) proc convertTo(key: RepairKey; T: type NodeKey): T = - if key.isNodeKey: - discard result.init(key.ByteArray33[1 .. 32]) + ## Might be lossy, check before use + discard result.init(key.ByteArray33[1 .. 32]) proc convertTo(key: RepairKey; T: type NodeTag): T = - if key.isNodeKey: - result = UInt256.fromBytesBE(key.ByteArray33[1 .. 32]).T + ## Might be lossy, check before use + UInt256.fromBytesBE(key.ByteArray33[1 .. 
32]).T # ------------------------------------------------------------------------------ # Private helpers for bulk load testing @@ -80,7 +80,7 @@ proc bulkStorageClearRockyCacheFile*(rocky: RocksStoreRef): bool = # ------------------------------------------------------------------------------ proc bulkStorageAccounts*( - db: HexaryTreeDB; + db: HexaryTreeDbRef; base: TrieDatabaseRef ): Result[void,HexaryDbError] = ## Bulk load using transactional `put()` @@ -96,7 +96,7 @@ proc bulkStorageAccounts*( ok() proc bulkStorageStorages*( - db: HexaryTreeDB; + db: HexaryTreeDbRef; base: TrieDatabaseRef ): Result[void,HexaryDbError] = ## Bulk load using transactional `put()` @@ -113,7 +113,7 @@ proc bulkStorageStorages*( proc bulkStorageAccountsRocky*( - db: HexaryTreeDB; + db: HexaryTreeDbRef; rocky: RocksStoreRef ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect,OSError,KeyError].} = @@ -162,7 +162,7 @@ proc bulkStorageAccountsRocky*( proc bulkStorageStoragesRocky*( - db: HexaryTreeDB; + db: HexaryTreeDbRef; rocky: RocksStoreRef ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect,OSError,KeyError].} = diff --git a/nimbus/sync/snap/worker/db/hexary_defs.nim b/nimbus/sync/snap/worker/db/hexary_defs.nim index 2117c45a2..11e0a7888 100644 --- a/nimbus/sync/snap/worker/db/hexary_defs.nim +++ b/nimbus/sync/snap/worker/db/hexary_defs.nim @@ -16,19 +16,25 @@ type AccountSmallerThanBase AccountsNotSrictlyIncreasing AccountRangesOverlap - AccountRepairBlocked + RlpEncoding + SlotsNotSrictlyIncreasing + LoopAlert + + # import DifferentNodeValueExists - InternalDbInconsistency - RightBoundaryProofFailed + ExpectedNodeKeyDiffers Rlp2Or17ListEntries RlpBlobExpected RlpBranchLinkExpected - RlpEncoding RlpExtPathEncoding RlpNonEmptyBlobExpected + + # interpolate + AccountRepairBlocked + InternalDbInconsistency + RightBoundaryProofFailed RootNodeMismatch RootNodeMissing - SlotsNotSrictlyIncreasing # bulk storage AddBulkItemFailed diff --git a/nimbus/sync/snap/worker/db/hexary_desc.nim b/nimbus/sync/snap/worker/db/hexary_desc.nim index 61638cefd..e95271a70 100644 --- a/nimbus/sync/snap/worker/db/hexary_desc.nim +++ b/nimbus/sync/snap/worker/db/hexary_desc.nim @@ -9,7 +9,7 @@ # except according to those terms. import - std/[algorithm, hashes, sequtils, strutils, tables], + std/[algorithm, hashes, sequtils, sets, strutils, tables], eth/[common/eth_types, p2p, trie/nibbles], stint, ../../range_desc @@ -20,15 +20,9 @@ type HexaryPpFn* = proc(key: RepairKey): string {.gcsafe.} ## For testing/debugging: key pretty printer function - ByteArray32* = array[32,byte] - ## Used for 32 byte database keys - ByteArray33* = array[33,byte] ## Used for 31 byte database keys, i.e. + <32-byte-key> - NodeKey* = distinct ByteArray32 - ## Hash key without the hash wrapper - RepairKey* = distinct ByteArray33 ## Byte prefixed `NodeKey` for internal DB records @@ -139,11 +133,22 @@ type nodeKey*: RepairKey ## Leaf hash into hexary repair table payload*: Blob ## Data payload - HexaryTreeDB* = object + TrieNodeStat* = object + ## Trie inspection report + stoppedAt*: int ## Potential loop dedected if positive + dangling*: seq[Blob] ## Paths from nodes with incomplete refs + + HexaryTreeDbRef* = ref object + ## Hexary trie plus helper structures tab*: Table[RepairKey,RNodeRef] ## key-value trie table, in-memory db repairKeyGen*: uint64 ## Unique tmp key generator keyPp*: HexaryPpFn ## For debugging, might go away + HexaryGetFn* = proc(key: Blob): Blob {.gcsafe.} + ## Persistent database get() function. 
For read-only cacses, this function + ## can be seen as the persistent alternative to `HexaryTreeDbRef`. + + const EmptyNodeBlob* = seq[byte].default EmptyNibbleRange* = EmptyNodeBlob.initNibbleRange @@ -161,18 +166,9 @@ var proc initImpl(key: var RepairKey; data: openArray[byte]): bool = key.reset - if data.len <= 33: - if 0 < data.len: - let trg = addr key.ByteArray33[33 - data.len] - trg.copyMem(unsafeAddr data[0], data.len) - return true - -proc initImpl(key: var NodeKey; data: openArray[byte]): bool = - key.reset - if data.len <= 32: - if 0 < data.len: - let trg = addr key.ByteArray32[32 - data.len] - trg.copyMem(unsafeAddr data[0], data.len) + if 0 < data.len and data.len <= 33: + let trg = addr key.ByteArray33[33 - data.len] + trg.copyMem(unsafeAddr data[0], data.len) return true # ------------------------------------------------------------------------------ @@ -196,7 +192,7 @@ proc ppImpl(s: string; hex = false): string = (if (s.len and 1) == 0: s[0 ..< 8] else: "0" & s[0 ..< 7]) & "..(" & $s.len & ").." & s[s.len-16 ..< s.len] -proc ppImpl(key: RepairKey; db: HexaryTreeDB): string = +proc ppImpl(key: RepairKey; db: HexaryTreeDbRef): string = try: if not disablePrettyKeys and not db.keyPp.isNil: return db.keyPp(key) @@ -204,13 +200,13 @@ proc ppImpl(key: RepairKey; db: HexaryTreeDB): string = discard key.ByteArray33.toSeq.mapIt(it.toHex(2)).join.toLowerAscii -proc ppImpl(key: NodeKey; db: HexaryTreeDB): string = +proc ppImpl(key: NodeKey; db: HexaryTreeDbRef): string = key.to(RepairKey).ppImpl(db) -proc ppImpl(w: openArray[RepairKey]; db: HexaryTreeDB): string = +proc ppImpl(w: openArray[RepairKey]; db: HexaryTreeDbRef): string = w.mapIt(it.ppImpl(db)).join(",") -proc ppImpl(w: openArray[Blob]; db: HexaryTreeDB): string = +proc ppImpl(w: openArray[Blob]; db: HexaryTreeDbRef): string = var q: seq[RepairKey] for a in w: var key: RepairKey @@ -222,7 +218,7 @@ proc ppStr(blob: Blob): string = if blob.len == 0: "" else: blob.mapIt(it.toHex(2)).join.toLowerAscii.ppImpl(hex = true) -proc ppImpl(n: RNodeRef; db: HexaryTreeDB): string = +proc ppImpl(n: RNodeRef; db: HexaryTreeDbRef): string = let so = n.state.ord case n.kind: of Leaf: @@ -232,7 +228,7 @@ proc ppImpl(n: RNodeRef; db: HexaryTreeDB): string = of Branch: ["b","þ","B","R"][so] & "(" & n.bLink.ppImpl(db) & "," & n.bData.ppStr & ")" -proc ppImpl(n: XNodeObj; db: HexaryTreeDB): string = +proc ppImpl(n: XNodeObj; db: HexaryTreeDbRef): string = case n.kind: of Leaf: "l(" & $n.lPfx & "," & n.lData.ppStr & ")" @@ -243,19 +239,19 @@ proc ppImpl(n: XNodeObj; db: HexaryTreeDB): string = of Branch: "b(" & n.bLink[0..15].ppImpl(db) & "," & n.bLink[16].ppStr & ")" -proc ppImpl(w: RPathStep; db: HexaryTreeDB): string = +proc ppImpl(w: RPathStep; db: HexaryTreeDbRef): string = let nibble = if 0 <= w.nibble: w.nibble.toHex(1).toLowerAscii else: "ø" key = w.key.ppImpl(db) "(" & key & "," & nibble & "," & w.node.ppImpl(db) & ")" -proc ppImpl(w: XPathStep; db: HexaryTreeDB): string = +proc ppImpl(w: XPathStep; db: HexaryTreeDbRef): string = let nibble = if 0 <= w.nibble: w.nibble.toHex(1).toLowerAscii else: "ø" var key: RepairKey discard key.initImpl(w.key) "(" & key.ppImpl(db) & "," & $nibble & "," & w.node.ppImpl(db) & ")" -proc ppImpl(db: HexaryTreeDB; root: NodeKey): seq[string] = +proc ppImpl(db: HexaryTreeDbRef; root: NodeKey): seq[string] = ## Dump the entries from the a generic repair tree. 
This function assumes ## that mapped keys are printed `$###` if a node is locked or static, and ## some substitute for the first letter `$` otherwise (if they are mutable.) @@ -280,6 +276,14 @@ proc ppImpl(db: HexaryTreeDB; root: NodeKey): seq[string] = except Exception as e: result &= " ! Ooops ppImpl(): name=" & $e.name & " msg=" & e.msg +proc ppDangling(a: seq[Blob]; maxItems = 30): string = + proc ppBlob(w: Blob): string = + w.mapIt(it.toHex(2)).join.toLowerAscii + let + q = a.toSeq.mapIt(it.ppBlob)[0 ..< min(maxItems,a.len)] + andMore = if maxItems < a.len: ", ..[#" & $a.len & "].." else: "" + "{" & q.join(",") & andMore & "}" + # ------------------------------------------------------------------------------ # Public debugging helpers # ------------------------------------------------------------------------------ @@ -299,40 +303,43 @@ proc pp*(key: NodeKey): string = ## Raw key, for referenced key dump use `key.pp(db)` below key.ByteArray32.toSeq.mapIt(it.toHex(2)).join.tolowerAscii -proc pp*(key: NodeKey|RepairKey; db: HexaryTreeDB): string = +proc pp*(key: NodeKey|RepairKey; db: HexaryTreeDbRef): string = key.ppImpl(db) -proc pp*(w: RNodeRef|XNodeObj|RPathStep; db: HexaryTreeDB): string = +proc pp*(w: RNodeRef|XNodeObj|RPathStep; db: HexaryTreeDbRef): string = w.ppImpl(db) -proc pp*(w:openArray[RPathStep|XPathStep]; db:HexaryTreeDB; indent=4): string = +proc pp*(w:openArray[RPathStep|XPathStep];db:HexaryTreeDbRef;indent=4): string = w.toSeq.mapIt(it.ppImpl(db)).join(indent.toPfx) -proc pp*(w: RPath; db: HexaryTreeDB; indent=4): string = +proc pp*(w: RPath; db: HexaryTreeDbRef; indent=4): string = w.path.pp(db,indent) & indent.toPfx & "(" & $w.tail & ")" -proc pp*(w: XPath; db: HexaryTreeDB; indent=4): string = +proc pp*(w: XPath; db: HexaryTreeDbRef; indent=4): string = w.path.pp(db,indent) & indent.toPfx & "(" & $w.tail & "," & $w.depth & ")" -proc pp*(db: HexaryTreeDB; root: NodeKey; indent=4): string = +proc pp*(db: HexaryTreeDbRef; root: NodeKey; indent=4): string = ## Dump the entries from the a generic repair tree. 
db.ppImpl(root).join(indent.toPfx) -proc pp*(db: HexaryTreeDB; indent=4): string = +proc pp*(db: HexaryTreeDbRef; indent=4): string = ## varinat of `pp()` above db.ppImpl(NodeKey.default).join(indent.toPfx) +proc pp*(a: TrieNodeStat; db: HexaryTreeDbRef; maxItems = 30): string = + "(" & + $a.stoppedAt & "," & + $a.dangling.len & "," & + a.dangling.ppDangling(maxItems) & ")" + # ------------------------------------------------------------------------------ # Public constructor (or similar) # ------------------------------------------------------------------------------ -proc init*(key: var NodeKey; data: openArray[byte]): bool = - key.initImpl(data) - proc init*(key: var RepairKey; data: openArray[byte]): bool = key.initImpl(data) -proc newRepairKey*(db: var HexaryTreeDB): RepairKey = +proc newRepairKey*(db: HexaryTreeDbRef): RepairKey = db.repairKeyGen.inc var src = db.repairKeyGen.toBytesBE (addr result.ByteArray33[25]).copyMem(addr src[0], 8) @@ -358,18 +365,12 @@ proc `==`*(a, b: RepairKey): bool = ## Tables mixin a.ByteArray33 == b.ByteArray33 -proc to*(tag: NodeTag; T: type NodeKey): T = - tag.UInt256.toBytesBE.T - proc to*(key: NodeKey; T: type NibblesSeq): T = key.ByteArray32.initNibbleRange proc to*(key: NodeKey; T: type RepairKey): T = (addr result.ByteArray33[1]).copyMem(unsafeAddr key.ByteArray32[0], 32) -proc to*(hash: Hash256; T: type NodeKey): T = - hash.data.NodeKey - proc isZero*[T: NodeTag|NodeKey|RepairKey](a: T): bool = a == T.default @@ -379,10 +380,14 @@ proc isNodeKey*(a: RepairKey): bool = proc digestTo*(data: Blob; T: type NodeKey): T = keccakHash(data).data.T -proc convertTo*[W: NodeKey|RepairKey](data: Blob; T: type W): T = +proc convertTo*(data: Blob; T: type NodeKey): T = ## Probably lossy conversion, use `init()` for safe conversion discard result.init(data) +proc convertTo*(data: Blob; T: type RepairKey): T = + ## Probably lossy conversion, use `init()` for safe conversion + discard result.initImpl(data) + proc convertTo*(node: RNodeRef; T: type Blob): T = ## Write the node as an RLP-encoded blob var writer = initRlpWriter() diff --git a/nimbus/sync/snap/worker/db/hexary_import.nim b/nimbus/sync/snap/worker/db/hexary_import.nim index bc1642b9f..3303ab902 100644 --- a/nimbus/sync/snap/worker/db/hexary_import.nim +++ b/nimbus/sync/snap/worker/db/hexary_import.nim @@ -12,6 +12,7 @@ import std/[sequtils, sets, strutils, tables], eth/[common/eth_types_rlp, trie/nibbles], stew/results, + ../../range_desc, "."/[hexary_defs, hexary_desc] {.push raises: [Defect].} @@ -28,7 +29,7 @@ proc pp(q: openArray[byte]): string = # ------------------------------------------------------------------------------ proc hexaryImport*( - db: var HexaryTreeDB; ## Contains node table + db: HexaryTreeDbRef; ## Contains node table recData: Blob; ## Node to add unrefNodes: var HashSet[RepairKey]; ## Keep track of freestanding nodes nodeRefs: var HashSet[RepairKey]; ## Ditto @@ -121,6 +122,88 @@ proc hexaryImport*( ok() + +proc hexaryImport*( + db: HexaryTreeDbRef; ## Contains node table + recData: Blob; ## Node to add + ): Result[void,HexaryDbError] + {.gcsafe, raises: [Defect, RlpError, KeyError].} = + ## Ditto without referece checks + let + nodeKey = recData.digestTo(NodeKey) + repairKey = nodeKey.to(RepairKey) # for repair table + var + rlp = recData.rlpFromBytes + blobs = newSeq[Blob](2) # temporary, cache + links: array[16,RepairKey] # reconstruct branch node + blob16: Blob # reconstruct branch node + top = 0 # count entries + rNode: RNodeRef # repair tree node + + # Collect lists of 
either 2 or 17 blob entries. + for w in rlp.items: + case top + of 0, 1: + if not w.isBlob: + return err(RlpBlobExpected) + blobs[top] = rlp.read(Blob) + of 2 .. 15: + var key: NodeKey + if not key.init(rlp.read(Blob)): + return err(RlpBranchLinkExpected) + # Update ref pool + links[top] = key.to(RepairKey) + of 16: + if not w.isBlob: + return err(RlpBlobExpected) + blob16 = rlp.read(Blob) + else: + return err(Rlp2Or17ListEntries) + top.inc + + # Verify extension data + case top + of 2: + if blobs[0].len == 0: + return err(RlpNonEmptyBlobExpected) + let (isLeaf, pathSegment) = hexPrefixDecode blobs[0] + if isLeaf: + rNode = RNodeRef( + kind: Leaf, + lPfx: pathSegment, + lData: blobs[1]) + else: + var key: NodeKey + if not key.init(blobs[1]): + return err(RlpExtPathEncoding) + # Update ref pool + rNode = RNodeRef( + kind: Extension, + ePfx: pathSegment, + eLink: key.to(RepairKey)) + of 17: + for n in [0,1]: + var key: NodeKey + if not key.init(blobs[n]): + return err(RlpBranchLinkExpected) + # Update ref pool + links[n] = key.to(RepairKey) + rNode = RNodeRef( + kind: Branch, + bLink: links, + bData: blob16) + else: + discard + + # Add to database + if not db.tab.hasKey(repairKey): + db.tab[repairKey] = rNode + + elif db.tab[repairKey].convertTo(Blob) != recData: + return err(DifferentNodeValueExists) + + ok() + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/hexary_inspect.nim b/nimbus/sync/snap/worker/db/hexary_inspect.nim new file mode 100644 index 000000000..a7c4cd822 --- /dev/null +++ b/nimbus/sync/snap/worker/db/hexary_inspect.nim @@ -0,0 +1,315 @@ +# nimbus-eth1 +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +import + std/[hashes, sequtils, sets, tables], + eth/[common/eth_types_rlp, trie/nibbles], + stew/results, + ../../range_desc, + "."/[hexary_desc, hexary_paths] + +{.push raises: [Defect].} + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc convertTo(key: RepairKey; T: type NodeKey): T = + ## Might be lossy, check before use + discard result.init(key.ByteArray33[1 .. 
32]) + +proc convertTo(key: Blob; T: type NodeKey): T = + ## Might be lossy, check before use + discard result.init(key) + +proc doStepLink(step: RPathStep): Result[RepairKey,bool] = + ## Helper for `hexaryInspectPath()` variant + case step.node.kind: + of Branch: + if step.nibble < 0: + return err(false) # indicates caller should try parent + return ok(step.node.bLink[step.nibble]) + of Extension: + return ok(step.node.eLink) + of Leaf: + discard + err(true) # fully fail + +proc doStepLink(step: XPathStep): Result[NodeKey,bool] = + ## Helper for `hexaryInspectPath()` variant + case step.node.kind: + of Branch: + if step.nibble < 0: + return err(false) # indicates caller should try parent + return ok(step.node.bLink[step.nibble].convertTo(NodeKey)) + of Extension: + return ok(step.node.eLink.convertTo(NodeKey)) + of Leaf: + discard + err(true) # fully fail + + +proc hexaryInspectPath( + db: HexaryTreeDbRef; ## Database + rootKey: RepairKey; ## State root + path: NibblesSeq; ## Starting path + ): Result[RepairKey,void] + {.gcsafe, raises: [Defect,KeyError]} = + ## Translate `path` into `RepairKey` + let steps = path.hexaryPath(rootKey,db) + if 0 < steps.path.len and steps.tail.len == 0: + block: + let rc = steps.path[^1].doStepLink() + if rc.isOk: + return ok(rc.value) + if rc.error or steps.path.len == 1: + return err() + block: + let rc = steps.path[^2].doStepLink() + if rc.isOk: + return ok(rc.value) + err() + +proc hexaryInspectPath( + getFn: HexaryGetFn; ## Database retrival function + root: NodeKey; ## State root + path: NibblesSeq; ## Starting path + ): Result[NodeKey,void] + {.gcsafe, raises: [Defect,RlpError]} = + ## Translate `path` into `RepairKey` + let steps = path.hexaryPath(root,getFn) + if 0 < steps.path.len and steps.tail.len == 0: + block: + let rc = steps.path[^1].doStepLink() + if rc.isOk: + return ok(rc.value) + if rc.error or steps.path.len == 1: + return err() + block: + let rc = steps.path[^2].doStepLink() + if rc.isOk: + return ok(rc.value) + err() + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc processLink( + db: HexaryTreeDbRef; + stats: var TrieNodeStat; + inspect: TableRef[RepairKey,NibblesSeq]; + parent: NodeKey; + trail: NibblesSeq; + child: RepairKey; + ) {.gcsafe, raises: [Defect,KeyError]} = + ## Helper for `hexaryInspect()` + if not child.isZero: + if not child.isNodeKey: + # Oops -- caught in the middle of a repair process? 
Just register + # this node + stats.dangling.add trail.hexPrefixEncode(isLeaf = false) + + elif db.tab.hasKey(child): + inspect[child] = trail + + else: + stats.dangling.add trail.hexPrefixEncode(isLeaf = false) + +proc processLink( + getFn: HexaryGetFn; + stats: var TrieNodeStat; + inspect: TableRef[NodeKey,NibblesSeq]; + parent: NodeKey; + trail: NibblesSeq; + child: Rlp; + ) {.gcsafe, raises: [Defect,RlpError,KeyError]} = + ## Ditto + if not child.isEmpty: + let + #parentKey = parent.convertTo(NodeKey) + childBlob = child.toBytes + + if childBlob.len != 32: + # Oops -- that is wrong, although the only sensible action is to + # register the node and otherwise ignore it + stats.dangling.add trail.hexPrefixEncode(isLeaf = false) + + else: + let childKey = childBlob.convertTo(NodeKey) + if 0 < child.toBytes.getFn().len: + inspect[childKey] = trail + + else: + stats.dangling.add trail.hexPrefixEncode(isLeaf = false) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc hexaryInspectPath*( + db: HexaryTreeDbRef; ## Database + root: NodeKey; ## State root + path: Blob; ## Starting path + ): Result[NodeKey,void] + {.gcsafe, raises: [Defect,KeyError]} = + ## Returns the `NodeKey` for a given path if there is any. + let (isLeaf,nibbles) = hexPrefixDecode path + if not isLeaf: + let rc = db.hexaryInspectPath(root.to(RepairKey), nibbles) + if rc.isOk and rc.value.isNodeKey: + return ok(rc.value.convertTo(NodeKey)) + err() + +proc hexaryInspectToKeys*( + db: HexaryTreeDbRef; ## Database + root: NodeKey; ## State root + paths: seq[Blob]; ## Paths segments + ): HashSet[NodeKey] + {.gcsafe, raises: [Defect,KeyError]} = + ## Convert a set of path segments to a node key set + paths.toSeq + .mapIt(db.hexaryInspectPath(root,it)) + .filterIt(it.isOk) + .mapIt(it.value) + .toHashSet + + +proc hexaryInspectTrie*( + db: HexaryTreeDbRef; ## Database + root: NodeKey; ## State root + paths: seq[Blob]; ## Starting paths for search + stopAtLevel = 32; ## Instead of loop detector + ): TrieNodeStat + {.gcsafe, raises: [Defect,KeyError]} = + ## Starting with the argument list `paths`, find all the non-leaf nodes in + ## the hexary trie which have at least one node key reference missing in + ## the trie database. 
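+  ##
+  ## A minimal usage sketch (illustrative only; `db` is assumed to be a
+  ## populated `HexaryTreeDbRef` and `stateRoot` a `NodeKey`):
+  ##
+  ##   let stats = db.hexaryInspectTrie(stateRoot, newSeq[Blob]())
+  ##   if stats.dangling.len != 0:
+  ##     # each entry is a hex-prefix encoded partial path to a missing
+  ##     # node, e.g. suitable as input for healing via `GetTrieNodes`
+  ##     discard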
+ let rootKey = root.to(RepairKey) + if not db.tab.hasKey(rootKey): + return TrieNodeStat() + + var + reVisit = newTable[RepairKey,NibblesSeq]() + rcValue: TrieNodeStat + level = 0 + + # Initialise TODO list + if paths.len == 0: + reVisit[rootKey] = EmptyNibbleRange + else: + for w in paths: + let (isLeaf,nibbles) = hexPrefixDecode w + if not isLeaf: + let rc = db.hexaryInspectPath(rootKey, nibbles) + if rc.isOk: + reVisit[rc.value] = nibbles + + while 0 < reVisit.len: + if stopAtLevel < level: + rcValue.stoppedAt = level + break + + let again = newTable[RepairKey,NibblesSeq]() + + for rKey,parentTrail in reVisit.pairs: + let + node = db.tab[rKey] + parent = rKey.convertTo(NodeKey) + + case node.kind: + of Extension: + let + trail = parentTrail & node.ePfx + child = node.eLink + db.processLink(stats=rcValue, inspect=again, parent, trail, child) + of Branch: + for n in 0 ..< 16: + let + trail = parentTrail & @[n.byte].initNibbleRange.slice(1) + child = node.bLink[n] + db.processLink(stats=rcValue, inspect=again, parent, trail, child) + of Leaf: + # Done with this link, forget the key + discard + # End `for` + + level.inc + reVisit = again + # End while + + return rcValue + + +proc hexaryInspectTrie*( + getFn: HexaryGetFn; + root: NodeKey; ## State root + paths: seq[Blob]; ## Starting paths for search + stopAtLevel = 32; ## Instead of loop detector + ): TrieNodeStat + {.gcsafe, raises: [Defect,RlpError,KeyError]} = + ## Varianl of `hexaryInspectTrie()` for persistent database. + ## + if root.to(Blob).getFn().len == 0: + return TrieNodeStat() + + var + reVisit = newTable[NodeKey,NibblesSeq]() + rcValue: TrieNodeStat + level = 0 + + # Initialise TODO list + if paths.len == 0: + reVisit[root] = EmptyNibbleRange + else: + for w in paths: + let (isLeaf,nibbles) = hexPrefixDecode w + if not isLeaf: + let rc = getFn.hexaryInspectPath(root, nibbles) + if rc.isOk: + reVisit[rc.value] = nibbles + + while 0 < reVisit.len: + if stopAtLevel < level: + rcValue.stoppedAt = level + break + + let again = newTable[NodeKey,NibblesSeq]() + + for parent,parentTrail in reVisit.pairs: + let nodeRlp = rlpFromBytes parent.to(Blob).getFn() + case nodeRlp.listLen: + of 2: + let (isLeaf,ePfx) = hexPrefixDecode nodeRlp.listElem(0).toBytes + if not isleaf: + let + trail = parentTrail & ePfx + child = nodeRlp.listElem(1) + getFn.processLink(stats=rcValue, inspect=again, parent, trail, child) + of 17: + for n in 0 ..< 16: + let + trail = parentTrail & @[n.byte].initNibbleRange.slice(1) + child = nodeRlp.listElem(n) + getFn.processLink(stats=rcValue, inspect=again, parent, trail, child) + else: + # Done with this link, forget the key + discard + # End `for` + + level.inc + reVisit = again + # End while + + return rcValue + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/db/hexary_interpolate.nim b/nimbus/sync/snap/worker/db/hexary_interpolate.nim index 05445f41c..2f0614179 100644 --- a/nimbus/sync/snap/worker/db/hexary_interpolate.nim +++ b/nimbus/sync/snap/worker/db/hexary_interpolate.nim @@ -15,7 +15,7 @@ ## re-factored database layer. 
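##
## A rough usage sketch (illustrative, not part of this patch; `db`,
## `rootKey` and `leafs` are assumed names): after the boundary proof
## nodes of a `snap/1` reply have been stored with `hexaryImport()`, the
## leaf items are collected as `RLeafSpecs` entries and passed on:
##
##   var leafs: seq[RLeafSpecs]    # paths and payloads from the reply
##   let rc = db.hexaryInterpolate(rootKey, leafs, bootstrap = false)
##   if rc.isErr:
##     discard                     # e.g. a missing or inconsistent node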
import - std/[sequtils, strutils, tables], + std/[sequtils, sets, strutils, tables], eth/[common/eth_types, trie/nibbles], stew/results, ../../range_desc, @@ -34,14 +34,17 @@ type # Private debugging helpers # ------------------------------------------------------------------------------ -proc pp(w: RPathXStep; db: var HexaryTreeDB): string = +proc pp(w: RPathXStep; db: HexaryTreeDbRef): string = let y = if w.canLock: "lockOk" else: "noLock" "(" & $w.pos & "," & y & "," & w.step.pp(db) & ")" -proc pp(w: seq[RPathXStep]; db: var HexaryTreeDB; indent = 4): string = +proc pp(w: seq[RPathXStep]; db: HexaryTreeDbRef; indent = 4): string = let pfx = "\n" & " ".repeat(indent) w.mapIt(it.pp(db)).join(pfx) +proc pp(rc: Result[TrieNodeStat, HexaryDbError]; db: HexaryTreeDbRef): string = + if rc.isErr: $rc.error else: rc.value.pp(db) + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -53,7 +56,7 @@ proc dup(node: RNodeRef): RNodeRef = proc hexaryPath( tag: NodeTag; root: NodeKey; - db: HexaryTreeDB; + db: HexaryTreeDbRef; ): RPath {.gcsafe, raises: [Defect,KeyError].} = ## Shortcut @@ -104,7 +107,7 @@ proc `xData=`(node: RNodeRef; val: Blob) = # ------------------------------------------------------------------------------ proc rTreeExtendLeaf( - db: var HexaryTreeDB; + db: HexaryTreeDbRef; rPath: RPath; key: RepairKey ): RPath = @@ -124,7 +127,7 @@ proc rTreeExtendLeaf( tail: EmptyNibbleRange) proc rTreeExtendLeaf( - db: var HexaryTreeDB; + db: HexaryTreeDbRef; rPath: RPath; key: RepairKey; node: RNodeRef; @@ -140,7 +143,7 @@ proc rTreeExtendLeaf( proc rTreeSplitNode( - db: var HexaryTreeDB; + db: HexaryTreeDbRef; rPath: RPath; key: RepairKey; node: RNodeRef; @@ -207,7 +210,7 @@ proc rTreeSplitNode( proc rTreeInterpolate( rPath: RPath; - db: var HexaryTreeDB; + db: HexaryTreeDbRef; ): RPath {.gcsafe, raises: [Defect,KeyError]} = ## Extend path, add missing nodes to tree. 
The last node added will be @@ -279,7 +282,7 @@ proc rTreeInterpolate( proc rTreeInterpolate( rPath: RPath; - db: var HexaryTreeDB; + db: HexaryTreeDbRef; payload: Blob; ): RPath {.gcsafe, raises: [Defect,KeyError]} = @@ -293,7 +296,7 @@ proc rTreeInterpolate( proc rTreeUpdateKeys( rPath: RPath; - db: var HexaryTreeDB; + db: HexaryTreeDbRef; ): Result[void,bool] {.gcsafe, raises: [Defect,KeyError]} = ## The argument `rPath` is assumed to organise database nodes as @@ -431,7 +434,7 @@ proc rTreeUpdateKeys( # ------------------------------------------------------------------------------ proc rTreeBranchAppendleaf( - db: var HexaryTreeDB; + db: HexaryTreeDbRef; bNode: RNodeRef; leaf: RLeafSpecs; ): bool = @@ -448,7 +451,7 @@ proc rTreeBranchAppendleaf( return true proc rTreePrefill( - db: var HexaryTreeDB; + db: HexaryTreeDbRef; rootKey: NodeKey; dbItems: var seq[RLeafSpecs]; ) {.gcsafe, raises: [Defect,KeyError].} = @@ -469,7 +472,7 @@ proc rTreePrefill( db.tab[rootKey.to(RepairKey)] = node proc rTreeSquashRootNode( - db: var HexaryTreeDB; + db: HexaryTreeDbRef; rootKey: NodeKey; ): RNodeRef {.gcsafe, raises: [Defect,KeyError].} = @@ -517,16 +520,22 @@ proc rTreeSquashRootNode( # ------------------------------------------------------------------------------ proc hexaryInterpolate*( - db: var HexaryTreeDB; ## Database - rootKey: NodeKey; ## root node hash - dbItems: var seq[RLeafSpecs]; ## list of path and leaf items - bootstrap = false; ## can create root node on-the-fly + db: HexaryTreeDbRef; ## Database + rootKey: NodeKey; ## Root node hash + dbItems: var seq[RLeafSpecs]; ## List of path and leaf items + bootstrap = false; ## Can create root node on-the-fly ): Result[void,HexaryDbError] {.gcsafe, raises: [Defect,KeyError]} = - ## Verifiy `dbItems` by interpolating the collected `dbItems` on the hexary - ## trie of the repair database. If successful, there will be a complete - ## hexary trie avaliable with the `payload` fields of the `dbItems` argument - ## as leaf node values. + ## From the argument list `dbItems`, leaf nodes will be added to the hexary + ## trie while interpolating the path for the leaf nodes by adding missing + ## nodes. This action is typically not a full trie rebuild. Some partial node + ## entries might have been added, already which is typical for a boundary + ## proof that comes with the `snap/1` protocol. + ## + ## If successful, there will be a complete hexary trie avaliable with the + ## `payload` fields of the `dbItems` argument list as leaf node values. The + ## argument list `dbItems` will have been updated by registering the node + ## keys of the leaf items. ## ## The algorithm employed here tries to minimise hashing hexary nodes for ## the price of re-vising the same node again. diff --git a/nimbus/sync/snap/worker/db/hexary_paths.nim b/nimbus/sync/snap/worker/db/hexary_paths.nim index 12402bf43..280aca059 100644 --- a/nimbus/sync/snap/worker/db/hexary_paths.nim +++ b/nimbus/sync/snap/worker/db/hexary_paths.nim @@ -11,33 +11,24 @@ ## Find node paths in hexary tries. 
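##
## For illustration (not part of this patch; `accKey`, `rootKey` and `db`
## are assumed to be a `NodeKey`, another `NodeKey` and a `HexaryTreeDbRef`):
##
##   let rPath = accKey.hexaryPath(rootKey.to(RepairKey), db)
##   if rPath.tail.len == 0:
##     discard       # fully resolved, `rPath.path[^1]` is the last step
##   else:
##     discard       # `rPath.tail` holds the unresolved nibbles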
import - std/[sequtils, tables], + std/[tables], eth/[common/eth_types_rlp, trie/nibbles], + ../../range_desc, ./hexary_desc {.push raises: [Defect].} -const - HexaryXPathDebugging = false # or true - -type - HexaryGetFn* = proc(key: Blob): Blob {.gcsafe.} - ## Fortesting/debugging: database get() function - # ------------------------------------------------------------------------------ # Private debugging helpers # ------------------------------------------------------------------------------ -proc pp(w: Blob; db: HexaryTreeDB): string = +proc pp(w: Blob; db: HexaryTreeDbRef): string = w.convertTo(RepairKey).pp(db) # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ -proc to(w: NodeKey; T: type Blob): T = - w.ByteArray32.toSeq - proc getNibblesImpl(path: XPath; start = 0): NibblesSeq = ## Re-build the key path for n in start ..< path.path.len: @@ -90,7 +81,7 @@ when false: proc pathExtend( path: RPath; key: RepairKey; - db: HexaryTreeDB; + db: HexaryTreeDbRef; ): RPath {.gcsafe, raises: [Defect,KeyError].} = ## For the given path, extend to the longest possible repair tree `db` @@ -389,7 +380,7 @@ proc leafData*(path: XPath): Blob = proc hexaryPath*( nodeKey: NodeKey; rootKey: RepairKey; - db: HexaryTreeDB; + db: HexaryTreeDbRef; ): RPath {.gcsafe, raises: [Defect,KeyError]} = ## Compute logest possible repair tree `db` path matching the `nodeKey` @@ -397,6 +388,15 @@ proc hexaryPath*( ## functional notation. RPath(tail: nodeKey.to(NibblesSeq)).pathExtend(rootKey,db) +proc hexaryPath*( + partialPath: NibblesSeq; + rootKey: RepairKey; + db: HexaryTreeDbRef; + ): RPath + {.gcsafe, raises: [Defect,KeyError]} = + ## Variant of `hexaryPath`. + RPath(tail: partialPath).pathExtend(rootKey,db) + proc hexaryPath*( nodeKey: NodeKey; root: NodeKey; @@ -413,6 +413,15 @@ proc hexaryPath*( ## in the invoking function due to the `getFn` argument. XPath(tail: nodeKey.to(NibblesSeq)).pathExtend(root.to(Blob), getFn) +proc hexaryPath*( + partialPath: NibblesSeq; + root: NodeKey; + getFn: HexaryGetFn; + ): XPath + {.gcsafe, raises: [Defect,RlpError]} = + ## Variant of `hexaryPath`. + XPath(tail: partialPath).pathExtend(root.to(Blob), getFn) + proc next*( path: XPath; getFn: HexaryGetFn; diff --git a/nimbus/sync/snap/worker/fetch_accounts.nim b/nimbus/sync/snap/worker/fetch_accounts.nim index 991b06ea0..15ade74c7 100644 --- a/nimbus/sync/snap/worker/fetch_accounts.nim +++ b/nimbus/sync/snap/worker/fetch_accounts.nim @@ -1,5 +1,4 @@ -# Nimbus - Fetch account and storage states from peers efficiently -# +# Nimbus # Copyright (c) 2021 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or @@ -10,6 +9,7 @@ # except according to those terms. 
import + std/sequtils, chronicles, chronos, eth/[common/eth_types, p2p], @@ -17,13 +17,15 @@ import stint, ../../sync_desc, ".."/[range_desc, worker_desc], - "."/[accounts_db, get_account_range, get_storage_ranges] + ./com/[get_account_range, get_error, get_storage_ranges, get_trie_nodes], + ./accounts_db when snapAccountsDumpEnable: import ../../../tests/replay/[undump_accounts, undump_storages] {.push raises: [Defect].} + logScope: topics = "snap-fetch" @@ -145,68 +147,140 @@ proc delUnprocessed(buddy: SnapBuddyRef; iv: LeafRange) = ## Shortcut discard buddy.ctx.data.pivotEnv.availAccounts.reduce(iv) -# ----- -proc waitAfterError(buddy: SnapBuddyRef; error: GetAccountRangeError): bool = - ## Error handling after `GetAccountRange` failed. +proc stopAfterError( + buddy: SnapBuddyRef; + error: ComError; + ): Future[bool] + {.async.} = + ## Error handling after data protocol failed. case error: - of GareResponseTimeout: + of ComResponseTimeout: if maxTimeoutErrors <= buddy.data.errors.nTimeouts: # Mark this peer dead, i.e. avoid fetching from this peer for a while buddy.ctrl.zombie = true else: - # Otherwise try again some time later + # Otherwise try again some time later. Nevertheless, stop the + # current action. buddy.data.errors.nTimeouts.inc - result = true + await sleepAsync(5.seconds) + return true - of GareNetworkProblem, - GareMissingProof, - GareAccountsMinTooSmall, - GareAccountsMaxTooLarge: + of ComNetworkProblem, + ComMissingProof, + ComAccountsMinTooSmall, + ComAccountsMaxTooLarge: # Mark this peer dead, i.e. avoid fetching from this peer for a while buddy.data.stats.major.networkErrors.inc() buddy.ctrl.zombie = true + return true - of GareNothingSerious: + of ComEmptyAccountsArguments, + ComEmptyRequestArguments, + ComInspectDbFailed, + ComImportAccountsFailed, + ComNoDataForProof, + ComNothingSerious: discard - of GareNoAccountsForStateRoot: + of ComNoAccountsForStateRoot, + ComNoStorageForAccounts, + ComNoByteCodesAvailable, + ComNoTrieNodesAvailable, + ComTooManyByteCodes, + ComTooManyStorageSlots, + ComTooManyTrieNodes: # Mark this peer dead, i.e. avoid fetching from this peer for a while buddy.ctrl.zombie = true + return true +# ------------------------------------------------------------------------------ +# Private functions: accounts +# ------------------------------------------------------------------------------ -proc waitAfterError(buddy: SnapBuddyRef; error: GetStorageRangesError): bool = - ## Error handling after `GetStorageRanges` failed. - case error: - of GsreResponseTimeout: - if maxTimeoutErrors <= buddy.data.errors.nTimeouts: - # Mark this peer dead, i.e. 
avoid fetching from this peer for a while +proc processAccounts( + buddy: SnapBuddyRef; + iv: LeafRange; ## Accounts range to process + ): Future[Result[void,ComError]] + {.async.} = + ## Process accounts and storage by bulk download on the current envirinment + # Reset error counts for detecting repeated timeouts + buddy.data.errors.nTimeouts = 0 + + # Process accounts + let + ctx = buddy.ctx + peer = buddy.peer + env = ctx.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + + # Fetch data for this range delegated to `fetchAccounts()` + let dd = block: + let rc = await buddy.getAccountRange(stateRoot, iv) + if rc.isErr: + buddy.putUnprocessed(iv) # fail => interval back to pool + return err(rc.error) + rc.value + + let + nAccounts = dd.data.accounts.len + nStorage = dd.withStorage.len + + block: + let rc = ctx.data.accountsDb.importAccounts( + peer, stateRoot, iv.minPt, dd.data) + if rc.isErr: + # Bad data, just try another peer + buddy.putUnprocessed(iv) buddy.ctrl.zombie = true - else: - # Otherwise try again some time later - buddy.data.errors.nTimeouts.inc - result = true + trace "Import failed, restoring unprocessed accounts", peer, stateRoot, + range=dd.consumed, nAccounts, nStorage, error=rc.error - of GsreNetworkProblem, - GsreTooManyStorageSlots: - # Mark this peer dead, i.e. avoid fetching from this peer for a while - buddy.data.stats.major.networkErrors.inc() - buddy.ctrl.zombie = true + buddy.dumpBegin(iv, dd, rc.error) # FIXME: Debugging (will go away) + buddy.dumpEnd() # FIXME: Debugging (will go away) + return err(ComImportAccountsFailed) - of GsreNothingSerious, - GsreEmptyAccountsArguments: - discard + buddy.dumpBegin(iv, dd) # FIXME: Debugging (will go away) - of GsreNoStorageForAccounts: - # Mark this peer dead, i.e. avoid fetching from this peer for a while - buddy.ctrl.zombie = true + # Statistics + env.nAccounts.inc(nAccounts) + env.nStorage.inc(nStorage) -# ----- + # Register consumed intervals on the accumulator over all state roots + discard buddy.ctx.data.coveredAccounts.merge(dd.consumed) -proc processStorageSlots( + # Register consumed and bulk-imported (well, not yet) accounts range + block registerConsumed: + block: + # Both intervals `min(iv)` and `min(dd.consumed)` are equal + let rc = iv - dd.consumed + if rc.isOk: + # Now, `dd.consumed` < `iv`, return some unused range + buddy.putUnprocessed(rc.value) + break registerConsumed + block: + # The processed interval might be a bit larger + let rc = dd.consumed - iv + if rc.isOk: + # Remove from unprocessed data. If it is not unprocessed, anymore + # then it was doubly processed which is ok. + buddy.delUnprocessed(rc.value) + break registerConsumed + # End registerConsumed + + # Store accounts on the storage TODO list. + discard env.leftOver.append SnapSlotQueueItemRef(q: dd.withStorage) + + return ok() + +# ------------------------------------------------------------------------------ +# Private functions: accounts storage +# ------------------------------------------------------------------------------ + +proc fetchAndImportStorageSlots( buddy: SnapBuddyRef; reqSpecs: seq[AccountSlotsHeader]; - ): Future[Result[SnapSlotQueueItemRef,GetStorageRangesError]] + ): Future[Result[seq[SnapSlotQueueItemRef],ComError]] {.async.} = ## Fetch storage slots data from the network, store it on disk and ## return yet unprocessed data. 
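  ## The returned list collects the storage requests that still need to be
  ## re-fetched, e.g. after a partial reply or a failed import.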
@@ -217,169 +291,243 @@ proc processStorageSlots( stateRoot = env.stateHeader.stateRoot # Get storage slots - let storage = block: + var stoRange = block: let rc = await buddy.getStorageRanges(stateRoot, reqSpecs) if rc.isErr: return err(rc.error) rc.value - # ----------------------------- - buddy.dumpStorage(storage.data) - # ----------------------------- + if 0 < stoRange.data.storages.len: + # ------------------------------ + buddy.dumpStorage(stoRange.data) + # ------------------------------ - # Verify/process data and save to disk - block: - let rc = ctx.data.accountsDb.importStorages( - peer, storage.data, storeOk = true) + # Verify/process data and save to disk + block: + let rc = ctx.data.accountsDb.importStorages(peer, stoRange.data) - if rc.isErr: - # Push back parts of the error item - for w in rc.error: - if 0 <= w[0]: - # Reset any partial requests by not copying the `firstSlot` field. So - # all the storage slots are re-fetched completely for this account. - storage.leftOver.q.add AccountSlotsHeader( - accHash: storage.data.storages[w[0]].account.accHash, - storageRoot: storage.data.storages[w[0]].account.storageRoot) + if rc.isErr: + # Push back parts of the error item + for w in rc.error: + if 0 <= w[0]: + # Reset any partial requests by not copying the `firstSlot` field. + # So all the storage slots are re-fetched completely for this + # account. + stoRange.addLeftOver AccountSlotsHeader( + accHash: stoRange.data.storages[w[0]].account.accHash, + storageRoot: stoRange.data.storages[w[0]].account.storageRoot) - if rc.error[^1][0] < 0: - discard - # TODO: disk storage failed or something else happend, so what? + if rc.error[^1][0] < 0: + discard + # TODO: disk storage failed or something else happend, so what? # Return the remaining part to be processed later - return ok(storage.leftOver) + return ok(stoRange.leftOver) -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ -proc fetchAccounts*(buddy: SnapBuddyRef): Future[bool] {.async.} = - ## Fetch accounts data and store them in the database. The function returns - ## `true` if there are no more unprocessed accounts. +proc processStorageSlots( + buddy: SnapBuddyRef; + ): Future[Result[void,ComError]] + {.async.} = + ## Fetch storage data and save it on disk. Storage requests are managed by + ## a request queue for handling partioal replies and re-fetch issues. For + ## all practical puroses, this request queue should mostly be empty. 
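+  ## A sketch of the intended flow (descriptive only): each queue item is
+  ## pulled off `env.leftOver` and handed to `fetchAndImportStorageSlots()`;
+  ## whatever could not be completed is appended back to the queue for a
+  ## later pass.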
let ctx = buddy.ctx peer = buddy.peer env = ctx.data.pivotEnv stateRoot = env.stateHeader.stateRoot - # Get a range of accounts to fetch from - let iv = block: - let rc = buddy.getUnprocessed() - if rc.isErr: - trace "No more unprocessed accounts", peer, stateRoot - return true - rc.value - - # Fetch data for this range delegated to `fetchAccounts()` - let dd = block: - let rc = await buddy.getAccountRange(stateRoot, iv) - if rc.isErr: - buddy.putUnprocessed(iv) # fail => interval back to pool - if buddy.waitAfterError(rc.error): - await sleepAsync(5.seconds) - return false - rc.value - - # Reset error counts for detecting repeated timeouts - buddy.data.errors.nTimeouts = 0 - - # Process accounts - let - nAccounts = dd.data.accounts.len - nStorage = dd.withStorage.len - - block processAccountsAndStorage: - block: - let rc = ctx.data.accountsDb.importAccounts( - peer, stateRoot, iv.minPt, dd.data, storeOk = true) + while true: + # Pull out the next request item from the queue + let req = block: + let rc = env.leftOver.shift if rc.isErr: - # Bad data, just try another peer - buddy.putUnprocessed(iv) - buddy.ctrl.zombie = true - trace "Import failed, restoring unprocessed accounts", peer, stateRoot, - range=dd.consumed, nAccounts, nStorage, error=rc.error + return ok() + rc.value - # ------------------------------- - buddy.dumpBegin(iv, dd, rc.error) - buddy.dumpEnd() - # ------------------------------- + block: + # Fetch and store account storage slots. On some sort of success, + # the `rc` return value contains a list of left-over items to be + # re-processed. + let rc = await buddy.fetchAndImportStorageSlots(req.q) - break processAccountsAndStorage + if rc.isErr: + # Save accounts/storage list to be processed later, then stop + discard env.leftOver.append req + return err(rc.error) - # --------------------- - buddy.dumpBegin(iv, dd) - # --------------------- + for qLo in rc.value: + # Handle queue left-overs for processing in the next cycle + if qLo.q[0].firstSlot == Hash256.default and 0 < env.leftOver.len: + # Appending to last queue item is preferred over adding new item + let item = env.leftOver.first.value + item.q = item.q & qLo.q + else: + # Put back as-is. + discard env.leftOver.append qLo + # End while - # Statistics - env.nAccounts.inc(nAccounts) - env.nStorage.inc(nStorage) + return ok() - # Register consumed intervals on the accumulator over all state roots - discard buddy.ctx.data.coveredAccounts.merge(dd.consumed) +# ------------------------------------------------------------------------------ +# Private functions: healing +# ------------------------------------------------------------------------------ - # Register consumed and bulk-imported (well, not yet) accounts range - block registerConsumed: - block: - # Both intervals `min(iv)` and `min(dd.consumed)` are equal - let rc = iv - dd.consumed - if rc.isOk: - # Now, `dd.consumed` < `iv`, return some unused range - buddy.putUnprocessed(rc.value) - break registerConsumed - block: - # The processed interval might be a bit larger - let rc = dd.consumed - iv - if rc.isOk: - # Remove from unprocessed data. If it is not unprocessed, anymore - # then it was doubly processed which is ok. - buddy.delUnprocessed(rc.value) - break registerConsumed - # End registerConsumed +proc accountsTrieHealing( + buddy: SnapBuddyRef; + env: SnapPivotRef; + envSource: string; + ): Future[Result[void,ComError]] + {.async.} = + ## ... + # Starting with a given set of potentially dangling nodes, this set is + # updated. 
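+  # Each round of the loop below performs three steps:
+  #   1. `inspectAccountsTrie()` turns the current dangling node register
+  #      into a list of partial paths that still lack nodes,
+  #   2. `getTrieNodes()` fetches the missing nodes from the peer,
+  #   3. `importRawNodes()` stores them; nodes that could not be stored
+  #      are put back into `env.dangling` for the next round.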
+ let + ctx = buddy.ctx + peer = buddy.peer + stateRoot = env.stateHeader.stateRoot - # Fetch storage data and save it on disk. Storage requests are managed by - # a request queue for handling partioal replies and re-fetch issues. For - # all practical puroses, this request queue should mostly be empty. - block processStorage: - discard env.leftOver.append SnapSlotQueueItemRef(q: dd.withStorage) + while env.repairState != Done and + (env.dangling.len != 0 or env.repairState == Pristine): - while true: - # Pull out the next request item from the queue - let req = block: - let rc = env.leftOver.shift - if rc.isErr: - break processStorage - rc.value + trace "Accounts healing loop", peer, repairState=env.repairState, + envSource, nDangling=env.dangling.len - block: - # Fetch and store account storage slots. On some sort of success, - # the `rc` return value contains a list of left-over items to be - # re-processed. - let rc = await buddy.processStorageSlots(req.q) + let needNodes = block: + let rc = ctx.data.accountsDb.inspectAccountsTrie( + peer, stateRoot, env.dangling) + if rc.isErr: + let error = rc.error + trace "Accounts healing failed", peer, repairState=env.repairState, + envSource, nDangling=env.dangling.len, error + return err(ComInspectDbFailed) + rc.value.dangling - if rc.isErr: - # Save accounts/storage list to be processed later, then stop - discard env.leftOver.append req - if buddy.waitAfterError(rc.error): - await sleepAsync(5.seconds) - break processAccountsAndStorage + # Clear dangling nodes register so that other processes would not fetch + # the same list simultaneously. + env.dangling.setLen(0) - elif 0 < rc.value.q.len: - # Handle queue left-overs for processing in the next cycle - if rc.value.q[0].firstSlot == Hash256.default and - 0 < env.leftOver.len: - # Appending to last queue item is preferred over adding new item - let item = env.leftOver.first.value - item.q = item.q & rc.value.q - else: - # Put back as-is. - discard env.leftOver.append rc.value - # End while + # Noting to anymore + if needNodes.len == 0: + if env.repairState != Pristine: + env.repairState = Done + trace "Done accounts healing for now", peer, repairState=env.repairState, + envSource, nDangling=env.dangling.len + return ok() - # ------------- - buddy.dumpEnd() - # ------------- + let lastState = env.repairState + env.repairState = KeepGoing - # End processAccountsAndStorage + trace "Need nodes for healing", peer, repairState=env.repairState, + envSource, nDangling=env.dangling.len, nNodes=needNodes.len + + # Fetch nodes + let dd = block: + let rc = await buddy.getTrieNodes(stateRoot, needNodes.mapIt(@[it])) + if rc.isErr: + env.dangling = needNodes + env.repairState = lastState + return err(rc.error) + rc.value + + # Store to disk and register left overs for the next pass + block: + let rc = ctx.data.accountsDb.importRawNodes(peer, dd.nodes) + if rc.isOk: + env.dangling = dd.leftOver.mapIt(it[0]) + elif 0 < rc.error.len and rc.error[^1][0] < 0: + # negative index => storage error + env.dangling = needNodes + else: + let nodeKeys = rc.error.mapIt(dd.nodes[it[0]]) + env.dangling = dd.leftOver.mapIt(it[0]) & nodeKeys + # End while + + return ok() + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc fetchAccounts*(buddy: SnapBuddyRef) {.async.} = + ## Fetch accounts and data and store them in the database. + ## + ## TODO: Healing for storages. 
Currently, healing in only run for accounts. + let + ctx = buddy.ctx + peer = buddy.peer + env = ctx.data.pivotEnv + stateRoot = env.stateHeader.stateRoot + var + # Complete the previous environment by trie database healing (if any) + healingEnvs = if not ctx.data.prevEnv.isNil: @[ctx.data.prevEnv] else: @[] + + block processAccountsFrame: + # Get a range of accounts to fetch from + let iv = block: + let rc = buddy.getUnprocessed() + if rc.isErr: + # Although there are no accounts left to process, the other peer might + # still work on some accounts. As a general rule, not all from an + # account range gets served so the remaining range will magically + # reappear on the unprocessed ranges database. + trace "No more unprocessed accounts (maybe)", peer, stateRoot + + # Complete healing for sporadic nodes missing. + healingEnvs.add env + break processAccountsFrame + rc.value + + trace "Start fetching accounts", peer, stateRoot, iv, + repairState=env.repairState + + # Process received accounts and stash storage slots to fetch later + block: + let rc = await buddy.processAccounts(iv) + if rc.isErr: + let error = rc.error + if await buddy.stopAfterError(error): + buddy.dumpEnd() # FIXME: Debugging (will go away) + trace "Stop fetching cycle", peer, repairState=env.repairState, + processing="accounts", error + return + break processAccountsFrame + + # End `block processAccountsFrame` + + trace "Start fetching storages", peer, nAccounts=env.leftOver.len, + repairState=env.repairState + + # Process storage slots from environment batch + block: + let rc = await buddy.processStorageSlots() + if rc.isErr: + let error = rc.error + if await buddy.stopAfterError(error): + buddy.dumpEnd() # FIXME: Debugging (will go away) + trace "Stop fetching cycle", peer, repairState=env.repairState, + processing="storage", error + return + + # Check whether there is some environment that can be completed by + # Patricia Merkle Tree healing. + for w in healingEnvs: + let envSource = if env == ctx.data.pivotEnv: "pivot" else: "retro" + trace "Start accounts healing", peer, repairState=env.repairState, + envSource, dangling=w.dangling.len + + let rc = await buddy.accountsTrieHealing(w, envSource) + if rc.isErr: + let error = rc.error + if await buddy.stopAfterError(error): + buddy.dumpEnd() # FIXME: Debugging (will go away) + trace "Stop fetching cycle", peer, repairState=env.repairState, + processing="healing", dangling=w.dangling.len, error + return + + buddy.dumpEnd() # FIXME: Debugging (will go away) + trace "Done fetching cycle", peer, repairState=env.repairState # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot2.nim b/nimbus/sync/snap/worker/pivot2.nim new file mode 100644 index 000000000..5c3a2e4b7 --- /dev/null +++ b/nimbus/sync/snap/worker/pivot2.nim @@ -0,0 +1,345 @@ +# Nimbus +# Copyright (c) 2018-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at +# https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at +# https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. 
+ +## Borrowed from `full/worker.nim` + +import + std/[hashes, options, sets], + chronicles, + chronos, + eth/[common/eth_types, p2p], + stew/byteutils, + "../.."/[protocol, sync_desc], + ../worker_desc + +{.push raises:[Defect].} + +logScope: + topics = "snap-pivot" + +const + extraTraceMessages = false # or true + ## Additional trace commands + + minPeersToStartSync = 2 + ## Wait for consensus of at least this number of peers before syncing. + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc hash(peer: Peer): Hash = + ## Mixin `HashSet[Peer]` handler + hash(cast[pointer](peer)) + +proc pivotNumber(buddy: SnapBuddyRef): BlockNumber = + # data.pivot2Header + if buddy.ctx.data.pivotEnv.isNil: + 0.u256 + else: + buddy.ctx.data.pivotEnv.stateHeader.blockNumber + +template safeTransport( + buddy: SnapBuddyRef; + info: static[string]; + code: untyped) = + try: + code + except TransportError as e: + error info & ", stop", peer=buddy.peer, error=($e.name), msg=e.msg + buddy.ctrl.stopped = true + + +proc rand(r: ref HmacDrbgContext; maxVal: uint64): uint64 = + # github.com/nim-lang/Nim/tree/version-1-6/lib/pure/random.nim#L216 + const + randMax = high(uint64) + if 0 < maxVal: + if maxVal == randMax: + var x: uint64 + r[].generate(x) + return x + while true: + var x: uint64 + r[].generate(x) + # avoid `mod` bias, so `x <= n*maxVal <= randMax` for some integer `n` + if x <= randMax - (randMax mod maxVal): + # uint -> int + return x mod (maxVal + 1) + +proc rand(r: ref HmacDrbgContext; maxVal: int): int = + if 0 < maxVal: # somehow making sense of `maxVal = -1` + return cast[int](r.rand(maxVal.uint64)) + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc getRandomTrustedPeer(buddy: SnapBuddyRef): Result[Peer,void] = + ## Return random entry from `trusted` peer different from this peer set if + ## there are enough + ## + ## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `randomTrustedPeer()` + let + ctx = buddy.ctx + nPeers = ctx.data.trusted.len + offInx = if buddy.peer in ctx.data.trusted: 2 else: 1 + if 0 < nPeers: + var (walkInx, stopInx) = (0, ctx.data.rng.rand(nPeers - offInx)) + for p in ctx.data.trusted: + if p == buddy.peer: + continue + if walkInx == stopInx: + return ok(p) + walkInx.inc + err() + +proc getBestHeader( + buddy: SnapBuddyRef + ): Future[Result[BlockHeader,void]] {.async.} = + ## Get best block number from best block hash. 
+ ## + ## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `getBestBlockNumber()` + let + peer = buddy.peer + startHash = peer.state(eth).bestBlockHash + reqLen = 1u + hdrReq = BlocksRequest( + startBlock: HashOrNum( + isHash: true, + hash: startHash), + maxResults: reqLen, + skip: 0, + reverse: true) + + trace trEthSendSendingGetBlockHeaders, peer, + startBlock=startHash.data.toHex, reqLen + + var hdrResp: Option[blockHeadersObj] + buddy.safeTransport("Error fetching block header"): + hdrResp = await peer.getBlockHeaders(hdrReq) + if buddy.ctrl.stopped: + return err() + + if hdrResp.isNone: + trace trEthRecvReceivedBlockHeaders, peer, reqLen, respose="n/a" + return err() + + let hdrRespLen = hdrResp.get.headers.len + if hdrRespLen == 1: + let + header = hdrResp.get.headers[0] + blockNumber = header.blockNumber + trace trEthRecvReceivedBlockHeaders, peer, hdrRespLen, blockNumber + return ok(header) + + trace trEthRecvReceivedBlockHeaders, peer, reqLen, hdrRespLen + return err() + +proc agreesOnChain( + buddy: SnapBuddyRef; + other: Peer + ): Future[Result[void,bool]] {.async.} = + ## Returns `true` if one of the peers `buddy.peer` or `other` acknowledges + ## existence of the best block of the other peer. The values returned mean + ## * ok() -- `peer` is trusted + ## * err(true) -- `peer` is untrusted + ## * err(false) -- `other` is dead + ## + ## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `peersAgreeOnChain()` + let + peer = buddy.peer + var + start = peer + fetch = other + swapped = false + # Make sure that `fetch` has not the smaller difficulty. + if fetch.state(eth).bestDifficulty < start.state(eth).bestDifficulty: + swap(fetch, start) + swapped = true + + let + startHash = start.state(eth).bestBlockHash + hdrReq = BlocksRequest( + startBlock: HashOrNum( + isHash: true, + hash: startHash), + maxResults: 1, + skip: 0, + reverse: true) + + trace trEthSendSendingGetBlockHeaders, peer, start, fetch, + startBlock=startHash.data.toHex, hdrReqLen=1, swapped + + var hdrResp: Option[blockHeadersObj] + buddy.safeTransport("Error fetching block header"): + hdrResp = await fetch.getBlockHeaders(hdrReq) + if buddy.ctrl.stopped: + if swapped: + return err(true) + # No need to terminate `peer` if it was the `other`, failing nevertheless + buddy.ctrl.stopped = false + return err(false) + + if hdrResp.isSome: + let hdrRespLen = hdrResp.get.headers.len + if 0 < hdrRespLen: + let blockNumber = hdrResp.get.headers[0].blockNumber + trace trEthRecvReceivedBlockHeaders, peer, start, fetch, + hdrRespLen, blockNumber + return ok() + + trace trEthRecvReceivedBlockHeaders, peer, start, fetch, + blockNumber="n/a", swapped + return err(true) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc pivot2Start*(buddy: SnapBuddyRef) = + discard + +proc pivot2Stop*(buddy: SnapBuddyRef) = + ## Clean up this peer + buddy.ctx.data.untrusted.add buddy.peer + +proc pivot2Restart*(buddy: SnapBuddyRef) = + buddy.data.pivot2Header = none(BlockHeader) + buddy.ctx.data.untrusted.add buddy.peer + + +proc pivot2Exec*(buddy: SnapBuddyRef): Future[bool] {.async.} = + ## Initalise worker. This function must be run in single mode at the + ## beginning of running worker peer. 
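+  ## In short (descriptive summary): the peer's best block header is
+  ## fetched and compared against the already trusted peers via
+  ## `agreesOnChain()`. Once at least `minPeersToStartSync` peers agree,
+  ## the function returns `true` and syncing may start.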
+ ## + ## Ackn: nim-eth/eth/p2p/blockchain_sync.nim: `startSyncWithPeer()` + ## + let + ctx = buddy.ctx + peer = buddy.peer + + # Delayed clean up batch list + if 0 < ctx.data.untrusted.len: + when extraTraceMessages: + trace "Removing untrusted peers", peer, trusted=ctx.data.trusted.len, + untrusted=ctx.data.untrusted.len, runState=buddy.ctrl.state + ctx.data.trusted = ctx.data.trusted - ctx.data.untrusted.toHashSet + ctx.data.untrusted.setLen(0) + + if buddy.data.pivot2Header.isNone: + when extraTraceMessages: + # Only log for the first time (if any) + trace "Pivot initialisation", peer, + trusted=ctx.data.trusted.len, runState=buddy.ctrl.state + + let rc = await buddy.getBestHeader() + # Beware of peer terminating the session right after communicating + if rc.isErr or buddy.ctrl.stopped: + return false + let + bestNumber = rc.value.blockNumber + minNumber = buddy.pivotNumber + if bestNumber < minNumber: + buddy.ctrl.zombie = true + trace "Useless peer, best number too low", peer, + trusted=ctx.data.trusted.len, runState=buddy.ctrl.state, + minNumber, bestNumber + buddy.data.pivot2Header = some(rc.value) + + if minPeersToStartSync <= ctx.data.trusted.len: + # We have enough trusted peers. Validate new peer against trusted + let rc = buddy.getRandomTrustedPeer() + if rc.isOK: + let rx = await buddy.agreesOnChain(rc.value) + if rx.isOk: + ctx.data.trusted.incl peer + return true + if not rx.error: + # Other peer is dead + ctx.data.trusted.excl rc.value + + # If there are no trusted peers yet, assume this very peer is trusted, + # but do not finish initialisation until there are more peers. + elif ctx.data.trusted.len == 0: + ctx.data.trusted.incl peer + when extraTraceMessages: + trace "Assume initial trusted peer", peer, + trusted=ctx.data.trusted.len, runState=buddy.ctrl.state + + elif ctx.data.trusted.len == 1 and buddy.peer in ctx.data.trusted: + # Ignore degenerate case, note that `trusted.len < minPeersToStartSync` + discard + + else: + # At this point we have some "trusted" candidates, but they are not + # "trusted" enough. We evaluate `peer` against all other candidates. If + # one of the candidates disagrees, we swap it for `peer`. If all candidates + # agree, we add `peer` to trusted set. 
The peers in the set will become + # "fully trusted" (and sync will start) when the set is big enough + var + agreeScore = 0 + otherPeer: Peer + deadPeers: HashSet[Peer] + when extraTraceMessages: + trace "Trust scoring peer", peer, + trusted=ctx.data.trusted.len, runState=buddy.ctrl.state + for p in ctx.data.trusted: + if peer == p: + inc agreeScore + else: + let rc = await buddy.agreesOnChain(p) + if rc.isOk: + inc agreeScore + elif buddy.ctrl.stopped: + # Beware of terminated session + return false + elif rc.error: + otherPeer = p + else: + # `Other` peer is dead + deadPeers.incl p + + # Normalise + if 0 < deadPeers.len: + ctx.data.trusted = ctx.data.trusted - deadPeers + if ctx.data.trusted.len == 0 or + ctx.data.trusted.len == 1 and buddy.peer in ctx.data.trusted: + return false + + # Check for the number of peers that disagree + case ctx.data.trusted.len - agreeScore: + of 0: + ctx.data.trusted.incl peer # best possible outcome + when extraTraceMessages: + trace "Agreeable trust score for peer", peer, + trusted=ctx.data.trusted.len, runState=buddy.ctrl.state + of 1: + ctx.data.trusted.excl otherPeer + ctx.data.trusted.incl peer + when extraTraceMessages: + trace "Other peer no longer trusted", peer, + otherPeer, trusted=ctx.data.trusted.len, runState=buddy.ctrl.state + else: + when extraTraceMessages: + trace "Peer not trusted", peer, + trusted=ctx.data.trusted.len, runState=buddy.ctrl.state + discard + + # Evaluate status, finally + if minPeersToStartSync <= ctx.data.trusted.len: + when extraTraceMessages: + trace "Peer trusted now", peer, + trusted=ctx.data.trusted.len, runState=buddy.ctrl.state + return true + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index 5e058ff06..f75a833e3 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -102,6 +102,11 @@ type ## only the first element of a `seq[AccountSlotsHeader]` queue can have an ## effective sub-range specification (later ones will be ignored.) 
+ SnapRepairState* = enum + Pristine ## Not initialised yet + KeepGoing ## Unfinished repair process + Done ## Stop repairing + SnapPivotRef* = ref object ## Per-state root cache for particular snap data environment stateHeader*: BlockHeader ## Pivot state, containg state root @@ -110,6 +115,8 @@ type nAccounts*: uint64 ## Number of accounts imported nStorage*: uint64 ## Number of storage spaces imported leftOver*: SnapSlotsQueue ## Re-fetch storage for these accounts + dangling*: seq[Blob] ## Missing nodes for healing process + repairState*: SnapRepairState ## State of healing process when switchPivotAfterCoverage < 1.0: minCoverageReachedOk*: bool ## Stop filling this pivot @@ -122,6 +129,7 @@ type stats*: SnapBuddyStats ## Statistics counters errors*: SnapBuddyErrors ## For error handling pivotHeader*: Option[BlockHeader] ## For pivot state hunter + pivot2Header*: Option[BlockHeader] ## Alternative header workerPivot*: WorkerPivotBase ## Opaque object reference for sub-module BuddyPoolHookFn* = proc(buddy: BuddyRef[CtxData,BuddyData]) {.gcsafe.} @@ -138,10 +146,14 @@ type pivotTable*: SnapPivotTable ## Per state root environment pivotCount*: uint64 ## Total of all created tab entries pivotEnv*: SnapPivotRef ## Environment containing state root + prevEnv*: SnapPivotRef ## Previous state root environment accountRangeMax*: UInt256 ## Maximal length, high(u256)/#peers accountsDb*: AccountsDbRef ## Proof processing for accounts runPoolHook*: BuddyPoolHookFn ## Callback for `runPool()` # -------- + untrusted*: seq[Peer] ## Clean up list (pivot2) + trusted*: HashSet[Peer] ## Peers ready for delivery (pivot2) + # -------- when snapAccountsDumpEnable: proofDumpOk*: bool proofDumpFile*: File diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim index b2c0e568d..4b4ac477f 100644 --- a/nimbus/sync/sync_sched.nim +++ b/nimbus/sync/sync_sched.nim @@ -129,14 +129,20 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = # Grab `monitorLock` (was `false` as checked above) and wait until clear # to run as the only activated instance. 
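+  # Note that the waiting loops below give up as soon as the worker is
+  # flagged `stopped`, so a peer that disconnects while waiting does not
+  # end up running the pool mode handler.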
dsc.monitorLock = true - while 0 < dsc.activeMulti: - await sleepAsync(50.milliseconds) - while dsc.singleRunLock: - await sleepAsync(50.milliseconds) - var count = dsc.buddies.len - for w in dsc.buddies.nextValues: - count.dec - worker.runPool(count == 0) + block poolModeExec: + while 0 < dsc.activeMulti: + await sleepAsync(50.milliseconds) + if worker.ctrl.stopped: + break poolModeExec + while dsc.singleRunLock: + await sleepAsync(50.milliseconds) + if worker.ctrl.stopped: + break poolModeExec + var count = dsc.buddies.len + for w in dsc.buddies.nextValues: + count.dec + worker.runPool(count == 0) + # End `block poolModeExec` dsc.monitorLock = false continue @@ -146,6 +152,8 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = # Allow task switch await sleepAsync(50.milliseconds) + if worker.ctrl.stopped: + break # Multi mode if worker.ctrl.multiOk: @@ -161,10 +169,14 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = if not dsc.singleRunLock: # Lock single instance mode and wait for other workers to finish dsc.singleRunLock = true - while 0 < dsc.activeMulti: - await sleepAsync(50.milliseconds) - # Run single instance and release afterwards - await worker.runSingle() + block singleModeExec: + while 0 < dsc.activeMulti: + await sleepAsync(50.milliseconds) + if worker.ctrl.stopped: + break singleModeExec + # Run single instance and release afterwards + await worker.runSingle() + # End `block singleModeExec` dsc.singleRunLock = false # End while @@ -181,7 +193,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = peers = dsc.pool.len workers = dsc.buddies.len if dsc.buddies.hasKey(peer.hash): - trace "Reconnecting zombie peer rejected", peer, peers, workers, maxWorkers + trace "Reconnecting zombie peer ignored", peer, peers, workers, maxWorkers return # Initialise worker for this peer diff --git a/nimbus/sync/types.nim b/nimbus/sync/types.nim index 62a9457a1..2d2c9263a 100644 --- a/nimbus/sync/types.nim +++ b/nimbus/sync/types.nim @@ -9,34 +9,24 @@ # distributed except according to those terms. import - std/[math, strutils, hashes], + std/[math, hashes], eth/common/eth_types_rlp, stew/byteutils {.push raises: [Defect].} type - NodeHash* = distinct Hash256 - ## Hash of a trie node or other blob carried over `NodeData` account trie - ## nodes, storage trie nodes, contract code. - ## - ## Note that the `ethXX` and `snapXX` protocol drivers always use the - ## underlying `Hash256` type which needs to be converted to `NodeHash`. - BlockHash* = distinct Hash256 ## Hash of a block, goes with `BlockNumber`. ## ## Note that the `ethXX` protocol driver always uses the ## underlying `Hash256` type which needs to be converted to `BlockHash`. 
- SomeDistinctHash256 = - NodeHash | BlockHash - # ------------------------------------------------------------------------------ # Public constructors # ------------------------------------------------------------------------------ -proc new*(T: type SomeDistinctHash256): T = +proc new*(T: type BlockHash): T = Hash256().T # ------------------------------------------------------------------------------ @@ -57,11 +47,11 @@ proc to*(longNum: UInt256; T: type float): T = let exp = mantissaLen - 64 (longNum shr exp).truncate(uint64).T * (2.0 ^ exp) -proc to*(w: SomeDistinctHash256; T: type Hash256): T = +proc to*(w: BlockHash; T: type Hash256): T = ## Syntactic sugar w.Hash256 -proc to*(w: seq[SomeDistinctHash256]; T: type seq[Hash256]): T = +proc to*(w: seq[BlockHash]; T: type seq[Hash256]): T = ## Ditto cast[seq[Hash256]](w) @@ -73,22 +63,22 @@ proc to*(bh: BlockHash; T: type HashOrNum): T = # Public functions # ------------------------------------------------------------------------------ -proc read*(rlp: var Rlp, T: type SomeDistinctHash256): T +proc read*(rlp: var Rlp, T: type BlockHash): T {.gcsafe, raises: [Defect,RlpError]} = ## RLP mixin reader rlp.read(Hash256).T -proc append*(writer: var RlpWriter; h: SomeDistinctHash256) = +proc append*(writer: var RlpWriter; h: BlockHash) = ## RLP mixin append(writer, h.Hash256) -proc `==`*(a: SomeDistinctHash256; b: Hash256): bool = +proc `==`*(a: BlockHash; b: Hash256): bool = a.Hash256 == b -proc `==`*[T: SomeDistinctHash256](a,b: T): bool = +proc `==`*[T: BlockHash](a,b: T): bool = a.Hash256 == b.Hash256 -proc hash*(root: SomeDistinctHash256): Hash = +proc hash*(root: BlockHash): Hash = ## Mixin for `Table` or `KeyedQueue` root.Hash256.data.hash @@ -100,7 +90,7 @@ func toHex*(hash: Hash256): string = ## Shortcut for `byteutils.toHex(hash.data)` hash.data.toHex -func `$`*(h: SomeDistinctHash256): string = +func `$`*(h: BlockHash): string = $h.Hash256.data.toHex func `$`*(blob: Blob): string = diff --git a/nimbus/utils/prettify.nim b/nimbus/utils/prettify.nim index 5640ef8e1..2b5a774e2 100644 --- a/nimbus/utils/prettify.nim +++ b/nimbus/utils/prettify.nim @@ -57,6 +57,11 @@ proc toPC*( minDigits = digitsAfterDot + 1 multiplier = (10 ^ (minDigits + 1)).float roundUp = rounding / 10.0 - result = ((num * multiplier) + roundUp).int.intToStr(minDigits) & "%" + let + sign = if num < 0: "-" else: "" + preTruncated = (num.abs * multiplier) + roundUp + if int.high.float <= preTruncated: + return "NaN" + result = sign & preTruncated.int.intToStr(minDigits) & "%" when 0 < digitsAfterDot: result.insert(".", result.len - minDigits) diff --git a/tests/replay/undump_accounts.nim b/tests/replay/undump_accounts.nim index 59e476a8f..f5ef1c3a1 100644 --- a/tests/replay/undump_accounts.nim +++ b/tests/replay/undump_accounts.nim @@ -13,7 +13,7 @@ import eth/common, nimcrypto/utils, stew/byteutils, - ../../nimbus/sync/snap/[range_desc, worker/db/hexary_desc], + ../../nimbus/sync/snap/range_desc, ./gunzip type diff --git a/tests/replay/undump_storages.nim b/tests/replay/undump_storages.nim index af01c72f0..079495283 100644 --- a/tests/replay/undump_storages.nim +++ b/tests/replay/undump_storages.nim @@ -13,7 +13,7 @@ import eth/common, nimcrypto/utils, stew/byteutils, - ../../nimbus/sync/snap/[range_desc, worker/db/hexary_desc], + ../../nimbus/sync/snap/range_desc, ../../nimbus/sync/protocol, ./gunzip diff --git a/tests/test_sync_snap.nim b/tests/test_sync_snap.nim index ab619db74..fa3ec7578 100644 --- a/tests/test_sync_snap.nim +++ b/tests/test_sync_snap.nim @@ -25,8 
+25,8 @@ import ../nimbus/p2p/chain, ../nimbus/sync/types, ../nimbus/sync/snap/range_desc, - ../nimbus/sync/snap/worker/accounts_db, - ../nimbus/sync/snap/worker/db/[hexary_desc, rocky_bulk_load], + ../nimbus/sync/snap/worker/[accounts_db, db/hexary_desc, + db/hexary_inspect, db/rocky_bulk_load], ../nimbus/utils/prettify, ./replay/[pp, undump_blocks, undump_accounts, undump_storages], ./test_sync_snap/[bulk_test_xx, snap_test_xx, test_types] @@ -115,6 +115,12 @@ proc pp(rc: Result[Account,HexaryDbError]): string = proc pp(rc: Result[Hash256,HexaryDbError]): string = if rc.isErr: $rc.error else: $rc.value.to(NodeTag) +proc pp( + rc: Result[TrieNodeStat,HexaryDbError]; + db: AccountsDbSessionRef + ): string = + if rc.isErr: $rc.error else: rc.value.pp(db.getAcc) + proc ppKvPc(w: openArray[(string,int)]): string = w.mapIt(&"{it[0]}={it[1]}%").join(", ") @@ -282,11 +288,12 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) = accKeys: seq[Hash256] test &"Snap-proofing {accountsList.len} items for state root ..{root.pp}": - let dbBase = if persistent: AccountsDbRef.init(db.cdb[0]) - else: AccountsDbRef.init(newMemoryDB()) + let + dbBase = if persistent: AccountsDbRef.init(db.cdb[0]) + else: AccountsDbRef.init(newMemoryDB()) + dbDesc = AccountsDbSessionRef.init(dbBase, root, peer) for n,w in accountsList: - check dbBase.importAccounts( - peer, root, w.base, w.data, storeOk = persistent) == OkHexDb + check dbDesc.importAccounts(w.base, w.data, persistent) == OkHexDb test &"Merging {accountsList.len} proofs for state root ..{root.pp}": let dbBase = if persistent: AccountsDbRef.init(db.cdb[1]) @@ -417,11 +424,13 @@ proc storagesRunner( test &"Merging {accountsList.len} accounts for state root ..{root.pp}": for w in accountsList: - check dbBase.importAccounts( - peer, root, w.base, w.data, storeOk = persistent) == OkHexDb + let desc = AccountsDbSessionRef.init(dbBase, root, peer) + check desc.importAccounts(w.base, w.data, persistent) == OkHexDb test &"Merging {storagesList.len} storages lists": - let ignore = knownFailures.toTable + let + dbDesc = AccountsDbSessionRef.init(dbBase, root, peer) + ignore = knownFailures.toTable for n,w in storagesList: let testId = fileInfo & "#" & $n @@ -429,12 +438,197 @@ proc storagesRunner( Result[void,seq[(int,HexaryDbError)]].err(ignore[testId]) else: OkStoDb + check dbDesc.importStorages(w.data, persistent) == expRc - #if expRc.isErr: setTraceLevel() - #else: setErrorLevel() - #echo ">>> testId=", testId, " expect-error=", expRc.isErr - check dbBase.importStorages(peer, w.data, storeOk = persistent) == expRc +proc inspectionRunner( + noisy = true; + persistent = true; + cascaded = true; + sample: openArray[AccountsSample] = snapTestList) = + let + peer = Peer.new + inspectList = sample.mapIt(it.to(seq[UndumpAccounts])) + tmpDir = getTmpDir() + db = if persistent: tmpDir.testDbs(sample[0].name) else: testDbs() + dbDir = db.dbDir.split($DirSep).lastTwo.join($DirSep) + info = if db.persistent: &"persistent db on \"{dbDir}\"" + else: "in-memory db" + fileInfo = "[" & sample[0].file.splitPath.tail.replace(".txt.gz","") & "..]" + + defer: + if db.persistent: + for n in 0 ..< nTestDbInstances: + if db.cdb[n].rocksStoreRef.isNil: + break + db.cdb[n].rocksStoreRef.store.db.rocksdb_close + tmpDir.flushDbDir(sample[0].name) + + suite &"SyncSnap: inspect {fileInfo} lists for {info} for healing": + let + memBase = AccountsDbRef.init(newMemoryDB()) + memDesc = AccountsDbSessionRef.init(memBase, Hash256(), peer) + var + singleStats: 
seq[(int,TrieNodeStat)] + accuStats: seq[(int,TrieNodeStat)] + perBase,altBase: AccountsDbRef + perDesc,altDesc: AccountsDbSessionRef + if persistent: + perBase = AccountsDbRef.init(db.cdb[0]) + perDesc = AccountsDbSessionRef.init(perBase, Hash256(), peer) + altBase = AccountsDbRef.init(db.cdb[1]) + altDesc = AccountsDbSessionRef.init(altBase, Hash256(), peer) + + test &"Fingerprinting {inspectList.len} single accounts lists " & + "for in-memory-db": + for n,accList in inspectList: + # Separate storage + let + root = accList[0].root + rootKey = root.to(NodeKey) + desc = AccountsDbSessionRef.init(memBase, root, peer) + for w in accList: + check desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb + let rc = desc.inspectAccountsTrie(persistent=false) + check rc.isOk + let + dangling = rc.value.dangling + keys = desc.getAcc.hexaryInspectToKeys( + rootKey, dangling.toHashSet.toSeq) + check dangling.len == keys.len + singleStats.add (desc.getAcc.tab.len,rc.value) + + test &"Fingerprinting {inspectList.len} single accounts lists " & + "for persistent db": + if not persistent: + skip() + else: + for n,accList in inspectList: + if nTestDbInstances <= 2+n or db.cdb[2+n].rocksStoreRef.isNil: + continue + # Separate storage on persistent DB (leaving first db slot empty) + let + root = accList[0].root + rootKey = root.to(NodeKey) + dbBase = AccountsDbRef.init(db.cdb[2+n]) + desc = AccountsDbSessionRef.init(dbBase, root, peer) + for w in accList: + check desc.importAccounts(w.base, w.data, persistent) == OkHexDb + let rc = desc.inspectAccountsTrie(persistent=false) + check rc.isOk + let + dangling = rc.value.dangling + keys = desc.getAcc.hexaryInspectToKeys( + rootKey, dangling.toHashSet.toSeq) + check dangling.len == keys.len + # Must be the same as the in-memory fingerprint + check singleStats[n][1] == rc.value + + test &"Fingerprinting {inspectList.len} accumulated accounts lists " & + "for in-memory-db": + for n,accList in inspectList: + # Accumulated storage + let + root = accList[0].root + rootKey = root.to(NodeKey) + desc = memDesc.dup(root,Peer()) + for w in accList: + check desc.importAccounts(w.base, w.data, persistent=false) == OkHexDb + let rc = desc.inspectAccountsTrie(persistent=false) + check rc.isOk + let + dangling = rc.value.dangling + keys = desc.getAcc.hexaryInspectToKeys( + rootKey, dangling.toHashSet.toSeq) + check dangling.len == keys.len + accuStats.add (desc.getAcc.tab.len,rc.value) + + test &"Fingerprinting {inspectList.len} accumulated accounts lists " & + "for persistent db": + if not persistent: + skip() + else: + for n,accList in inspectList: + # Accumulated storage on persistent DB (using first db slot) + let + root = accList[0].root + rootKey = root.to(NodeKey) + rootSet = [rootKey].toHashSet + desc = perDesc.dup(root,Peer()) + for w in accList: + check desc.importAccounts(w.base, w.data, persistent) == OkHexDb + let rc = desc.inspectAccountsTrie(persistent=false) + check rc.isOk + let + dangling = rc.value.dangling + keys = desc.getAcc.hexaryInspectToKeys( + rootKey, dangling.toHashSet.toSeq) + check dangling.len == keys.len + check accuStats[n][1] == rc.value + + test &"Cascaded fingerprinting {inspectList.len} accumulated accounts " & + "lists for in-memory-db": + if not cascaded: + skip() + else: + let + cscBase = AccountsDbRef.init(newMemoryDB()) + cscDesc = AccountsDbSessionRef.init(cscBase, Hash256(), peer) + var + cscStep: Table[NodeKey,(int,seq[Blob])] + for n,accList in inspectList: + # Accumulated storage + let + root = accList[0].root + rootKey = 
root.to(NodeKey) + desc = cscDesc.dup(root,Peer()) + for w in accList: + check desc.importAccounts(w.base,w.data,persistent=false) == OkHexDb + if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)): + cscStep[rootKey][0].inc + let + r0 = desc.inspectAccountsTrie(persistent=false) + rc = desc.inspectAccountsTrie(cscStep[rootKey][1],false) + check rc.isOk + let + accumulated = r0.value.dangling.toHashSet + cascaded = rc.value.dangling.toHashSet + check accumulated == cascaded + # Make sure that there are no trivial cases + let trivialCases = toSeq(cscStep.values).filterIt(it[0] <= 1).len + check trivialCases == 0 + + test &"Cascaded fingerprinting {inspectList.len} accumulated accounts " & + "for persistent db": + if not cascaded or not persistent: + skip() + else: + let + cscBase = altBase + cscDesc = altDesc + var + cscStep: Table[NodeKey,(int,seq[Blob])] + for n,accList in inspectList: + # Accumulated storage + let + root = accList[0].root + rootKey = root.to(NodeKey) + desc = cscDesc.dup(root,Peer()) + for w in accList: + check desc.importAccounts(w.base,w.data,persistent) == OkHexDb + if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)): + cscStep[rootKey][0].inc + let + r0 = desc.inspectAccountsTrie(persistent=true) + rc = desc.inspectAccountsTrie(cscStep[rootKey][1],true) + check rc.isOk + let + accumulated = r0.value.dangling.toHashSet + cascaded = rc.value.dangling.toHashSet + check accumulated == cascaded + # Make sure that there are no trivial cases + let trivialCases = toSeq(cscStep.values).filterIt(it[0] <= 1).len + check trivialCases == 0 # ------------------------------------------------------------------------------ # Test Runners: database timing tests @@ -547,6 +741,10 @@ proc storeRunner(noisy = true; persistent = true; cleanUp = true) = defer: if xDbs.persistent and cleanUp: + for n in 0 ..< nTestDbInstances: + if xDbs.cdb[n].rocksStoreRef.isNil: + break + xDbs.cdb[n].rocksStoreRef.store.db.rocksdb_close xTmpDir.flushDbDir("store-runner") xDbs.reset @@ -877,12 +1075,10 @@ proc storeRunner(noisy = true; persistent = true; cleanUp = true) = # ------------------------------------------------------------------------------ proc syncSnapMain*(noisy = defined(debug)) = - # Caveat: running `accountsRunner(persistent=true)` twice will crash as the - # persistent database might not be fully cleared due to some stale - # locks. 
noisy.accountsRunner(persistent=true) - noisy.accountsRunner(persistent=false) + #noisy.accountsRunner(persistent=false) # problems unless running stand-alone noisy.importRunner() # small sample, just verify functionality + noisy.inspectionRunner() noisy.storeRunner() when isMainModule: @@ -941,15 +1137,17 @@ when isMainModule: # # This one uses dumps from the external `nimbus-eth1-blob` repo - when true and false: + when true: # and false: import ./test_sync_snap/snap_other_xx noisy.showElapsed("accountsRunner()"): for n,sam in snapOtherList: - if n in {999} or true: - false.accountsRunner(persistent=true, sam) + false.accountsRunner(persistent=true, sam) + noisy.showElapsed("inspectRunner()"): + for n,sam in snapOtherHealingList: + false.inspectionRunner(persistent=true, cascaded=false, sam) # This one usues dumps from the external `nimbus-eth1-blob` repo - when true and false: + when true: # and false: import ./test_sync_snap/snap_storage_xx let knownFailures = @[ ("storages3__18__25_dump#11", @[( 233, RightBoundaryProofFailed)]), @@ -960,15 +1158,14 @@ when isMainModule: ] noisy.showElapsed("storageRunner()"): for n,sam in snapStorageList: - if n in {999} or true: - false.storagesRunner(persistent=true, sam, knownFailures) - #if true: quit() + false.storagesRunner(persistent=true, sam, knownFailures) # This one uses readily available dumps when true: # and false: - for n,sam in snapTestList: + false.inspectionRunner() + for sam in snapTestList: false.accountsRunner(persistent=true, sam) - for n,sam in snapTestStorageList: + for sam in snapTestStorageList: false.accountsRunner(persistent=true, sam) false.storagesRunner(persistent=true, sam) diff --git a/tests/test_sync_snap/snap_other_xx.nim b/tests/test_sync_snap/snap_other_xx.nim index 5fee342a5..87d6edc3e 100644 --- a/tests/test_sync_snap/snap_other_xx.nim +++ b/tests/test_sync_snap/snap_other_xx.nim @@ -72,4 +72,62 @@ const snapOther0a, snapOther0b, snapOther1a, snapOther1b, snapOther2, snapOther3, snapOther4, snapOther5, snapOther6] + # + # + # + # + # 0b 7..8 346637e390dce1941c8f8c7bf21adb33cefc198c26bc1964ebf8507471e89000 + # 0000000000000000000000000000000000000000000000000000000000000000 + # 09e8d852bc952f53343967d775b55a7a626ce6f02c828f4b0d4509b790aee55b + # + # 1b 10..17 979c81bf60286f195c9b69d0bf3c6e4b3939389702ed767d55230fe5db57b8f7 + # 0000000000000000000000000000000000000000000000000000000000000000 + # 44fc2f4f885e7110bcba5534e9dce2bc59261e1b6ceac2206f5d356575d58d6a + # + # 2 18..25 93353de9894e0eac48bfe0b0023488379aff8ffd4b6e96e0c2c51f395363c7fb + # 024043dc9f47e85f13267584b6098d37e1f8884672423e5f2b86fe4cc606c9d7 + # 473c70d158603819829a2d637edd5fa8e8f05720d9895e5e87450b6b19d81239 + # + # 4 34..41 d6feef8f3472c5288a5a99409bc0cddbb697637644266a9c8b2e134806ca0fc8 + # 2452fe42091c1f12adfe4ea768e47fe8d7b2494a552122470c89cb4c759fe614 + # 6958f4d824c2b679ad673cc3f373bb6c431e8941d027ed4a1c699925ccc31ea5 + # + # 3 26..33 14d70751ba7fd40303a054c284bca4ef2f63a8e4e1973da90371dffc666bde32 + # 387bb75a840d46baa37a6d723d3b1de78f6a0a41d6094c47ee1dad16533b829e + # 7d77e87f695f4244ff8cd4cbfc750003080578f9f51eac3ab3e50df1a7c088c4 + # + # 6 50..54 11eba9ec2f204c8165a245f9d05bb7ebb5bfdbdbcccc1a849d8ab2b23550cc12 + # 74e30f84b7d6532cf3aeec8931fe6f7ef13d5bad90ebaae451d1f78c4ee41412 + # 9c5f3f14c3b3a6eb4d2201b3bf15cf15554d44ba49d8230a7c8a1709660ca2ef + # + # 5 42..49 f75477bd57be4883875042577bf6caab1bd7f8517f0ce3532d813e043ec9f5d0 + # a04344c35a42386857589e92428b49b96cd0319a315b81bff5c7ae93151b5057 + # 
e549721af6484420635f0336d90d2d0226ba9bbd599310ae76916b725980bd85 + # + # 1a 9 979c81bf60286f195c9b69d0bf3c6e4b3939389702ed767d55230fe5db57b8f7 + # fa261d159a47f908d499271fcf976b71244b260ca189f709b8b592d18c098b60 + # fa361ef07b5b6cc719347b8d9db35e08986a575b0eca8701caf778f01a08640a + # + # 0a 0..6 346637e390dce1941c8f8c7bf21adb33cefc198c26bc1964ebf8507471e89000 + # bf75c492276113636daa8cdd8b27ca5283e26965fbdc2568633480b6b104cd77 + # fa99c0467106abe1ed33bd2b6acc1582b09e43d28308d04663d1ef9532e57c6e + # + # ------------------------ + + #0 0..6 346637e390dce1941c8f8c7bf21adb33cefc198c26bc1964ebf8507471e89000 + #0 7..8 346637e390dce1941c8f8c7bf21adb33cefc198c26bc1964ebf8507471e89000 + #1 9 979c81bf60286f195c9b69d0bf3c6e4b3939389702ed767d55230fe5db57b8f7 + #1 10..17 979c81bf60286f195c9b69d0bf3c6e4b3939389702ed767d55230fe5db57b8f7 + #2 18..25 93353de9894e0eac48bfe0b0023488379aff8ffd4b6e96e0c2c51f395363c7fb + #3 26..33 14d70751ba7fd40303a054c284bca4ef2f63a8e4e1973da90371dffc666bde32 + #4 34..41 d6feef8f3472c5288a5a99409bc0cddbb697637644266a9c8b2e134806ca0fc8 + #5 42..49 f75477bd57be4883875042577bf6caab1bd7f8517f0ce3532d813e043ec9f5d0 + #6 50..54 11eba9ec2f204c8165a245f9d05bb7ebb5bfdbdbcccc1a849d8ab2b23550cc12 + + # ------------------------ + + snapOtherHealingList* = [ + @[snapOther0b, snapOther2, snapOther4], + @[snapOther0a, snapOther1a, snapOther5]] + # End
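
A note on the inspection pattern exercised by the new `inspectionRunner` tests above: each case imports partial account ranges via `importAccounts()`, runs `inspectAccountsTrie()`, and checks the `dangling` node references reported back (converted to node keys with `hexaryInspectToKeys()`), comparing the in-memory, persistent and cascaded runs against each other. The snippet below is only a rough, self-contained Nim sketch of what such a scan amounts to; `NodeKey`, `Node` and `danglingRefs` are invented stand-ins for illustration and do not correspond to the `TrieNodeStat`/`hexary_inspect` API used by the tests.

import std/[sets, tables]

type
  NodeKey = string                      # stand-in for a 32 byte trie node key
  Node = object
    children: seq[NodeKey]              # references held by a stored node

proc danglingRefs(tab: Table[NodeKey,Node]; root: NodeKey): HashSet[NodeKey] =
  ## Walk the stored nodes from `root`: every referenced key without an
  ## entry in `tab` is a dangling link, i.e. a candidate for healing.
  var
    todo = @[root]
    seen: HashSet[NodeKey]
  while 0 < todo.len:
    let key = todo.pop
    if key in seen:
      continue
    seen.incl key
    if key notin tab:
      result.incl key                   # referenced but not stored => dangling
    else:
      for child in tab[key].children:
        todo.add child

when isMainModule:
  var tab: Table[NodeKey,Node]
  tab["root"] = Node(children: @["aa", "bb"])
  tab["aa"] = Node(children: @[])
  doAssert danglingRefs(tab, "root") == ["bb"].toHashSet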