diff --git a/nimbus/sync/handlers.nim b/nimbus/sync/handlers.nim index c87b47694..6366dfd36 100644 --- a/nimbus/sync/handlers.nim +++ b/nimbus/sync/handlers.nim @@ -3,6 +3,7 @@ import eth/[common, p2p, trie/db], ./types, ./protocol/eth/eth_types, + ./protocol/trace_config, # gossip noise control ../db/db_chain, ../p2p/chain, ../utils/tx_pool @@ -12,6 +13,7 @@ type db: BaseChainDB chain: Chain txPool: TxPoolRef + disablePool: bool proc new*(_: type EthWireRef, chain: Chain, txPool: TxPoolRef): EthWireRef = EthWireRef( @@ -20,9 +22,15 @@ proc new*(_: type EthWireRef, chain: Chain, txPool: TxPoolRef): EthWireRef = txPool: txPool ) +proc notEnabled(name: string) = + debug "Wire handler method is disabled", meth = name + proc notImplemented(name: string) = debug "Wire handler method not implemented", meth = name +method poolEnabled*(ctx: EthWireRef; ena: bool) = + ctx.disablePool = not ena + method getStatus*(ctx: EthWireRef): EthState {.gcsafe.} = let db = ctx.db @@ -113,10 +121,15 @@ method getBlockHeaders*(ctx: EthWireRef, req: BlocksRequest): seq[BlockHeader] { result.add foundBlock method handleAnnouncedTxs*(ctx: EthWireRef, peer: Peer, txs: openArray[Transaction]) {.gcsafe.} = - ctx.txPool.jobAddTxs(txs) + if ctx.disablePool: + when trMissingOrDisabledGossipOk: + notEnabled("handleAnnouncedTxs") + else: + ctx.txPool.jobAddTxs(txs) method handleAnnouncedTxsHashes*(ctx: EthWireRef, peer: Peer, txHashes: openArray[Hash256]) {.gcsafe.} = - notImplemented("handleAnnouncedTxsHashes") + when trMissingOrDisabledGossipOk: + notImplemented("handleAnnouncedTxsHashes") method handleNewBlock*(ctx: EthWireRef, peer: Peer, blk: EthBlock, totalDifficulty: DifficultyInt) {.gcsafe.} = notImplemented("handleNewBlock") diff --git a/nimbus/sync/protocol/trace_config.nim b/nimbus/sync/protocol/trace_config.nim index e1ebcd4ea..db38ce1bf 100644 --- a/nimbus/sync/protocol/trace_config.nim +++ b/nimbus/sync/protocol/trace_config.nim @@ -13,7 +13,7 @@ const # Some static noisy settings for `eth` debugging trEthTracePacketsOk* = true ## `trace` log each sync network message. - trEthTraceGossipOk* = true + trEthTraceGossipOk* = true and false ## `trace` log each sync network message. trEthTraceHandshakesOk* = true ## `trace` log each network handshake message. @@ -24,7 +24,8 @@ const trSnapTracePacketsOk* = true ## `trace` log each sync network message. -# The files and lines clutter differently when sync tracing is enabled. -# publicLogScope: chroniclesLineNumbers=false + # Shut up particular eth context handler gossip + trMissingOrDisabledGossipOk* = true and false + ## Control `handleAnnouncedTxsHashes`, `handleAnnouncedTxsHashes`, etc. # End diff --git a/nimbus/sync/snap.nim b/nimbus/sync/snap.nim index 4b88e4695..35c4eb020 100644 --- a/nimbus/sync/snap.nim +++ b/nimbus/sync/snap.nim @@ -15,7 +15,7 @@ import ../db/select_backend, ../p2p/chain, ./snap/[worker, worker_desc], - "."/[sync_desc, sync_sched, protocol] + "."/[protocol, sync_desc, sync_sched] {.push raises: [Defect].} @@ -67,6 +67,8 @@ proc init*( result.ctx.chain = chain # explicitely override result.ctx.data.rng = rng result.ctx.data.dbBackend = dbBackend + # Required to have been initialised via `addCapability()` + doAssert not result.ctx.ethWireCtx.isNil proc start*(ctx: SnapSyncRef) = doAssert ctx.startSync() diff --git a/nimbus/sync/snap/range_desc.nim b/nimbus/sync/snap/range_desc.nim index 71019c90b..ed8ca13d5 100644 --- a/nimbus/sync/snap/range_desc.nim +++ b/nimbus/sync/snap/range_desc.nim @@ -23,14 +23,13 @@ type ByteArray32* = array[32,byte] ## Used for 32 byte database keys - NodeTag* = ##\ - ## Trie leaf item, account hash etc. - distinct UInt256 - NodeKey* = distinct ByteArray32 ## Hash key without the hash wrapper (as opposed to `NodeTag` which is a ## number) + NodeTag* = distinct UInt256 + ## Trie leaf item, account hash etc. + NodeTagRange* = Interval[NodeTag,UInt256] ## Interval `[minPt,maxPt]` of` NodeTag` elements, can be managed in an ## `IntervalSet` data type. @@ -93,6 +92,10 @@ proc to*(hash: Hash256; T: type NodeKey): T = ## Syntactic sugar hash.data.NodeKey +proc to*(key: NodeKey; T: type Hash256): T = + ## Syntactic sugar + T(data: key.ByteArray32) + proc to*(key: NodeKey; T: type Blob): T = ## Syntactic sugar key.ByteArray32.toSeq @@ -101,6 +104,15 @@ proc to*(n: SomeUnsignedInt|UInt256; T: type NodeTag): T = ## Syntactic sugar n.u256.T + +proc hash*(a: NodeKey): Hash = + ## Table/KeyedQueue mixin + a.ByteArray32.hash + +proc `==`*(a, b: NodeKey): bool = + ## Table/KeyedQueue mixin + a.ByteArray32 == b.ByteArray32 + # ------------------------------------------------------------------------------ # Public constructors # ------------------------------------------------------------------------------ @@ -194,7 +206,7 @@ proc emptyFactor*(lrs: openArray[NodeTagRangeSet]): float = ## Variant of `emptyFactor()` where intervals are distributed across several ## sets. This function makes sense only if the interval sets are mutually ## disjunct. - var accu: Nodetag + var accu: NodeTag for ivSet in lrs: if 0 < ivSet.total: if high(NodeTag) - ivSet.total < accu: @@ -208,6 +220,7 @@ proc emptyFactor*(lrs: openArray[NodeTagRangeSet]): float = return 1.0 ((high(NodeTag) - accu).u256 + 1).to(float) / (2.0^256) + proc fullFactor*(lrs: NodeTagRangeSet): float = ## Relative covered total, i.e. `#points-covered / 2^256` to be used ## in statistics or triggers @@ -218,6 +231,24 @@ proc fullFactor*(lrs: NodeTagRangeSet): float = else: 1.0 # number of points in `lrs` is `2^256 + 1` +proc fullFactor*(lrs: openArray[NodeTagRangeSet]): float = + ## Variant of `fullFactor()` where intervals are distributed across several + ## sets. This function makes sense only if the interval sets are mutually + ## disjunct. + var accu: NodeTag + for ivSet in lrs: + if 0 < ivSet.total: + if high(NodeTag) - ivSet.total < accu: + return 1.0 + accu = accu + ivSet.total + elif ivSet.chunks == 0: + discard + else: # number of points in `ivSet` is `2^256 + 1` + return 1.0 + if accu == 0.to(NodeTag): + return 0.0 + accu.u256.to(float) / (2.0^256) + # ------------------------------------------------------------------------------ # Public functions: printing & pretty printing # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 97a6a3b78..5f5d145d5 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -9,14 +9,15 @@ # except according to those terms. import - std/[hashes, math, options, sets], + std/[hashes, math, options, sets, strutils], chronicles, chronos, eth/[common/eth_types, p2p], stew/[interval_set, keyed_queue], ../../db/select_backend, - ".."/[protocol, sync_desc], - ./worker/[heal_accounts, store_accounts, store_storages, ticker], + ".."/[handlers, protocol, sync_desc], + ./worker/[heal_accounts, heal_storages, store_accounts, store_storages, + ticker], ./worker/com/[com_error, get_block_header], ./worker/db/snapdb_desc, "."/[range_desc, worker_desc] @@ -167,14 +168,6 @@ proc appendPivotEnv(buddy: SnapBuddyRef; header: BlockHeader) = # Append per-state root environment to LRU queue discard ctx.data.pivotTable.lruAppend(header.stateRoot, env, ctx.buddiesMax) - # Debugging, will go away - block: - let ivSet = env.fetchAccounts.unprocessed[0].clone - for iv in env.fetchAccounts.unprocessed[1].increasing: - doAssert ivSet.merge(iv) == iv.len - doAssert ivSet.chunks == 1 - doAssert ivSet.total == 0 - proc updatePivotImpl(buddy: SnapBuddyRef): Future[bool] {.async.} = ## Helper, negotiate pivot unless present @@ -233,7 +226,7 @@ else: proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = result = proc: TickerStats = var - aSum, aSqSum, uSum, uSqSum, sSum, sSqSum: float + aSum, aSqSum, uSum, uSqSum, sSum, sSqSum, wSum, wSqSum: float count = 0 for kvp in ctx.data.pivotTable.nextPairs: @@ -253,6 +246,16 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = sSum += sLen sSqSum += sLen * sLen + # Storage queue size for that account + var stoFill: float + for stoKvp in kvp.data.fetchStorage.nextPairs: + if stoKvp.data.slots.isNil: + stoFill += 1.0 + else: + stoFill += stoKvp.data.slots.unprocessed.fullFactor + wSum += stoFill + wSqSum += stoFill * stoFill + let env = ctx.data.pivotTable.lastValue.get(otherwise = nil) pivotBlock = if env.isNil: none(BlockNumber) @@ -265,7 +268,8 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = nQueues: ctx.data.pivotTable.len, nAccounts: meanStdDev(aSum, aSqSum, count), nStorage: meanStdDev(sSum, sSqSum, count), - accountsFill: (accFill[0], accFill[1], accCoverage)) + accountsFill: (accFill[0], accFill[1], accCoverage), + storageQueue: meanStdDev(wSum, wSqSum, count)) # ------------------------------------------------------------------------------ # Public start/stop and admin functions @@ -273,6 +277,8 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool = ## Global set up + noExceptionOops("worker.setup()"): + ctx.ethWireCtx.poolEnabled(false) ctx.data.coveredAccounts = NodeTagRangeSet.init() ctx.data.snapDb = if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db) @@ -412,18 +418,18 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = await buddy.storeStorages() if buddy.ctrl.stopped: return + # Pivot might have changed, so restart with the latest one + if env != ctx.data.pivotTable.lastValue.value: return + # If the current database is not complete yet if 0 < env.fetchAccounts.unprocessed[0].chunks or 0 < env.fetchAccounts.unprocessed[1].chunks: - # Healing applies to the latest pivot only. The pivot might have changed - # in the background (while netwoking) due to a new peer worker that has - # negotiated another, newer pivot. - if env == ctx.data.pivotTable.lastValue.value: - await buddy.healAccountsDb() - if buddy.ctrl.stopped: return + await buddy.healAccountsDb() + if buddy.ctrl.stopped: return - # TODO: use/apply storage healer + await buddy.healStoragesDb() + if buddy.ctrl.stopped: return # Check whether accounts might be complete. if env.fetchStorage.len == 0: diff --git a/nimbus/sync/snap/worker/db/hexary_desc.nim b/nimbus/sync/snap/worker/db/hexary_desc.nim index 0e94fafb7..c5fffd73e 100644 --- a/nimbus/sync/snap/worker/db/hexary_desc.nim +++ b/nimbus/sync/snap/worker/db/hexary_desc.nim @@ -146,9 +146,10 @@ type repairKeyGen*: uint64 ## Unique tmp key generator keyPp*: HexaryPpFn ## For debugging, might go away - HexaryGetFn* = proc(key: Blob): Blob {.gcsafe.} - ## Persistent database get() function. For read-only cacses, this function - ## can be seen as the persistent alternative to `HexaryTreeDbRef`. + HexaryGetFn* = proc(key: openArray[byte]): Blob {.gcsafe.} + ## Persistent database `get()` function. For read-only cases, this function + ## can be seen as the persistent alternative to ``tab[]` on a + ## `HexaryTreeDbRef` descriptor. HexaryNodeReport* = object ## Return code for single node operations @@ -357,18 +358,10 @@ proc newRepairKey*(db: HexaryTreeDbRef): RepairKey = # Public functions # ------------------------------------------------------------------------------ -proc hash*(a: NodeKey): Hash = - ## Tables mixin - a.ByteArray32.hash - proc hash*(a: RepairKey): Hash = ## Tables mixin a.ByteArray33.hash -proc `==`*(a, b: NodeKey): bool = - ## Tables mixin - a.ByteArray32 == b.ByteArray32 - proc `==`*(a, b: RepairKey): bool = ## Tables mixin a.ByteArray33 == b.ByteArray33 diff --git a/nimbus/sync/snap/worker/db/snapdb_accounts.nim b/nimbus/sync/snap/worker/db/snapdb_accounts.nim index 764b324fc..19f4781e8 100644 --- a/nimbus/sync/snap/worker/db/snapdb_accounts.nim +++ b/nimbus/sync/snap/worker/db/snapdb_accounts.nim @@ -24,6 +24,7 @@ logScope: type SnapDbAccountsRef* = ref object of SnapDbBaseRef + peer: Peer ## For log messages getFn: HexaryGetFn ## Persistent database `get()` closure const @@ -140,8 +141,9 @@ proc init*( ## Constructor, starts a new accounts session. let db = pv.kvDb new result - result.init(pv, root.to(NodeKey), peer) - result.getFn = proc(key: Blob): Blob = db.get(key) + result.init(pv, root.to(NodeKey)) + result.peer = peer + result.getFn = proc(key: openArray[byte]): Blob = db.get(key) proc dup*( ps: SnapDbAccountsRef; @@ -182,7 +184,7 @@ proc importAccounts*( var accounts: seq[RLeafSpecs] try: if 0 < data.proof.len: - let rc = ps.mergeProofs(ps.root, data.proof) + let rc = ps.mergeProofs(ps.peer, ps.root, data.proof) if rc.isErr: return err(rc.error) block: @@ -227,7 +229,7 @@ proc importAccounts*( pv, root, peer).importAccounts(base, data, persistent=true) -proc importRawAccountNodes*( +proc importRawAccountsNodes*( ps: SnapDbAccountsRef; ## Re-usable session descriptor nodes: openArray[Blob]; ## Node records reportNodes = {Leaf}; ## Additional node types to report @@ -293,7 +295,7 @@ proc importRawAccountNodes*( if nErrors == 0: trace "Raw account nodes imported", peer, slot, nItems, report=result.len -proc importRawAccountNodes*( +proc importRawAccountsNodes*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` peer: Peer, ## For log messages, only nodes: openArray[Blob]; ## Node records @@ -301,7 +303,7 @@ proc importRawAccountNodes*( ): seq[HexaryNodeReport] = ## Variant of `importRawNodes()` for persistent storage. SnapDbAccountsRef.init( - pv, Hash256(), peer).importRawAccountNodes( + pv, Hash256(), peer).importRawAccountsNodes( nodes, reportNodes, persistent=true) @@ -334,6 +336,8 @@ proc inspectAccountsTrie*( break checkForError trace "Inspect account trie failed", peer, nPathList=pathList.len, nDangling=stats.dangling.len, stoppedAt=stats.level, error + if ignoreError: + return ok(stats) return err(error) when extraTraceMessages: @@ -353,7 +357,7 @@ proc inspectAccountsTrie*( pv, root, peer).inspectAccountsTrie(pathList, persistent=true, ignoreError) -proc getAccountNodeKey*( +proc getAccountsNodeKey*( ps: SnapDbAccountsRef; ## Re-usable session descriptor path: Blob; ## Partial node path persistent = false; ## Read data from disk @@ -369,18 +373,18 @@ proc getAccountNodeKey*( return ok(rc.value) err(NodeNotFound) -proc getAccountNodeKey*( +proc getAccountsNodeKey*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` peer: Peer; ## For log messages, only root: Hash256; ## state root path: Blob; ## Partial node path ): Result[NodeKey,HexaryDbError] = - ## Variant of `inspectAccountsPath()` for persistent storage. + ## Variant of `getAccountsNodeKey()` for persistent storage. SnapDbAccountsRef.init( - pv, root, peer).getAccountNodeKey(path, persistent=true) + pv, root, peer).getAccountsNodeKey(path, persistent=true) -proc getAccountData*( +proc getAccountsData*( ps: SnapDbAccountsRef; ## Re-usable session descriptor path: NodeKey; ## Account to visit persistent = false; ## Read data from disk @@ -404,14 +408,15 @@ proc getAccountData*( return ok(acc) -proc getAccountData*( +proc getAccountsData*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer, ## For log messages, only - root: Hash256; ## state root + peer: Peer; ## For log messages, only + root: Hash256; ## State root path: NodeKey; ## Account to visit ): Result[Account,HexaryDbError] = - ## Variant of `getAccount()` for persistent storage. - SnapDbAccountsRef.init(pv, root, peer).getAccountData(path, persistent=true) + ## Variant of `getAccountsData()` for persistent storage. + SnapDbAccountsRef.init( + pv, root, peer).getAccountsData(path, persistent=true) # ------------------------------------------------------------------------------ # Public functions: additional helpers @@ -439,16 +444,16 @@ proc sortMerge*(acc: openArray[seq[PackedAccount]]): seq[PackedAccount] = accounts[item.accHash.to(NodeTag)] = item result = toSeq(accounts.keys).sorted(cmp).mapIt(accounts[it]) -proc getChainDbAccount*( +proc getAccountsChainDb*( ps: SnapDbAccountsRef; - accHash: Hash256 + accHash: Hash256; ): Result[Account,HexaryDbError] = ## Fetch account via `BaseChainDB` - ps.getAccountData(accHash.to(NodeKey),persistent=true) + ps.getAccountsData(accHash.to(NodeKey),persistent=true) -proc nextChainDbKey*( +proc nextAccountsChainDbKey*( ps: SnapDbAccountsRef; - accHash: Hash256 + accHash: Hash256; ): Result[Hash256,HexaryDbError] = ## Fetch the account path on the `BaseChainDB`, the one next to the ## argument account. @@ -462,9 +467,9 @@ proc nextChainDbKey*( err(AccountNotFound) -proc prevChainDbKey*( +proc prevAccountsChainDbKey*( ps: SnapDbAccountsRef; - accHash: Hash256 + accHash: Hash256; ): Result[Hash256,HexaryDbError] = ## Fetch the account path on the `BaseChainDB`, the one before to the ## argument account. diff --git a/nimbus/sync/snap/worker/db/snapdb_desc.nim b/nimbus/sync/snap/worker/db/snapdb_desc.nim index 707f6406f..6ad864697 100644 --- a/nimbus/sync/snap/worker/db/snapdb_desc.nim +++ b/nimbus/sync/snap/worker/db/snapdb_desc.nim @@ -33,7 +33,6 @@ type ## Session descriptor xDb: HexaryTreeDbRef ## Hexary database base: SnapDbRef ## Back reference to common parameters - peer*: Peer ## For log messages root*: NodeKey ## Session DB root node key # ------------------------------------------------------------------------------ @@ -126,7 +125,6 @@ proc init*( peer: Peer = nil) = ## Session base constructor ps.base = pv - ps.peer = peer ps.root = root ps.xDb = HexaryTreeDbRef.init(pv) @@ -137,7 +135,7 @@ proc init*( peer: Peer = nil): T = ## Variant of session base constructor new result - result.init(ps.base, root, peer) + result.init(ps.base, root) # ------------------------------------------------------------------------------ # Public getters @@ -173,6 +171,7 @@ proc dbBackendRocksDb*(ps: SnapDbBaseRef): bool = proc mergeProofs*( ps: SnapDbBaseRef; ## Session database + peer: Peer; ## For log messages root: NodeKey; ## Root for checking nodes proof: seq[Blob]; ## Node records freeStandingOk = false; ## Remove freestanding nodes @@ -183,7 +182,6 @@ proc mergeProofs*( ## trie at a later stage and used for validating account data. let db = ps.hexaDb - peer = ps.peer var nodes: HashSet[RepairKey] refs = @[root.to(RepairKey)].toHashSet diff --git a/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim b/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim index 31daf0431..133e1af05 100644 --- a/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim +++ b/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim @@ -9,12 +9,13 @@ # except according to those terms. import - std/[tables], + std/tables, chronicles, - eth/[common/eth_types, p2p], + eth/[common, p2p, rlp, trie/db], ../../../protocol, ../../range_desc, - "."/[bulk_storage, hexary_desc, hexary_error, hexary_interpolate, snapdb_desc] + "."/[bulk_storage, hexary_desc, hexary_error, hexary_import, hexary_inspect, + hexary_interpolate, hexary_paths, snapdb_desc] {.push raises: [Defect].} @@ -25,8 +26,13 @@ const extraTraceMessages = false or true type + GetAccFn = proc(accHash: Hash256, key: openArray[byte]): Blob {.gcsafe.} + ## The `get()` function for the storage trie depends on the current account + SnapDbStorageSlotsRef* = ref object of SnapDbBaseRef - accHash*: Hash256 ## Accounts address hash (curr.unused) + peer: Peer ## For log messages + accHash: Hash256 ## Accounts address hash (curr.unused) + getAccFn: GetAccFn ## Persistent database `get()` closure # ------------------------------------------------------------------------------ # Private helpers @@ -38,6 +44,11 @@ proc to(h: Hash256; T: type NodeKey): T = proc convertTo(data: openArray[byte]; T: type Hash256): T = discard result.data.NodeKey.init(data) # size error => zero +proc getAccCls(ps: SnapDbStorageSlotsRef; accHash: Hash256): HexaryGetFn = + ## Fix `accHash` argument in `GetAccFn` closure => `HexaryGetFn` + result = proc(key: openArray[byte]): Blob = ps.getAccFn(accHash,key) + + template noKeyError(info: static[string]; code: untyped) = try: code @@ -56,11 +67,21 @@ template noRlpExceptionOops(info: static[string]; code: untyped) = except Exception as e: raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg +template noGenericExOrKeyError(info: static[string]; code: untyped) = + try: + code + except KeyError as e: + raiseAssert "Not possible (" & info & "): " & e.msg + except Defect as e: + raise e + except Exception as e: + raiseAssert "Ooops " & info & ": name=" & $e.name & " msg=" & e.msg + # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc persistentStorages( +proc persistentStorageSlots( db: HexaryTreeDbRef; ## Current table ps: SnapDbStorageSlotsRef; ## For persistent database ): Result[void,HexaryDbError] @@ -116,7 +137,7 @@ proc importStorageSlots( var slots: seq[RLeafSpecs] if 0 < proof.len: - let rc = tmpDb.mergeProofs(root, proof) + let rc = tmpDb.mergeProofs(ps.peer, root, proof) if rc.isErr: return err(rc.error) block: @@ -145,20 +166,27 @@ proc importStorageSlots( proc init*( T: type SnapDbStorageSlotsRef; pv: SnapDbRef; - account = Hash256(); - root = Hash256(); + account: Hash256; + root: Hash256; peer: Peer = nil ): T = ## Constructor, starts a new accounts session. + let db = pv.kvDb + new result - result.init(pv, root.to(NodeKey), peer) + result.init(pv, root.to(NodeKey)) + result.peer = peer result.accHash = account + # At the moment, the resulting `getAccFn()` is independent of `accHash` + result.getAccFn = proc(accHash: Hash256, key: openArray[byte]): Blob = + db.get(key) + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ -proc importStorages*( +proc importStorageSlots*( ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor data: AccountStorageRange; ## Account storage reply from `snap/1` protocol persistent = false; ## store data on disk @@ -210,7 +238,7 @@ proc importStorages*( # Store to disk if persistent and 0 < ps.hexaDb.tab.len: slot = none(int) - let rc = ps.hexaDb.persistentStorages(ps) + let rc = ps.hexaDb.persistentStorageSlots(ps) if rc.isErr: result.add HexaryNodeReport(slot: slot, error: rc.error) @@ -231,14 +259,236 @@ proc importStorages*( trace "Storage slots imported", peer, nItems, slots=data.storages.len, proofs=data.proof.len -proc importStorages*( +proc importStorageSlots*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` - peer: Peer, ## For log messages, only + peer: Peer; ## For log messages, only data: AccountStorageRange; ## Account storage reply from `snap/1` protocol ): seq[HexaryNodeReport] = ## Variant of `importStorages()` SnapDbStorageSlotsRef.init( - pv, peer=peer).importStorages(data, persistent=true) + pv, Hash256(), Hash256(), peer).importStorageSlots(data, persistent=true) + + +proc importRawStorageSlotsNodes*( + ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor + nodes: openArray[Blob]; ## Node records + reportNodes = {Leaf}; ## Additional node types to report + persistent = false; ## store data on disk + ): seq[HexaryNodeReport] = + ## Store data nodes given as argument `nodes` on the persistent database. + ## + ## If there were an error when processing a particular argument `notes` item, + ## it will be reported with the return value providing argument slot/index, + ## node type, end error code. + ## + ## If there was an error soring persistent data, the last report item will + ## have an error code, only. + ## + ## Additional node items might be reported if the node type is in the + ## argument set `reportNodes`. These reported items will have no error + ## code set (i.e. `NothingSerious`.) + ## + let + peer = ps.peer + db = HexaryTreeDbRef.init(ps) + nItems = nodes.len + var + nErrors = 0 + slot: Option[int] + try: + # Import nodes + for n,rec in nodes: + if 0 < rec.len: # otherwise ignore empty placeholder + slot = some(n) + var rep = db.hexaryImport(rec) + if rep.error != NothingSerious: + rep.slot = slot + result.add rep + nErrors.inc + trace "Error importing storage slots nodes", peer, inx=n, nItems, + error=rep.error, nErrors + elif rep.kind.isSome and rep.kind.unsafeGet in reportNodes: + rep.slot = slot + result.add rep + + # Store to disk + if persistent and 0 < db.tab.len: + slot = none(int) + let rc = db.persistentStorageSlots(ps) + if rc.isErr: + result.add HexaryNodeReport(slot: slot, error: rc.error) + + except RlpError: + result.add HexaryNodeReport(slot: slot, error: RlpEncoding) + nErrors.inc + trace "Error importing storage slots nodes", peer, slot, nItems, + error=RlpEncoding, nErrors + except KeyError as e: + raiseAssert "Not possible @ importRawSorageSlotsNodes: " & e.msg + except OSError as e: + result.add HexaryNodeReport(slot: slot, error: OSErrorException) + nErrors.inc + trace "Import storage slots nodes exception", peer, slot, nItems, + name=($e.name), msg=e.msg, nErrors + + when extraTraceMessages: + if nErrors == 0: + trace "Raw storage slots nodes imported", peer, slot, nItems, + report=result.len + +proc importRawStorageSlotsNodes*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer, ## For log messages, only + accHash: Hash256; ## Account key + nodes: openArray[Blob]; ## Node records + reportNodes = {Leaf}; ## Additional node types to report + ): seq[HexaryNodeReport] = + ## Variant of `importRawNodes()` for persistent storage. + SnapDbStorageSlotsRef.init( + pv, accHash, Hash256(), peer).importRawStorageSlotsNodes( + nodes, reportNodes, persistent=true) + + +proc inspectStorageSlotsTrie*( + ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor + pathList = seq[Blob].default; ## Starting nodes for search + persistent = false; ## Read data from disk + ignoreError = false; ## Always return partial results if available + ): Result[TrieNodeStat, HexaryDbError] = + ## Starting with the argument list `pathSet`, find all the non-leaf nodes in + ## the hexary trie which have at least one node key reference missing in + ## the trie database. Argument `pathSet` list entries that do not refer to a + ## valid node are silently ignored. + ## + let peer = ps.peer + var stats: TrieNodeStat + noRlpExceptionOops("inspectStorageSlotsTrie()"): + if persistent: + stats = ps.getAccCls(ps.accHash).hexaryInspectTrie(ps.root, pathList) + else: + stats = ps.hexaDb.hexaryInspectTrie(ps.root, pathList) + + block checkForError: + let error = block: + if stats.stopped: + TrieLoopAlert + elif stats.level == 0: + TrieIsEmpty + else: + break checkForError + trace "Inspect storage slots trie failed", peer, nPathList=pathList.len, + nDangling=stats.dangling.len, stoppedAt=stats.level, error + if ignoreError: + return ok(stats) + return err(error) + + when extraTraceMessages: + trace "Inspect storage slots trie ok", peer, nPathList=pathList.len, + nDangling=stats.dangling.len, level=stats.level + return ok(stats) + +proc inspectStorageSlotsTrie*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer; ## For log messages, only + accHash: Hash256; ## Account key + root: Hash256; ## state root + pathList = seq[Blob].default; ## Starting paths for search + ignoreError = false; ## Always return partial results when avail. + ): Result[TrieNodeStat, HexaryDbError] = + ## Variant of `inspectStorageSlotsTrieTrie()` for persistent storage. + SnapDbStorageSlotsRef.init( + pv, accHash, root, peer).inspectStorageSlotsTrie( + pathList, persistent=true, ignoreError) + + +proc getStorageSlotsNodeKey*( + ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor + path: Blob; ## Partial node path + persistent = false; ## Read data from disk + ): Result[NodeKey,HexaryDbError] = + ## For a partial node path argument `path`, return the raw node key. + var rc: Result[NodeKey,void] + noRlpExceptionOops("inspectAccountsPath()"): + if persistent: + rc = ps.getAccCls(ps.accHash).hexaryInspectPath(ps.root, path) + else: + rc = ps.hexaDb.hexaryInspectPath(ps.root, path) + if rc.isOk: + return ok(rc.value) + err(NodeNotFound) + +proc getStorageSlotsNodeKey*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer; ## For log messages, only + accHash: Hash256; ## Account key + root: Hash256; ## state root + path: Blob; ## Partial node path + ): Result[NodeKey,HexaryDbError] = + ## Variant of `getStorageSlotsNodeKey()` for persistent storage. + SnapDbStorageSlotsRef.init( + pv, accHash, root, peer).getStorageSlotsNodeKey(path, persistent=true) + + +proc getStorageSlotsData*( + ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor + path: NodeKey; ## Account to visit + persistent = false; ## Read data from disk + ): Result[Account,HexaryDbError] = + ## Fetch storage slots data. + ## + ## Caveat: There is no unit test yet + let peer = ps.peer + var acc: Account + + noRlpExceptionOops("getStorageSlotsData()"): + var leaf: Blob + if persistent: + leaf = path.hexaryPath(ps.root, ps.getAccCls(ps.accHash)).leafData + else: + leaf = path.hexaryPath(ps.root.to(RepairKey),ps.hexaDb).leafData + + if leaf.len == 0: + return err(AccountNotFound) + acc = rlp.decode(leaf,Account) + + return ok(acc) + +proc getStorageSlotsData*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer, ## For log messages, only + accHash: Hash256; ## Account key + root: Hash256; ## state root + path: NodeKey; ## Account to visit + ): Result[Account,HexaryDbError] = + ## Variant of `getStorageSlotsData()` for persistent storage. + SnapDbStorageSlotsRef.init( + pv, accHash, root, peer).getStorageSlotsData(path, persistent=true) + + +proc haveStorageSlotsData*( + ps: SnapDbStorageSlotsRef; ## Re-usable session descriptor + persistent = false; ## Read data from disk + ): bool = + ## Return `true` if there is at least one intermediate hexary node for this + ## accounts storage slots trie. + ## + ## Caveat: There is no unit test yet + noGenericExOrKeyError("haveStorageSlotsData()"): + if persistent: + let getFn = ps.getAccCls(ps.accHash) + return 0 < ps.root.ByteArray32.getFn().len + else: + return ps.hexaDb.tab.hasKey(ps.root.to(RepairKey)) + +proc haveStorageSlotsData*( + pv: SnapDbRef; ## Base descriptor on `BaseChainDB` + peer: Peer, ## For log messages, only + accHash: Hash256; ## Account key + root: Hash256; ## state root + ): bool = + ## Variant of `haveStorageSlotsData()` for persistent storage. + SnapDbStorageSlotsRef.init( + pv, accHash, root, peer).haveStorageSlotsData(persistent=true) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/heal_accounts.nim b/nimbus/sync/snap/worker/heal_accounts.nim index a4fc9a54a..c458e6451 100644 --- a/nimbus/sync/snap/worker/heal_accounts.nim +++ b/nimbus/sync/snap/worker/heal_accounts.nim @@ -110,7 +110,7 @@ import chronos, eth/[common/eth_types, p2p, trie/nibbles, trie/trie_defs, rlp], stew/[interval_set, keyed_queue], - ../../../utils/prettify, + ../../../utils/prettify, ../../sync_desc, ".."/[range_desc, worker_desc], ./com/[com_error, get_trie_nodes], @@ -150,6 +150,7 @@ proc updateMissingNodesList(buddy: SnapBuddyRef) = ## time. These nodes will me moved to `checkNodes` for further processing. let ctx = buddy.ctx + db = ctx.data.snapDb peer = buddy.peer env = buddy.data.pivotEnv stateRoot = env.stateHeader.stateRoot @@ -160,7 +161,7 @@ proc updateMissingNodesList(buddy: SnapBuddyRef) = trace "Start accounts healing", peer, ctx=buddy.healingCtx() for accKey in env.fetchAccounts.missingNodes: - let rc = ctx.data.snapDb.getAccountNodeKey(peer, stateRoot, accKey) + let rc = db.getAccountsNodeKey(peer, stateRoot, accKey) if rc.isOk: # Check nodes for dangling links env.fetchAccounts.checkNodes.add accKey @@ -177,12 +178,12 @@ proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool = ## fed back to the vey same list `checkNodes` let ctx = buddy.ctx + db = ctx.data.snapDb peer = buddy.peer env = buddy.data.pivotEnv stateRoot = env.stateHeader.stateRoot - rc = ctx.data.snapDb.inspectAccountsTrie( - peer, stateRoot, env.fetchAccounts.checkNodes) + rc = db.inspectAccountsTrie(peer, stateRoot, env.fetchAccounts.checkNodes) if rc.isErr: when extraTraceMessages: @@ -190,7 +191,7 @@ proc appendMoreDanglingNodesToMissingNodesList(buddy: SnapBuddyRef): bool = ctx=buddy.healingCtx(), error=rc.error # Attempt to switch peers, there is not much else we can do here buddy.ctrl.zombie = true - return + return false # Global/env batch list to be replaced by by `rc.value.leaves` return value env.fetchAccounts.checkNodes.setLen(0) @@ -257,7 +258,6 @@ proc kvAccountLeaf( ## Read leaf node from persistent database (if any) let peer = buddy.peer - env = buddy.data.pivotEnv nodeRlp = rlpFromBytes node (_,prefix) = hexPrefixDecode partialPath @@ -277,7 +277,7 @@ proc registerAccountLeaf( accKey: NodeKey; acc: Account) = ## Process single account node as would be done with an interval by - ## the `storeAccounts()` functoon + ## the `storeAccounts()` function let peer = buddy.peer env = buddy.data.pivotEnv @@ -304,7 +304,8 @@ proc registerAccountLeaf( storageRoot: acc.storageRoot) when extraTraceMessages: - trace "Isolated node for healing", peer, ctx=buddy.healingCtx(), accKey=pt + trace "Isolated account for healing", peer, + ctx=buddy.healingCtx(), accKey=pt # ------------------------------------------------------------------------------ # Public functions @@ -314,6 +315,7 @@ proc healAccountsDb*(buddy: SnapBuddyRef) {.async.} = ## Fetching and merging missing account trie database nodes. let ctx = buddy.ctx + db = ctx.data.snapDb peer = buddy.peer env = buddy.data.pivotEnv @@ -353,7 +355,7 @@ proc healAccountsDb*(buddy: SnapBuddyRef) {.async.} = return # Store nodes to disk - let report = ctx.data.snapDb.importRawAccountNodes(peer, nodesData) + let report = db.importRawAccountsNodes(peer, nodesData) if 0 < report.len and report[^1].slot.isNone: # Storage error, just run the next lap (not much else that can be done) error "Accounts healing, error updating persistent database", peer, diff --git a/nimbus/sync/snap/worker/heal_storages.nim b/nimbus/sync/snap/worker/heal_storages.nim new file mode 100644 index 000000000..c54a881d9 --- /dev/null +++ b/nimbus/sync/snap/worker/heal_storages.nim @@ -0,0 +1,400 @@ +# Nimbus +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +## Heal storage DB: +## ================ +## +## This module works similar to `heal_accounts` applied to each +## per-account storage slots hexary trie. + +import + std/sequtils, + chronicles, + chronos, + eth/[common/eth_types, p2p, trie/nibbles, trie/trie_defs, rlp], + stew/[interval_set, keyed_queue], + ../../../utils/prettify, + ../../sync_desc, + ".."/[range_desc, worker_desc], + ./com/[com_error, get_trie_nodes], + ./db/[hexary_desc, hexary_error, snapdb_storage_slots] + +{.push raises: [Defect].} + +logScope: + topics = "snap-heal" + +const + extraTraceMessages = false or true + ## Enabled additional logging noise + +# ------------------------------------------------------------------------------ +# Private logging helpers +# ------------------------------------------------------------------------------ + +proc healingCtx( + buddy: SnapBuddyRef; + kvp: SnapSlotsQueuePair; + ): string = + let + slots = kvp.data.slots + "[" & + "covered=" & slots.unprocessed.emptyFactor.toPC(0) & + "nCheckNodes=" & $slots.checkNodes.len & "," & + "nMissingNodes=" & $slots.missingNodes.len & "]" + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc updateMissingNodesList( + buddy: SnapBuddyRef; + kvp: SnapSlotsQueuePair) = + ## Check whether previously missing nodes from the `missingNodes` list + ## have been magically added to the database since it was checked last + ## time. These nodes will me moved to `checkNodes` for further processing. + let + ctx = buddy.ctx + db = ctx.data.snapDb + peer = buddy.peer + accHash = kvp.data.accHash + storageRoot = kvp.key.to(Hash256) + slots = kvp.data.slots + var + nodes: seq[Blob] + + when extraTraceMessages: + trace "Start storage slots healing", peer, ctx=buddy.healingCtx(kvp) + + for slotKey in slots.missingNodes: + let rc = db.getStorageSlotsNodeKey(peer, accHash, storageRoot, slotKey) + if rc.isOk: + # Check nodes for dangling links + slots.checkNodes.add slotKey + else: + # Node is still missing + nodes.add slotKey + + slots.missingNodes = nodes + + +proc appendMoreDanglingNodesToMissingNodesList( + buddy: SnapBuddyRef; + kvp: SnapSlotsQueuePair; + ): bool = + ## Starting with a given set of potentially dangling intermediate trie nodes + ## `checkNodes`, this set is filtered and processed. The outcome is fed back + ## to the vey same list `checkNodes` + let + ctx = buddy.ctx + db = ctx.data.snapDb + peer = buddy.peer + accHash = kvp.data.accHash + storageRoot = kvp.key.to(Hash256) + slots = kvp.data.slots + + rc = db.inspectStorageSlotsTrie( + peer, accHash, storageRoot, slots.checkNodes) + + if rc.isErr: + when extraTraceMessages: + error "Storage slots healing failed => stop", peer, + ctx=buddy.healingCtx(kvp), error=rc.error + # Attempt to switch peers, there is not much else we can do here + buddy.ctrl.zombie = true + return false + + # Update batch lists + slots.checkNodes.setLen(0) + slots.missingNodes = slots.missingNodes & rc.value.dangling + + true + + +proc getMissingNodesFromNetwork( + buddy: SnapBuddyRef; + kvp: SnapSlotsQueuePair; + ): Future[seq[Blob]] + {.async.} = + ## Extract from `missingNodes` the next batch of nodes that need + ## to be merged it into the database + let + ctx = buddy.ctx + peer = buddy.peer + accHash = kvp.data.accHash + storageRoot = kvp.key.to(Hash256) + slots = kvp.data.slots + + nMissingNodes = slots.missingNodes.len + inxLeft = max(0, nMissingNodes - maxTrieNodeFetch) + + # There is no point in processing too many nodes at the same time. So leave + # the rest on the `missingNodes` queue to be handled later. + let fetchNodes = slots.missingNodes[inxLeft ..< nMissingNodes] + slots.missingNodes.setLen(inxLeft) + + # Fetch nodes from the network. Note that the remainder of the `missingNodes` + # list might be used by another process that runs semi-parallel. + let + req = @[accHash.data.toSeq] & fetchNodes.mapIt(@[it]) + rc = await buddy.getTrieNodes(storageRoot, req) + if rc.isOk: + # Register unfetched missing nodes for the next pass + slots.missingNodes = slots.missingNodes & rc.value.leftOver.mapIt(it[0]) + return rc.value.nodes + + # Restore missing nodes list now so that a task switch in the error checker + # allows other processes to access the full `missingNodes` list. + slots.missingNodes = slots.missingNodes & fetchNodes + + let error = rc.error + if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): + discard + when extraTraceMessages: + trace "Error fetching storage slots nodes for healing => stop", peer, + ctx=buddy.healingCtx(kvp), error + else: + discard + when extraTraceMessages: + trace "Error fetching storage slots nodes for healing", peer, + ctx=buddy.healingCtx(kvp), error + + return @[] + + +proc kvStorageSlotsLeaf( + buddy: SnapBuddyRef; + kvp: SnapSlotsQueuePair; + partialPath: Blob; + node: Blob; + ): (bool,NodeKey) + {.gcsafe, raises: [Defect,RlpError]} = + ## Read leaf node from persistent database (if any) + let + peer = buddy.peer + + nodeRlp = rlpFromBytes node + (_,prefix) = hexPrefixDecode partialPath + (_,segment) = hexPrefixDecode nodeRlp.listElem(0).toBytes + nibbles = prefix & segment + if nibbles.len == 64: + return (true, nibbles.getBytes.convertTo(NodeKey)) + + when extraTraceMessages: + trace "Isolated node path for healing => ignored", peer, + ctx=buddy.healingCtx(kvp) + + +proc registerStorageSlotsLeaf( + buddy: SnapBuddyRef; + kvp: SnapSlotsQueuePair; + slotKey: NodeKey) = + ## Process single trie node as would be done with an interval by + ## the `storeStorageSlots()` function + let + peer = buddy.peer + env = buddy.data.pivotEnv + slots = kvp.data.slots + pt = slotKey.to(NodeTag) + + # Find range set (from list) containing `pt` + var ivSet: NodeTagRangeSet + block foundCoveringRange: + for w in slots.unprocessed: + if 0 < w.covered(pt,pt): + ivSet = w + break foundCoveringRange + return # already processed, forget this account leaf + + # Register this isolated leaf node that was added + discard ivSet.reduce(pt,pt) + + when extraTraceMessages: + trace "Isolated storage slot for healing", + peer, ctx=buddy.healingCtx(kvp), slotKey=pt + +# ------------------------------------------------------------------------------ +# Private functions: do the healing for one work item (sub-trie) +# ------------------------------------------------------------------------------ + +proc storageSlotsHealing( + buddy: SnapBuddyRef; + kvp: SnapSlotsQueuePair; + ): Future[bool] + {.async.} = + ## Returns `true` is the sub-trie is complete (probably inherited), and + ## `false` if there are nodes left to be completed. + let + ctx = buddy.ctx + db = ctx.data.snapDb + peer = buddy.peer + accHash = kvp.data.accHash + slots = kvp.data.slots + + # Update for changes since last visit + buddy.updateMissingNodesList(kvp) + + # ??? + if slots.checkNodes.len != 0: + if not buddy.appendMoreDanglingNodesToMissingNodesList(kvp): + return false + + # Check whether the trie is complete. + if slots.missingNodes.len == 0: + trace "Storage slots healing complete", peer, ctx=buddy.healingCtx(kvp) + return true + + # Get next batch of nodes that need to be merged it into the database + let nodesData = await buddy.getMissingNodesFromNetwork(kvp) + if nodesData.len == 0: + return + + # Store nodes to disk + let report = db.importRawStorageSlotsNodes(peer, accHash, nodesData) + if 0 < report.len and report[^1].slot.isNone: + # Storage error, just run the next lap (not much else that can be done) + error "Storage slots healing, error updating persistent database", peer, + ctx=buddy.healingCtx(kvp), nNodes=nodesData.len, error=report[^1].error + slots.missingNodes = slots.missingNodes & nodesData + return false + + when extraTraceMessages: + trace "Storage slots healing, nodes merged into database", peer, + ctx=buddy.healingCtx(kvp), nNodes=nodesData.len + + # Filter out error and leaf nodes + for w in report: + if w.slot.isSome: # non-indexed entries appear typically at the end, though + let + inx = w.slot.unsafeGet + nodePath = nodesData[inx] + + if w.error != NothingSerious or w.kind.isNone: + # error, try downloading again + slots.missingNodes.add nodePath + + elif w.kind.unsafeGet != Leaf: + # re-check this node + slots.checkNodes.add nodePath + + else: + # Node has been stored, double check + let (isLeaf, slotKey) = + buddy.kvStorageSlotsLeaf(kvp, nodePath, nodesData[inx]) + if isLeaf: + # Update `uprocessed` registry, collect storage roots (if any) + buddy.registerStorageSlotsLeaf(kvp, slotKey) + else: + slots.checkNodes.add nodePath + + when extraTraceMessages: + trace "Storage slots healing job done", peer, ctx=buddy.healingCtx(kvp) + + +proc healingIsComplete( + buddy: SnapBuddyRef; + kvp: SnapSlotsQueuePair; + ): Future[bool] + {.async.} = + ## Check whether the storage trie can be completely inherited and prepare for + ## healing if not. + ## + ## Returns `true` is the sub-trie is complete (probably inherited), and + ## `false` if there are nodes left to be completed. + let + ctx = buddy.ctx + db = ctx.data.snapDb + peer = buddy.peer + accHash = kvp.data.accHash + storageRoot = kvp.key.to(Hash256) + + # Check whether this work item can be completely inherited + if kvp.data.inherit: + let rc = db.inspectStorageSlotsTrie(peer, accHash, storageRoot) + + if rc.isErr: + # Oops, not much we can do here (looping trie?) + error "Problem inspecting storage trie", peer, storageRoot, error=rc.error + return false + + # Check whether the hexary trie can be inherited as-is. + if rc.value.dangling.len == 0: + return true # done + + # Set up healing structure for this work item + let slots = SnapTrieRangeBatchRef( + missingNodes: rc.value.dangling) + kvp.data.slots = slots + + # Full range covered vy unprocessed items + for n in 0 ..< kvp.data.slots.unprocessed.len: + slots.unprocessed[n] = NodeTagRangeSet.init() + discard slots.unprocessed[0].merge( + NodeTagRange.new(low(NodeTag),high(NodeTag))) + + # Proceed with healing + return await buddy.storageSlotsHealing(kvp) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc healStoragesDb*(buddy: SnapBuddyRef) {.async.} = + ## Fetching and merging missing slorage slots trie database nodes. + let + ctx = buddy.ctx + db = ctx.data.snapDb + peer = buddy.peer + env = buddy.data.pivotEnv + var + toBeHealed: seq[SnapSlotsQueuePair] + + # Search the current slot item batch list for items to complete via healing + for kvp in env.fetchStorage.nextPairs: + + # Marked items indicate that a partial sub-trie existsts which might have + # been inherited from an earlier state root. + if not kvp.data.inherit: + let slots = kvp.data.slots + + # Otherwise check partally fetched sub-tries only if they have a certain + # degree of completeness. + if slots.isNil or slots.unprocessed.emptyFactor < healSlorageSlotsTrigger: + continue + + # Add to local batch to be processed, below + env.fetchStorage.del(kvp.key) # ok to delete this item from batch queue + toBeHealed.add kvp # to be held in local queue + if maxStoragesHeal <= toBeHealed.len: + break + + when extraTraceMessages: + let nToBeHealed = toBeHealed.len + if 0 < nToBeHealed: + trace "Processing storage healing items", peer, nToBeHealed + + # Run against local batch + for n in 0 ..< toBeHealed.len: + let + kvp = toBeHealed[n] + isComplete = await buddy.healingIsComplete(kvp) + if isComplete: + env.nStorage.inc + else: + env.fetchStorage.merge kvp + + if buddy.ctrl.stopped: + # Oops, peer has gone + env.fetchStorage.merge toBeHealed[n+1 ..< toBeHealed.len] + break + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/worker/store_accounts.nim b/nimbus/sync/snap/worker/store_accounts.nim index be292b051..5a214cd04 100644 --- a/nimbus/sync/snap/worker/store_accounts.nim +++ b/nimbus/sync/snap/worker/store_accounts.nim @@ -115,9 +115,6 @@ proc storeAccounts*(buddy: SnapBuddyRef) {.async.} = return rc.value - when extraTraceMessages: - trace "Start fetching accounts", peer, stateRoot, iv - # Process received accounts and stash storage slots to fetch later let dd = block: let rc = await buddy.getAccountRange(stateRoot, iv) @@ -126,15 +123,20 @@ proc storeAccounts*(buddy: SnapBuddyRef) {.async.} = let error = rc.error if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): when extraTraceMessages: - trace "Error fetching accounts => stop", peer, error + trace "Error fetching accounts => stop", peer, + stateRoot, req=iv.len, error return # Reset error counts for detecting repeated timeouts buddy.data.errors.nTimeouts = 0 rc.value let - nAccounts = dd.data.accounts.len - nStorage = dd.withStorage.len + gotAccounts = dd.data.accounts.len + gotStorage = dd.withStorage.len + + when extraTraceMessages: + trace "Fetched accounts", peer, gotAccounts, gotStorage, + stateRoot, req=iv.len, got=dd.consumed block: let rc = ctx.data.snapDb.importAccounts(peer, stateRoot, iv.minPt, dd.data) @@ -144,13 +146,12 @@ proc storeAccounts*(buddy: SnapBuddyRef) {.async.} = buddy.ctrl.zombie = true when extraTraceMessages: let error = ComImportAccountsFailed - trace "Accounts import failed => stop", peer, stateRoot, - range=dd.consumed, nAccounts, nStorage, error + trace "Accounts import failed => stop", peer, gotAccounts, gotStorage, + stateRoot, req=iv.len, got=dd.consumed, error return # Statistics - env.nAccounts.inc(nAccounts) - env.nStorage.inc(nStorage) + env.nAccounts.inc(gotAccounts) # Register consumed intervals on the accumulator over all state roots buddy.markGloballyProcessed(dd.consumed) @@ -178,8 +179,8 @@ proc storeAccounts*(buddy: SnapBuddyRef) {.async.} = env.fetchStorage.merge dd.withStorage when extraTraceMessages: - trace "Done fetching accounts", peer, stateRoot, nAccounts, - withStorage=dd.withStorage.len, iv + trace "Done fetching accounts", peer, gotAccounts, gotStorage, + stateRoot, req=iv.len, got=dd.consumed # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/store_storages.nim b/nimbus/sync/snap/worker/store_storages.nim index 94cd44183..4d19a8373 100644 --- a/nimbus/sync/snap/worker/store_storages.nim +++ b/nimbus/sync/snap/worker/store_storages.nim @@ -11,20 +11,33 @@ ## Fetch accounts stapshot ## ======================= ## -## Worker items state diagram: +## Flow chart for storage slots download +## ------------------------------------- ## :: -## unprocessed slot requests | peer workers + storages database update -## =================================================================== +## {missing-storage-slots} <-----------------+ +## | | +## v | +## | +## | | +## v | +## {storage-slots} | +## | | +## v | +## | +## | | | +## v v | +## {completed} {partial} | +## | | | +## | +------------------------+ +## v +## ## -## +-----------------------------------------------+ -## | | -## v | -## ------------+-------> ------+ -## | | -## +-------> ------+ -## | | -## +-------> ------+ -## : : +## Legend: +## * `<..>`: some action, process, etc. +## * `{missing-storage-slots}`: list implemented as `env.fetchStorage` +## * `(storage-slots}`: list is optimised out +## * `{completed}`: list is optimised out +## * `{partial}`: list is optimised out ## import @@ -36,7 +49,7 @@ import ../../sync_desc, ".."/[range_desc, worker_desc], ./com/[com_error, get_storage_ranges], - ./db/[hexary_error, snapdb_storage_slots] + ./db/snapdb_storage_slots {.push raises: [Defect].} @@ -51,43 +64,82 @@ const # ------------------------------------------------------------------------------ proc getNextSlotItems(buddy: SnapBuddyRef): seq[AccountSlotsHeader] = + ## Get list of work item from the batch queue. + ## + ## * If the storage slots requested come with an explicit sub-range of slots + ## (i.e. not an implied complete list), then the result has only on work + ## item. An explicit list of slots is only calculated if there was a queue + ## item with a partially completed slots download. + ## + ## * Otherwise, a list of at most `maxStoragesFetch` work items is returned. + ## These work items were checked for that there was no trace of a previously + ## installed (probably partial) storage trie on the database (e.g. inherited + ## from an earlier state root pivot.) + ## + ## If there is an indication that the storage trie may have some data + ## already it is ignored here and marked `inherit` so that it will be + ## picked up by the healing process. let + ctx = buddy.ctx + peer = buddy.peer env = buddy.data.pivotEnv + (reqKey, reqData) = block: - let rc = env.fetchStorage.shift + let rc = env.fetchStorage.first # peek if rc.isErr: return (rc.value.key, rc.value.data) - # Assemble first request - result.add AccountSlotsHeader( - accHash: reqData.accHash, - storageRoot: Hash256(data: reqKey)) - - # Check whether it comes with a sub-range - if not reqData.slots.isNil: - # Extract some interval and return single item request queue + # Assemble first request which might come with a sub-range. + while not reqData.slots.isNil: + # Extract first interval and return single item request queue for ivSet in reqData.slots.unprocessed: let rc = ivSet.ge() if rc.isOk: - # Extraxt interval => done - result[0].subRange = some rc.value + # Extraxt this interval from the range set discard ivSet.reduce rc.value - # Puch back on batch queue unless it becomes empty - if not reqData.slots.unprocessed.isEmpty: - discard env.fetchStorage.unshift(reqKey, reqData) - return + # Delete from batch queue if the range set becomes empty + if reqData.slots.unprocessed.isEmpty: + env.fetchStorage.del(reqKey) - # Append more full requests to returned list - while result.len < maxStoragesFetch: - let rc = env.fetchStorage.shift - if rc.isErr: - return - result.add AccountSlotsHeader( - accHash: rc.value.data.accHash, - storageRoot: Hash256(data: rc.value.key)) + when extraTraceMessages: + trace "Prepare fetching partial storage slots", peer, + nStorageQueue=env.fetchStorage.len, subRange=rc.value, + account=reqData.accHash.to(NodeTag) + + return @[AccountSlotsHeader( + accHash: reqData.accHash, + storageRoot: reqKey.to(Hash256), + subRange: some rc.value)] + + # Oops, empty range set? Remove range and move item to the right end + reqData.slots = nil + discard env.fetchStorage.lruFetch(reqKey) + + # So there are no partial ranges (aka `slots`) anymore. Assemble maximal + # request queue. + for kvp in env.fetchStorage.nextPairs: + let it = AccountSlotsHeader( + accHash: kvp.data.accHash, + storageRoot: kvp.key.to(Hash256)) + + # Verify whether a storage sub-trie exists, already + if kvp.data.inherit or + ctx.data.snapDb.haveStorageSlotsData(peer, it.accHash, it.storageRoot): + kvp.data.inherit = true + when extraTraceMessages: + trace "Inheriting storage slots", peer, + nStorageQueue=env.fetchStorage.len, account=it.accHash.to(NodeTag) + continue + + result.add it + env.fetchStorage.del(kvp.key) # ok to delete this item from batch queue + + # Maximal number of items to fetch + if maxStoragesFetch <= result.len: + break # ------------------------------------------------------------------------------ # Public functions @@ -110,63 +162,90 @@ proc storeStorages*(buddy: SnapBuddyRef) {.async.} = if req.len == 0: return # currently nothing to do - when extraTraceMessages: - trace "Start fetching storage slots", peer, - nSlots=env.fetchStorage.len, - nReq=req.len - # Get storages slots data from the network var stoRange = block: let rc = await buddy.getStorageRanges(stateRoot, req) if rc.isErr: + env.fetchStorage.merge req + let error = rc.error if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): - trace "Error fetching storage slots => stop", peer, - nSlots=env.fetchStorage.len, - nReq=req.len, - error discard - env.fetchStorage.merge req + trace "Error fetching storage slots => stop", peer, + nReq=req.len, nStorageQueue=env.fetchStorage.len, error return rc.value # Reset error counts for detecting repeated timeouts buddy.data.errors.nTimeouts = 0 - if 0 < stoRange.data.storages.len: + var gotStorage = stoRange.data.storages.len + + when extraTraceMessages: + trace "Fetched storage slots", peer, gotStorage, + nLeftOvers=stoRange.leftOver.len, nReq=req.len, + nStorageQueue=env.fetchStorage.len + + if 0 < gotStorage: # Verify/process storages data and save it to disk - let report = ctx.data.snapDb.importStorages(peer, stoRange.data) + let report = ctx.data.snapDb.importStorageSlots(peer, stoRange.data) + if 0 < report.len: + let topStoRange = stoRange.data.storages.len - 1 if report[^1].slot.isNone: # Failed to store on database, not much that can be done here - trace "Error writing storage slots to database", peer, - nSlots=env.fetchStorage.len, - nReq=req.len, - error=report[^1].error env.fetchStorage.merge req + gotStorage.dec(report.len - 1) # for logging only + + error "Error writing storage slots to database", peer, gotStorage, + nReq=req.len, nStorageQueue=env.fetchStorage.len, + error=report[^1].error return # Push back error entries to be processed later for w in report: - if w.slot.isSome: - let n = w.slot.unsafeGet - # if w.error in {RootNodeMismatch, RightBoundaryProofFailed}: - # ??? - trace "Error processing storage slots", peer, - nSlots=env.fetchStorage.len, - nReq=req.len, - nReqInx=n, - error=report[n].error - # Reset any partial requests to requesting the full interval. So - # all the storage slots are re-fetched completely for this account. - env.fetchStorage.merge AccountSlotsHeader( - accHash: stoRange.data.storages[n].account.accHash, - storageRoot: stoRange.data.storages[n].account.storageRoot) + # All except the last item always index to a node argument. The last + # item has been checked for, already. + let inx = w.slot.get + + # if w.error in {RootNodeMismatch, RightBoundaryProofFailed}: + # ??? + + # Reset any partial result (which would be the last entry) to + # requesting the full interval. So all the storage slots are + # re-fetched completely for this account. + env.fetchStorage.merge AccountSlotsHeader( + accHash: stoRange.data.storages[inx].account.accHash, + storageRoot: stoRange.data.storages[inx].account.storageRoot) + + # Last entry might be partial + if inx == topStoRange: + # No partial result processing anymore to consider + stoRange.data.proof = @[] + + # Update local statistics counter for `nStorage` counter update + gotStorage.dec + + trace "Error processing storage slots", peer, gotStorage, + nReqInx=inx, nReq=req.len, nStorageQueue=env.fetchStorage.len, + error=report[inx].error + + # Update statistics + if gotStorage == 1 and + req[0].subRange.isSome and + env.fetchStorage.hasKey req[0].storageRoot.to(NodeKey): + # Successful partial request, but not completely done with yet. + gotStorage = 0 + + env.nStorage.inc(gotStorage) + + # Return unprocessed left overs to batch queue + env.fetchStorage.merge stoRange.leftOver when extraTraceMessages: - trace "Done fetching storage slots", peer, - nSlots=env.fetchStorage.len + trace "Done fetching storage slots", peer, gotStorage, + nStorageQueue=env.fetchStorage.len # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/ticker.nim b/nimbus/sync/snap/worker/ticker.nim index 4c1f59b4c..94b203ece 100644 --- a/nimbus/sync/snap/worker/ticker.nim +++ b/nimbus/sync/snap/worker/ticker.nim @@ -27,9 +27,9 @@ type TickerStats* = object pivotBlock*: Option[BlockNumber] nAccounts*: (float,float) ## mean and standard deviation - nStorage*: (float,float) ## mean and standard deviation accountsFill*: (float,float,float) ## mean, standard deviation, merged total - accCoverage*: float ## as factor + nStorage*: (float,float) ## mean and standard deviation + storageQueue*: (float,float) ## mean and standard deviation nQueues*: int TickerStatsUpdater* = @@ -107,17 +107,18 @@ proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.} proc runLogTicker(t: TickerRef) {.gcsafe.} = let data = t.statsCb() - if data != t.lastStats or - t.lastTick + tickerLogSuppressMax < t.tick: + if data != t.lastStats or t.lastTick + tickerLogSuppressMax < t.tick: t.lastStats = data t.lastTick = t.tick var - nAcc, nStore, bulk: string + nAcc, nSto, bulk: string pivot = "n/a" let accCov = data.accountsFill[0].toPC(1) & "(" & data.accountsFill[1].toPC(1) & ")" & "/" & data.accountsFill[2].toPC(0) + stoQue = data.storageQueue[0].uint64.toSI & + "(" & data.storageQueue[1].uint64.toSI & ")" buddies = t.nBuddies tick = t.tick.toSI mem = getTotalMem().uint.toSI @@ -126,10 +127,10 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} = if data.pivotBlock.isSome: pivot = &"#{data.pivotBlock.get}/{data.nQueues}" nAcc = &"{(data.nAccounts[0]+0.5).int64}({(data.nAccounts[1]+0.5).int64})" - nStore = &"{(data.nStorage[0]+0.5).int64}({(data.nStorage[1]+0.5).int64})" + nSto = &"{(data.nStorage[0]+0.5).int64}({(data.nStorage[1]+0.5).int64})" info "Snap sync statistics", - tick, buddies, pivot, nAcc, accCov, nStore, mem + tick, buddies, pivot, nAcc, accCov, nSto, stoQue, mem t.tick.inc t.setLogTicker(Moment.fromNow(tickerLogInterval)) diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index 7bcafd8b7..b57e2eb0c 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -58,8 +58,15 @@ const ## all account ranges retrieved for all pivot state roots (see ## `coveredAccounts` in `CtxData`.) - maxStoragesFetch* = 128 - ## Maximal number of storage tries to fetch with a signe message. + healSlorageSlotsTrigger* = 0.70 + ## Consider per account storage slost healing if this particular sub-trie + ## has reached this factor of completeness + + maxStoragesFetch* = 5 * 1024 + ## Maximal number of storage tries to fetch with a single message. + + maxStoragesHeal* = 32 + ## Maximal number of storage tries to to heal in a single batch run. maxTrieNodeFetch* = 1024 ## Informal maximal number of trie nodes to fetch at once. This is nor @@ -84,22 +91,26 @@ const ## Internal size of LRU cache (for debugging) type - WorkerSeenBlocks = KeyedQueue[ByteArray32,BlockNumber] + WorkerSeenBlocks = KeyedQueue[NodeKey,BlockNumber] ## Temporary for pretty debugging, `BlockHash` keyed lru cache - SnapSlotsQueue* = KeyedQueue[ByteArray32,SnapSlotQueueItemRef] + SnapSlotsQueue* = KeyedQueue[NodeKey,SnapSlotQueueItemRef] ## Handles list of storage slots data for fetch indexed by storage root. ## ## Typically, storage data requests cover the full storage slots trie. If ## there is only a partial list of slots to fetch, the queue entry is ## stored left-most for easy access. + SnapSlotsQueuePair* = KeyedQueuePair[NodeKey,SnapSlotQueueItemRef] + ## Key-value return code from `SnapSlotsQueue` handler + SnapSlotQueueItemRef* = ref object ## Storage slots request data. This entry is similar to `AccountSlotsHeader` ## where the optional `subRange` interval has been replaced by an interval ## range + healing support. accHash*: Hash256 ## Owner account, maybe unnecessary slots*: SnapTrieRangeBatchRef ## slots to fetch, nil => all slots + inherit*: bool ## mark this trie seen already SnapSlotsSet* = HashSet[SnapSlotQueueItemRef] ## Ditto but without order, to be used as veto set @@ -126,7 +137,6 @@ type # Accounts download fetchAccounts*: SnapTrieRangeBatch ## Set of accounts ranges to fetch - # vetoSlots*: SnapSlotsSet ## Do not ask for these slots, again accountsDone*: bool ## All accounts have been processed # Storage slots download @@ -134,8 +144,8 @@ type serialSync*: bool ## Done with storage, block sync next # Info - nAccounts*: uint64 ## Number of accounts imported - nStorage*: uint64 ## Number of storage spaces imported + nAccounts*: uint64 ## Imported # of accounts + nStorage*: uint64 ## Imported # of account storage tries SnapPivotTable* = ##\ ## LRU table, indexed by state root @@ -186,18 +196,61 @@ proc hash*(a: Hash256): Hash = # Public helpers # ------------------------------------------------------------------------------ +proc merge*(q: var SnapSlotsQueue; kvp: SnapSlotsQueuePair) = + ## Append/prepend a queue item record into the batch queue. + let + reqKey = kvp.key + rc = q.eq(reqKey) + if rc.isOk: + # Entry exists already + let qData = rc.value + if not qData.slots.isNil: + # So this entry is not maximal and can be extended + if kvp.data.slots.isNil: + # Remove restriction for this entry and move it to the right end + qData.slots = nil + discard q.lruFetch(reqKey) + else: + # Merge argument intervals into target set + for ivSet in kvp.data.slots.unprocessed: + for iv in ivSet.increasing: + discard qData.slots.unprocessed[0].reduce(iv) + discard qData.slots.unprocessed[1].merge(iv) + else: + # Only add non-existing entries + if kvp.data.slots.isNil: + # Append full range to the right of the list + discard q.append(reqKey, kvp.data) + else: + # Partial range, add healing support and interval + discard q.unshift(reqKey, kvp.data) + proc merge*(q: var SnapSlotsQueue; fetchReq: AccountSlotsHeader) = ## Append/prepend a slot header record into the batch queue. - let reqKey = fetchReq.storageRoot.data - - if not q.hasKey(reqKey): + let + reqKey = fetchReq.storageRoot.to(NodeKey) + rc = q.eq(reqKey) + if rc.isOk: + # Entry exists already + let qData = rc.value + if not qData.slots.isNil: + # So this entry is not maximal and can be extended + if fetchReq.subRange.isNone: + # Remove restriction for this entry and move it to the right end + qData.slots = nil + discard q.lruFetch(reqKey) + else: + # Merge argument interval into target set + let iv = fetchReq.subRange.unsafeGet + discard qData.slots.unprocessed[0].reduce(iv) + discard qData.slots.unprocessed[1].merge(iv) + else: let reqData = SnapSlotQueueItemRef(accHash: fetchReq.accHash) # Only add non-existing entries if fetchReq.subRange.isNone: # Append full range to the right of the list discard q.append(reqKey, reqData) - else: # Partial range, add healing support and interval reqData.slots = SnapTrieRangeBatchRef() @@ -206,7 +259,9 @@ proc merge*(q: var SnapSlotsQueue; fetchReq: AccountSlotsHeader) = discard reqData.slots.unprocessed[0].merge(fetchReq.subRange.unsafeGet) discard q.unshift(reqKey, reqData) -proc merge*(q: var SnapSlotsQueue; reqList: openArray[AccountSlotsHeader]) = +proc merge*( + q: var SnapSlotsQueue; + reqList: openArray[SnapSlotsQueuePair|AccountSlotsHeader]) = ## Variant fof `merge()` for a list argument for w in reqList: q.merge w @@ -217,30 +272,31 @@ proc merge*(q: var SnapSlotsQueue; reqList: openArray[AccountSlotsHeader]) = proc pp*(ctx: SnapCtxRef; bh: BlockHash): string = ## Pretty printer for debugging - let rc = ctx.data.seenBlock.lruFetch(bh.to(Hash256).data) + let rc = ctx.data.seenBlock.lruFetch(bh.Hash256.to(NodeKey)) if rc.isOk: return "#" & $rc.value "%" & $bh.to(Hash256).data.toHex proc pp*(ctx: SnapCtxRef; bh: BlockHash; bn: BlockNumber): string = ## Pretty printer for debugging - let rc = ctx.data.seenBlock.lruFetch(bh.to(Hash256).data) + let rc = ctx.data.seenBlock.lruFetch(bh.Hash256.to(NodeKey)) if rc.isOk: return "#" & $rc.value - "#" & $ctx.data.seenBlock.lruAppend(bh.to(Hash256).data, bn, seenBlocksMax) + "#" & $ctx.data.seenBlock.lruAppend(bh.Hash256.to(NodeKey), bn, seenBlocksMax) proc pp*(ctx: SnapCtxRef; bhn: HashOrNum): string = if not bhn.isHash: return "#" & $bhn.number - let rc = ctx.data.seenBlock.lruFetch(bhn.hash.data) + let rc = ctx.data.seenBlock.lruFetch(bhn.hash.to(NodeKey)) if rc.isOk: return "%" & $rc.value return "%" & $bhn.hash.data.toHex proc seen*(ctx: SnapCtxRef; bh: BlockHash; bn: BlockNumber) = ## Register for pretty printing - if not ctx.data.seenBlock.lruFetch(bh.to(Hash256).data).isOk: - discard ctx.data.seenBlock.lruAppend(bh.to(Hash256).data, bn, seenBlocksMax) + if not ctx.data.seenBlock.lruFetch(bh.Hash256.to(NodeKey)).isOk: + discard ctx.data.seenBlock.lruAppend( + bh.Hash256.to(NodeKey), bn, seenBlocksMax) proc pp*(a: MDigest[256]; collapse = true): string = if not collapse: diff --git a/nimbus/sync/sync_desc.nim b/nimbus/sync/sync_desc.nim index 4c89bbc6f..7b424dd8c 100644 --- a/nimbus/sync/sync_desc.nim +++ b/nimbus/sync/sync_desc.nim @@ -16,7 +16,8 @@ import eth/[common, p2p], ../p2p/chain, - ../db/db_chain + ../db/db_chain, + ./handlers export chain, @@ -46,6 +47,7 @@ type CtxRef*[S] = ref object ## Shared state among all syncing peer workers (aka buddies.) buddiesMax*: int ## Max number of buddies + ethWireCtx*: EthWireRef ## Eth protocol wire context (if available) chain*: Chain ## Block chain database (no need for `Peer`) poolMode*: bool ## Activate `runPool()` workers if set `true` data*: S ## Shared context for all worker peers diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim index 0637a5e9f..c6c4b517b 100644 --- a/nimbus/sync/sync_sched.nim +++ b/nimbus/sync/sync_sched.nim @@ -74,7 +74,7 @@ import chronos, eth/[common/eth_types, p2p, p2p/peer_pool, p2p/private/p2p_types], stew/keyed_queue, - ./sync_desc + "."/[handlers, sync_desc] {.push raises: [Defect].} @@ -266,6 +266,7 @@ proc initSync*[S,W]( # are full. The effect is that a re-connect on the latest zombie will be # rejected as long as its worker descriptor is registered. dsc.ctx = CtxRef[S]( + ethWireCtx: cast[EthWireRef](node.protocolState protocol.eth), buddiesMax: max(1, slots + 1), chain: chain) dsc.pool = node.peerPool diff --git a/tests/test_sync_snap.nim b/tests/test_sync_snap.nim index fd9f58799..9b46f3339 100644 --- a/tests/test_sync_snap.nim +++ b/tests/test_sync_snap.nim @@ -321,16 +321,16 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) = # need to check for additional records only on either end of a range. var keySet = packed.accounts.mapIt(it.accHash).toHashSet for w in accountsList: - var key = desc.prevChainDbKey(w.data.accounts[0].accHash) + var key = desc.prevAccountsChainDbKey(w.data.accounts[0].accHash) while key.isOk and key.value notin keySet: keySet.incl key.value - let newKey = desc.prevChainDbKey(key.value) + let newKey = desc.prevAccountsChainDbKey(key.value) check newKey != key key = newKey - key = desc.nextChainDbKey(w.data.accounts[^1].accHash) + key = desc.nextAccountsChainDbKey(w.data.accounts[^1].accHash) while key.isOk and key.value notin keySet: keySet.incl key.value - let newKey = desc.nextChainDbKey(key.value) + let newKey = desc.nextAccountsChainDbKey(key.value) check newKey != key key = newKey accKeys = toSeq(keySet).mapIt(it.to(NodeTag)).sorted(cmp) @@ -346,9 +346,9 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) = count.inc let pfx = $count & "#" - byChainDB = desc.getChainDbAccount(accHash) - byNextKey = desc.nextChainDbKey(accHash) - byPrevKey = desc.prevChainDbKey(accHash) + byChainDB = desc.getAccountsChainDb(accHash) + byNextKey = desc.nextAccountsChainDbKey(accHash) + byPrevKey = desc.prevAccountsChainDbKey(accHash) noisy.say "*** find", "<", count, "> byChainDb=", byChainDB.pp check byChainDB.isOk @@ -431,7 +431,7 @@ proc storagesRunner( test &"Merging {storagesList.len} storages lists": let - dbDesc = SnapDbStorageSlotsRef.init(dbBase, peer=peer) + dbDesc = SnapDbStorageSlotsRef.init(dbBase,Hash256(),Hash256(),peer) ignore = knownFailures.toTable for n,w in storagesList: let @@ -440,7 +440,7 @@ proc storagesRunner( Result[void,seq[(int,HexaryDbError)]].err(ignore[testId]) else: OkStoDb - check dbDesc.importStorages(w.data, persistent).toStoDbRc == expRc + check dbDesc.importStorageSlots(w.data, persistent).toStoDbRc == expRc proc inspectionRunner( noisy = true;